use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;

use common_telemetry::{debug, warn};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::schema::SkippingIndexType;
use datatypes::vectors::Helper;
use index::bloom_filter::creator::BloomFilterCreator;
use mito_codec::index::{IndexValueCodec, IndexValuesCodec};
use mito_codec::row_converter::SortField;
use puffin::puffin_manager::{PuffinWriter, PutOptions};
use snafu::{ResultExt, ensure};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, FileId};
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};

use crate::error::{
    BiErrorsSnafu, BloomFilterFinishSnafu, EncodeSnafu, IndexOptionsSnafu,
    OperateAbortedIndexSnafu, PuffinAddBlobSnafu, PushBloomFilterValueSnafu, Result,
};
use crate::read::Batch;
use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
use crate::sst::index::intermediate::{
    IntermediateLocation, IntermediateManager, TempFileProvider,
};
use crate::sst::index::puffin_manager::SstPuffinWriter;
use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};

/// The buffer size of the in-memory pipe used to stream index blobs to the
/// puffin writer.
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;

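/// Builds bloom filter indexes for the columns of an SST file that declare a
/// bloom filter skipping index.
///
/// A minimal usage sketch (illustrative only; `file_id`, `metadata`,
/// `intm_mgr`, `batch`, and `puffin_writer` are assumed to exist in the
/// caller's context):
///
/// ```ignore
/// // `new` returns `Ok(None)` when no column requests a bloom filter index.
/// let Some(mut indexer) =
///     BloomFilterIndexer::new(file_id, &metadata, intm_mgr, Some(1024))?
/// else {
///     return Ok(());
/// };
/// indexer.update(&mut batch).await?;
/// let (row_count, byte_count) = indexer.finish(&mut puffin_writer).await?;
/// ```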
pub struct BloomFilterIndexer {
    /// The bloom filter creators, keyed by column id.
    creators: HashMap<ColumnId, BloomFilterCreator>,

    /// Provider of intermediate files used during index creation.
    temp_file_provider: Arc<TempFileProvider>,

    /// Codec for decoding primary key values.
    codec: IndexValuesCodec,

    /// Whether the index creation has been aborted.
    aborted: bool,

    /// Statistics of index creation.
    stats: Statistics,

    /// Memory usage accumulated across all creators.
    global_memory_usage: Arc<AtomicUsize>,

    /// Region metadata, used to resolve column names for flat-format batches.
    metadata: RegionMetadataRef,
}

impl BloomFilterIndexer {
    /// Creates a new `BloomFilterIndexer`.
    ///
    /// Returns `Ok(None)` if no column of the region declares a bloom filter
    /// skipping index.
    pub fn new(
        sst_file_id: FileId,
        metadata: &RegionMetadataRef,
        intermediate_manager: IntermediateManager,
        memory_usage_threshold: Option<usize>,
    ) -> Result<Option<Self>> {
        let mut creators = HashMap::new();

        let temp_file_provider = Arc::new(TempFileProvider::new(
            IntermediateLocation::new(&metadata.region_id, &sst_file_id),
            intermediate_manager,
        ));
        let global_memory_usage = Arc::new(AtomicUsize::new(0));

        for column in &metadata.column_metadatas {
            let options = column
                .column_schema
                .skipping_index_options()
                .context(IndexOptionsSnafu {
                    column_name: &column.column_schema.name,
                })?;

            // Only columns with a bloom filter skipping index get a creator.
            let options = match options {
                Some(options) if options.index_type == SkippingIndexType::BloomFilter => options,
                _ => continue,
            };

            let creator = BloomFilterCreator::new(
                options.granularity as _,
                options.false_positive_rate(),
                temp_file_provider.clone(),
                global_memory_usage.clone(),
                memory_usage_threshold,
            );
            creators.insert(column.column_id, creator);
        }

        if creators.is_empty() {
            return Ok(None);
        }

        let codec = IndexValuesCodec::from_tag_columns(
            metadata.primary_key_encoding,
            metadata.primary_key_columns(),
        );
        let indexer = Self {
            creators,
            temp_file_provider,
            codec,
            aborted: false,
            stats: Statistics::new(TYPE_BLOOM_FILTER_INDEX),
            global_memory_usage,
            metadata: metadata.clone(),
        };
        Ok(Some(indexer))
    }

    /// Updates the index with a batch of rows in the primary key format.
    ///
    /// Intermediate state is cleaned up if the update fails.
    pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
        ensure!(!self.aborted, OperateAbortedIndexSnafu);

        if self.creators.is_empty() {
            return Ok(());
        }

        if let Err(update_err) = self.do_update(batch).await {
            // Clean up garbage left behind by the failed update.
            if let Err(err) = self.do_cleanup().await {
                if cfg!(any(test, feature = "test")) {
                    panic!("Failed to clean up index creator, err: {err:?}");
                } else {
                    warn!(err; "Failed to clean up index creator");
                }
            }
            return Err(update_err);
        }

        Ok(())
    }

    /// Updates the index with a flat-format `RecordBatch`.
    ///
    /// Intermediate state is cleaned up if the update fails.
    pub async fn update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
        ensure!(!self.aborted, OperateAbortedIndexSnafu);

        if self.creators.is_empty() || batch.num_rows() == 0 {
            return Ok(());
        }

        if let Err(update_err) = self.do_update_flat(batch).await {
            // Clean up garbage left behind by the failed update.
            if let Err(err) = self.do_cleanup().await {
                if cfg!(any(test, feature = "test")) {
                    panic!("Failed to clean up index creator, err: {err:?}");
                } else {
                    warn!(err; "Failed to clean up index creator");
                }
            }
            return Err(update_err);
        }

        Ok(())
    }

    /// Finishes the index creation and writes all bloom filters to the puffin
    /// writer. Returns the number of indexed rows and the bytes written.
    pub async fn finish(
        &mut self,
        puffin_writer: &mut SstPuffinWriter,
    ) -> Result<(RowCount, ByteCount)> {
        ensure!(!self.aborted, OperateAbortedIndexSnafu);

        if self.stats.row_count() == 0 {
            // Nothing was indexed, so there is no garbage to clean up.
            return Ok((0, 0));
        }

        let finish_res = self.do_finish(puffin_writer).await;
        // Clean up garbage whether finishing succeeded or not.
        if let Err(err) = self.do_cleanup().await {
            if cfg!(any(test, feature = "test")) {
                panic!("Failed to clean up index creator, err: {err:?}");
            } else {
                warn!(err; "Failed to clean up index creator");
            }
        }

        finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count()))
    }

    /// Aborts the index creation and cleans up all intermediate files.
    /// Subsequent `update*`/`finish` calls will fail.
    pub async fn abort(&mut self) -> Result<()> {
        if self.aborted {
            return Ok(());
        }
        self.aborted = true;

        self.do_cleanup().await
    }

    async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
        let mut guard = self.stats.record_update();

        let n = batch.num_rows();
        guard.inc_row_count(n);

        for (col_id, creator) in &mut self.creators {
            match self.codec.pk_col_info(*col_id) {
                // Tag column: decode the value from the encoded primary key.
                Some(col_info) => {
                    let pk_idx = col_info.idx;
                    let field = &col_info.field;
                    let elems = batch
                        .pk_col_value(self.codec.decoder(), pk_idx, *col_id)?
                        .filter(|v| !v.is_null())
                        .map(|v| {
                            let mut buf = vec![];
                            IndexValueCodec::encode_nonnull_value(
                                v.as_value_ref(),
                                field,
                                &mut buf,
                            )
                            .context(EncodeSnafu)?;
                            Ok(buf)
                        })
                        .transpose()?;
                    // All rows of the batch share the same primary key value.
                    creator
                        .push_n_row_elems(n, elems)
                        .await
                        .context(PushBloomFilterValueSnafu)?;
                }
                // Field column: read values from the batch row by row.
                None => {
                    let Some(values) = batch.field_col_value(*col_id) else {
                        debug!(
                            "Column {} not found in the batch during building bloom filter index",
                            col_id
                        );
                        continue;
                    };
                    let sort_field = SortField::new(values.data.data_type());
                    for i in 0..n {
                        let value = values.data.get_ref(i);
                        let elems = (!value.is_null())
                            .then(|| {
                                let mut buf = vec![];
                                IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut buf)
                                    .context(EncodeSnafu)?;
                                Ok(buf)
                            })
                            .transpose()?;

                        creator
                            .push_row_elems(elems)
                            .await
                            .context(PushBloomFilterValueSnafu)?;
                    }
                }
            }
        }

        Ok(())
    }

    async fn do_update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
        let mut guard = self.stats.record_update();

        let n = batch.num_rows();
        guard.inc_row_count(n);

        for (col_id, creator) in &mut self.creators {
            if let Some(column_meta) = self.metadata.column_by_id(*col_id) {
                let column_name = &column_meta.column_schema.name;

                if let Some(column_array) = batch.column_by_name(column_name) {
                    // Convert the arrow array into a vector to read values.
                    let vector = Helper::try_into_vector(column_array.clone())
                        .context(crate::error::ConvertVectorSnafu)?;
                    let sort_field = SortField::new(vector.data_type());

                    for i in 0..n {
                        let value = vector.get_ref(i);
                        let elems = (!value.is_null())
                            .then(|| {
                                let mut buf = vec![];
                                IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut buf)
                                    .context(EncodeSnafu)?;
                                Ok(buf)
                            })
                            .transpose()?;

                        creator
                            .push_row_elems(elems)
                            .await
                            .context(PushBloomFilterValueSnafu)?;
                    }
                } else {
                    debug!(
                        "Column {} not found in the batch during building bloom filter index",
                        column_name
                    );
                    // Push nulls to keep the creator aligned with the row count.
                    for _ in 0..n {
                        creator
                            .push_row_elems(None)
                            .await
                            .context(PushBloomFilterValueSnafu)?;
                    }
                }
            }
        }

        Ok(())
    }

    async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
        let mut guard = self.stats.record_finish();

        for (id, creator) in &mut self.creators {
            let written_bytes = Self::do_finish_single_creator(id, creator, puffin_writer).await?;
            guard.inc_byte_count(written_bytes);
        }

        Ok(())
    }

    async fn do_cleanup(&mut self) -> Result<()> {
        let mut _guard = self.stats.record_cleanup();

        self.creators.clear();
        self.temp_file_provider.cleanup().await
    }

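    /// Finishes a single creator, streaming its serialized bloom filter into
    /// the puffin writer through a bounded in-memory pipe so the blob is never
    /// fully buffered.
    ///
    /// A sketch of the same pattern in isolation (illustrative, not part of
    /// the indexer): the two halves of the pipe must be driven concurrently,
    /// otherwise a blob larger than the pipe buffer deadlocks.
    ///
    /// ```ignore
    /// use tokio::io::{AsyncReadExt, AsyncWriteExt};
    ///
    /// let (mut tx, mut rx) = tokio::io::duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
    /// let write = async move {
    ///     tx.write_all(&[0u8; 65536]).await.unwrap();
    ///     // Dropping `tx` closes the pipe so the reader sees EOF.
    /// };
    /// let read = async move {
    ///     let mut buf = Vec::new();
    ///     rx.read_to_end(&mut buf).await.unwrap();
    ///     buf
    /// };
    /// let (_, data) = futures::join!(write, read);
    /// assert_eq!(data.len(), 65536);
    /// ```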
    async fn do_finish_single_creator(
        col_id: &ColumnId,
        creator: &mut BloomFilterCreator,
        puffin_writer: &mut SstPuffinWriter,
    ) -> Result<ByteCount> {
        let (tx, rx) = tokio::io::duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);

        let blob_name = format!("{}-{}", INDEX_BLOB_TYPE, col_id);
        // Drive serialization and blob writing concurrently: the pipe buffer
        // is bounded, so neither side can run to completion on its own.
        let (index_finish, puffin_add_blob) = futures::join!(
            creator.finish(tx.compat_write()),
            puffin_writer.put_blob(
                &blob_name,
                rx.compat(),
                PutOptions::default(),
                Default::default(),
            )
        );

        match (
            puffin_add_blob.context(PuffinAddBlobSnafu),
            index_finish.context(BloomFilterFinishSnafu),
        ) {
            (Err(e1), Err(e2)) => BiErrorsSnafu {
                first: Box::new(e1),
                second: Box::new(e2),
            }
            .fail()?,

            (Ok(_), e @ Err(_)) => e?,
            (e @ Err(_), Ok(_)) => e.map(|_| ())?,
            (Ok(written_bytes), Ok(_)) => {
                return Ok(written_bytes);
            }
        }

        // Unreachable in practice: every arm above returns or propagates.
        Ok(0)
    }

    /// Returns the total memory usage of all bloom filter creators.
    pub fn memory_usage(&self) -> usize {
        self.global_memory_usage
            .load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Returns the ids of all columns being indexed.
    pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + use<'_> {
        self.creators.keys().copied()
    }
}

#[cfg(test)]
pub(crate) mod tests {

    use api::v1::SemanticType;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, SkippingIndexOptions};
    use datatypes::value::ValueRef;
    use datatypes::vectors::{UInt8Vector, UInt64Vector};
    use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
    use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
    use object_store::ObjectStore;
    use object_store::services::Memory;
    use puffin::puffin_manager::{PuffinManager, PuffinReader};
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::access_layer::FilePathProvider;
    use crate::read::BatchColumn;
    use crate::sst::file::RegionFileId;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;

    pub fn mock_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    pub async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
        IntermediateManager::init_fs(path).await.unwrap()
    }

    pub struct TestPathProvider;

    impl FilePathProvider for TestPathProvider {
        fn build_index_file_path(&self, file_id: RegionFileId) -> String {
            file_id.file_id().to_string()
        }

        fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
            file_id.file_id().to_string()
        }
    }

    /// Mocks a region with three columns:
    /// - `tag_str` (id 1): string tag, bloom filter index with granularity 2;
    /// - `ts` (id 2): the time index;
    /// - `field_u64` (id 3): u64 field, bloom filter index with granularity 4.
    pub fn mock_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_str",
                    ConcreteDataType::string_datatype(),
                    false,
                )
                .with_skipping_options(SkippingIndexOptions::new_unchecked(
                    2,
                    0.01,
                    SkippingIndexType::BloomFilter,
                ))
                .unwrap(),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_u64",
                    ConcreteDataType::uint64_datatype(),
                    false,
                )
                .with_skipping_options(SkippingIndexOptions::new_unchecked(
                    4,
                    0.01,
                    SkippingIndexType::BloomFilter,
                ))
                .unwrap(),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .primary_key(vec![1]);

        Arc::new(builder.build().unwrap())
    }

    pub fn new_batch(str_tag: impl AsRef<str>, u64_field: impl IntoIterator<Item = u64>) -> Batch {
        let fields = vec![(0, SortField::new(ConcreteDataType::string_datatype()))];
        let codec = DensePrimaryKeyCodec::with_fields(fields);
        let row: [ValueRef; 1] = [str_tag.as_ref().into()];
        let primary_key = codec.encode(row.into_iter()).unwrap();

        let u64_field = BatchColumn {
            column_id: 3,
            data: Arc::new(UInt64Vector::from_iter_values(u64_field)),
        };
        let num_rows = u64_field.data.len();

        // Batch::new(primary_key, timestamps, sequences, op_types, fields).
        Batch::new(
            primary_key,
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
                1, num_rows,
            ))),
            vec![u64_field],
        )
        .unwrap()
    }

    #[tokio::test]
    async fn test_bloom_filter_indexer() {
        let prefix = "test_bloom_filter_indexer_";
        let tempdir = common_test_util::temp_dir::create_temp_dir(prefix);
        let object_store = mock_object_store();
        let intm_mgr = new_intm_mgr(tempdir.path().to_string_lossy()).await;
        let region_metadata = mock_region_metadata();
        let memory_usage_threshold = Some(1024);

        let file_id = FileId::random();
        let mut indexer =
            BloomFilterIndexer::new(file_id, &region_metadata, intm_mgr, memory_usage_threshold)
                .unwrap()
                .unwrap();

        let mut batch = new_batch("tag1", 0..10);
        indexer.update(&mut batch).await.unwrap();

        let mut batch = new_batch("tag2", 10..20);
        indexer.update(&mut batch).await.unwrap();

        let (_d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let puffin_manager = factory.build(object_store, TestPathProvider);

        let file_id = RegionFileId::new(region_metadata.region_id, file_id);
        let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
        let (row_count, byte_count) = indexer.finish(&mut puffin_writer).await.unwrap();
        assert_eq!(row_count, 20);
        assert!(byte_count > 0);
        puffin_writer.finish().await.unwrap();

        let puffin_reader = puffin_manager.reader(&file_id).await.unwrap();

        // Check the blob for `tag_str` (column id 1).
        {
            let blob_guard = puffin_reader
                .blob("greptime-bloom-filter-v1-1")
                .await
                .unwrap();
            let reader = blob_guard.reader().await.unwrap();
            let bloom_filter = BloomFilterReaderImpl::new(reader);
            let metadata = bloom_filter.metadata().await.unwrap();

            // 20 rows / granularity 2 = 10 segments.
            assert_eq!(metadata.segment_count, 10);
            for i in 0..5 {
                let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize];
                let bf = bloom_filter.bloom_filter(loc).await.unwrap();
                assert!(bf.contains(b"tag1"));
            }
            for i in 5..10 {
                let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize];
                let bf = bloom_filter.bloom_filter(loc).await.unwrap();
                assert!(bf.contains(b"tag2"));
            }
        }

        // Check the blob for `field_u64` (column id 3).
        {
            let sort_field = SortField::new(ConcreteDataType::uint64_datatype());

            let blob_guard = puffin_reader
                .blob("greptime-bloom-filter-v1-3")
                .await
                .unwrap();
            let reader = blob_guard.reader().await.unwrap();
            let bloom_filter = BloomFilterReaderImpl::new(reader);
            let metadata = bloom_filter.metadata().await.unwrap();

            // 20 rows / granularity 4 = 5 segments.
            assert_eq!(metadata.segment_count, 5);
            for i in 0u64..20 {
                let idx = i as usize / 4;
                let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[idx] as usize];
                let bf = bloom_filter.bloom_filter(loc).await.unwrap();
                let mut buf = vec![];
                IndexValueCodec::encode_nonnull_value(ValueRef::UInt64(i), &sort_field, &mut buf)
                    .unwrap();

                assert!(bf.contains(&buf));
            }
        }
    }
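
    // An extra sketch (not in the original suite) exercising the abort guard:
    // after `abort`, further updates must fail instead of touching the
    // cleaned-up intermediate files.
    #[tokio::test]
    async fn test_update_after_abort() {
        let prefix = "test_update_after_abort_";
        let tempdir = common_test_util::temp_dir::create_temp_dir(prefix);
        let intm_mgr = new_intm_mgr(tempdir.path().to_string_lossy()).await;
        let region_metadata = mock_region_metadata();

        let file_id = FileId::random();
        let mut indexer =
            BloomFilterIndexer::new(file_id, &region_metadata, intm_mgr, Some(1024))
                .unwrap()
                .unwrap();

        indexer.abort().await.unwrap();

        // The aborted flag is checked before any work is done.
        let mut batch = new_batch("tag1", 0..10);
        assert!(indexer.update(&mut batch).await.is_err());
    }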
}