use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;

use api::v1::SemanticType;
use common_telemetry::{debug, warn};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::schema::SkippingIndexType;
use datatypes::vectors::Helper;
use index::bloom_filter::creator::BloomFilterCreator;
use index::target::IndexTarget;
use mito_codec::index::{IndexValueCodec, IndexValuesCodec};
use mito_codec::row_converter::{CompositeValues, SortField};
use puffin::puffin_manager::{PuffinWriter, PutOptions};
use snafu::{ResultExt, ensure};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, FileId};
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};

use crate::error::{
    BiErrorsSnafu, BloomFilterFinishSnafu, EncodeSnafu, IndexOptionsSnafu,
    OperateAbortedIndexSnafu, PuffinAddBlobSnafu, PushBloomFilterValueSnafu, Result,
};
use crate::read::Batch;
use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
use crate::sst::index::intermediate::{
    IntermediateLocation, IntermediateManager, TempFileProvider,
};
use crate::sst::index::puffin_manager::SstPuffinWriter;
use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
use crate::sst::index::{TYPE_BLOOM_FILTER_INDEX, decode_primary_keys_with_counts};

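/// Buffer size of the in-memory pipe used to stream a finished bloom filter
/// blob to the puffin writer.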
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;

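/// Indexer that builds bloom filter indexes for the columns of an SST file
/// that enable the bloom filter skipping index.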
pub struct BloomFilterIndexer {
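    /// Bloom filter creators, keyed by the id of the indexed column.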
    creators: HashMap<ColumnId, BloomFilterCreator>,

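    /// Provider of intermediate files used by the creators.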
    temp_file_provider: Arc<TempFileProvider>,

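    /// Codec for decoding values from the encoded primary key.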
    codec: IndexValuesCodec,

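    /// Whether the indexing process has been aborted.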
    aborted: bool,

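    /// Statistics of the indexing process.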
    stats: Statistics,

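    /// Memory usage accumulated across all creators.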
    global_memory_usage: Arc<AtomicUsize>,

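    /// Metadata of the region that owns the SST file.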
    metadata: RegionMetadataRef,
}

impl BloomFilterIndexer {
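    /// Creates a new `BloomFilterIndexer`.
    ///
    /// Returns `Ok(None)` if no column of the region enables the bloom filter
    /// skipping index.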
    pub fn new(
        sst_file_id: FileId,
        metadata: &RegionMetadataRef,
        intermediate_manager: IntermediateManager,
        memory_usage_threshold: Option<usize>,
    ) -> Result<Option<Self>> {
        let mut creators = HashMap::new();

        let temp_file_provider = Arc::new(TempFileProvider::new(
            IntermediateLocation::new(&metadata.region_id, &sst_file_id),
            intermediate_manager,
        ));
        let global_memory_usage = Arc::new(AtomicUsize::new(0));

        for column in &metadata.column_metadatas {
            let options =
                column
                    .column_schema
                    .skipping_index_options()
                    .context(IndexOptionsSnafu {
                        column_name: &column.column_schema.name,
                    })?;

            let options = match options {
                Some(options) if options.index_type == SkippingIndexType::BloomFilter => options,
                _ => continue,
            };

            let creator = BloomFilterCreator::new(
                options.granularity as _,
                options.false_positive_rate(),
                temp_file_provider.clone(),
                global_memory_usage.clone(),
                memory_usage_threshold,
            );
            creators.insert(column.column_id, creator);
        }

        if creators.is_empty() {
            return Ok(None);
        }

        let codec = IndexValuesCodec::from_tag_columns(
            metadata.primary_key_encoding,
            metadata.primary_key_columns(),
        );
        let indexer = Self {
            creators,
            temp_file_provider,
            codec,
            aborted: false,
            stats: Statistics::new(TYPE_BLOOM_FILTER_INDEX),
            global_memory_usage,
            metadata: metadata.clone(),
        };
        Ok(Some(indexer))
    }

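    /// Updates the index with a batch of rows.
    ///
    /// Cleans up garbage if the update fails.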
    pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
        ensure!(!self.aborted, OperateAbortedIndexSnafu);

        if self.creators.is_empty() {
            return Ok(());
        }

        if let Err(update_err) = self.do_update(batch).await {
            if let Err(err) = self.do_cleanup().await {
                if cfg!(any(test, feature = "test")) {
                    panic!("Failed to clean up index creator, err: {err:?}");
                } else {
                    warn!(err; "Failed to clean up index creator");
                }
            }
            return Err(update_err);
        }

        Ok(())
    }

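    /// Updates the index with a flat-format `RecordBatch`.
    ///
    /// Cleans up garbage if the update fails.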
    pub async fn update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
        ensure!(!self.aborted, OperateAbortedIndexSnafu);

        if self.creators.is_empty() || batch.num_rows() == 0 {
            return Ok(());
        }

        if let Err(update_err) = self.do_update_flat(batch).await {
            if let Err(err) = self.do_cleanup().await {
                if cfg!(any(test, feature = "test")) {
                    panic!("Failed to clean up index creator, err: {err:?}");
                } else {
                    warn!(err; "Failed to clean up index creator");
                }
            }
            return Err(update_err);
        }

        Ok(())
    }

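    /// Finishes the index and writes one blob per indexed column to the puffin
    /// writer. Returns the total row count and the written byte count.
    ///
    /// Garbage is cleaned up whether finishing succeeds or fails.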
    pub async fn finish(
        &mut self,
        puffin_writer: &mut SstPuffinWriter,
    ) -> Result<(RowCount, ByteCount)> {
        ensure!(!self.aborted, OperateAbortedIndexSnafu);

        if self.stats.row_count() == 0 {
            return Ok((0, 0));
        }

        let finish_res = self.do_finish(puffin_writer).await;
        if let Err(err) = self.do_cleanup().await {
            if cfg!(any(test, feature = "test")) {
                panic!("Failed to clean up index creator, err: {err:?}");
            } else {
                warn!(err; "Failed to clean up index creator");
            }
        }

        finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count()))
    }

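    /// Aborts the index creation and cleans up the garbage.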
    pub async fn abort(&mut self) -> Result<()> {
        if self.aborted {
            return Ok(());
        }
        self.aborted = true;

        self.do_cleanup().await
    }

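    /// Pushes one `Batch` of rows to the creators. Tag columns are decoded
    /// from the primary key, field columns are read from the batch directly.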
    async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
        let mut guard = self.stats.record_update();

        let n = batch.num_rows();
        guard.inc_row_count(n);

        for (col_id, creator) in &mut self.creators {
            match self.codec.pk_col_info(*col_id) {
                Some(col_info) => {
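                    // Tag column: decode the value from the encoded primary key.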
                    let pk_idx = col_info.idx;
                    let field = &col_info.field;
                    let elems = batch
                        .pk_col_value(self.codec.decoder(), pk_idx, *col_id)?
                        .filter(|v| !v.is_null())
                        .map(|v| {
                            let mut buf = vec![];
                            IndexValueCodec::encode_nonnull_value(
                                v.as_value_ref(),
                                field,
                                &mut buf,
                            )
                            .context(EncodeSnafu)?;
                            Ok(buf)
                        })
                        .transpose()?;
                    creator
                        .push_n_row_elems(n, elems)
                        .await
                        .context(PushBloomFilterValueSnafu)?;
                }
                None => {
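                    // Field column: read the values from the batch directly.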
                    let Some(values) = batch.field_col_value(*col_id) else {
                        debug!(
                            "Column {} not found in the batch during building bloom filter index",
                            col_id
                        );
                        continue;
                    };
                    let sort_field = SortField::new(values.data.data_type());
                    for i in 0..n {
                        let value = values.data.get_ref(i);
                        let elems = (!value.is_null())
                            .then(|| {
                                let mut buf = vec![];
                                IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut buf)
                                    .context(EncodeSnafu)?;
                                Ok(buf)
                            })
                            .transpose()?;

                        creator
                            .push_row_elems(elems)
                            .await
                            .context(PushBloomFilterValueSnafu)?;
                    }
                }
            }
        }

        Ok(())
    }

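    /// Pushes one flat `RecordBatch` to the creators. Columns present in the
    /// batch are read directly; with sparse primary key encoding, tag columns
    /// are decoded from the encoded primary key column instead.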
    async fn do_update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
        let mut guard = self.stats.record_update();

        let n = batch.num_rows();
        guard.inc_row_count(n);

        let is_sparse = self.metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse;
        let mut decoded_pks: Option<Vec<(CompositeValues, usize)>> = None;

        for (col_id, creator) in &mut self.creators {
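            // Creators are built from this region's metadata, so the column must exist.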
            let column_meta = self.metadata.column_by_id(*col_id).unwrap();
            let column_name = &column_meta.column_schema.name;
            if let Some(column_array) = batch.column_by_name(column_name) {
                let vector = Helper::try_into_vector(column_array.clone())
                    .context(crate::error::ConvertVectorSnafu)?;
                let sort_field = SortField::new(vector.data_type());

                for i in 0..n {
                    let value = vector.get_ref(i);
                    let elems = (!value.is_null())
                        .then(|| {
                            let mut buf = vec![];
                            IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut buf)
                                .context(EncodeSnafu)?;
                            Ok(buf)
                        })
                        .transpose()?;

                    creator
                        .push_row_elems(elems)
                        .await
                        .context(PushBloomFilterValueSnafu)?;
                }
            } else if is_sparse && column_meta.semantic_type == SemanticType::Tag {
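                // Sparse encoding: tag values are not materialized as their own
                // columns, so decode them from the encoded primary key column.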
                if decoded_pks.is_none() {
                    decoded_pks = Some(decode_primary_keys_with_counts(batch, &self.codec)?);
                }

                let pk_values_with_counts = decoded_pks.as_ref().unwrap();
                let Some(col_info) = self.codec.pk_col_info(*col_id) else {
                    debug!(
                        "Column {} not found in primary key during building bloom filter index",
                        column_name
                    );
                    continue;
                };
                let pk_index = col_info.idx;
                let field = &col_info.field;
                for (decoded, count) in pk_values_with_counts {
                    let value = match decoded {
                        CompositeValues::Dense(dense) => dense.get(pk_index).map(|v| &v.1),
                        CompositeValues::Sparse(sparse) => sparse.get(col_id),
                    };

                    let elems = value
                        .filter(|v| !v.is_null())
                        .map(|v| {
                            let mut buf = vec![];
                            IndexValueCodec::encode_nonnull_value(
                                v.as_value_ref(),
                                field,
                                &mut buf,
                            )
                            .context(EncodeSnafu)?;
                            Ok(buf)
                        })
                        .transpose()?;

                    creator
                        .push_n_row_elems(*count, elems)
                        .await
                        .context(PushBloomFilterValueSnafu)?;
                }
            } else {
                debug!(
                    "Column {} not found in the batch during building bloom filter index",
                    column_name
                );
            }
        }

        Ok(())
    }

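    /// Finishes all creators, writing one blob per indexed column and
    /// accumulating the written byte count.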
    async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
        let mut guard = self.stats.record_finish();

        for (id, creator) in &mut self.creators {
            let written_bytes = Self::do_finish_single_creator(id, creator, puffin_writer).await?;
            guard.inc_byte_count(written_bytes);
        }

        Ok(())
    }

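    /// Drops all creators and removes their intermediate files.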
    async fn do_cleanup(&mut self) -> Result<()> {
        let mut _guard = self.stats.record_cleanup();

        self.creators.clear();
        self.temp_file_provider.cleanup().await
    }

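    /// Finishes a single creator and streams its blob into the puffin writer.
    ///
    /// The creator writes the serialized bloom filter into the `tx` end of an
    /// in-memory duplex pipe while the puffin writer concurrently consumes the
    /// `rx` end as the blob content, so the blob is never fully buffered in
    /// memory. If both sides fail, the two errors are reported together.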
    async fn do_finish_single_creator(
        col_id: &ColumnId,
        creator: &mut BloomFilterCreator,
        puffin_writer: &mut SstPuffinWriter,
    ) -> Result<ByteCount> {
        let (tx, rx) = tokio::io::duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);

        let target_key = IndexTarget::ColumnId(*col_id);
        let blob_name = format!("{INDEX_BLOB_TYPE}-{target_key}");
        let (index_finish, puffin_add_blob) = futures::join!(
            creator.finish(tx.compat_write()),
            puffin_writer.put_blob(
                &blob_name,
                rx.compat(),
                PutOptions::default(),
                Default::default(),
            )
        );

        match (
            puffin_add_blob.context(PuffinAddBlobSnafu),
            index_finish.context(BloomFilterFinishSnafu),
        ) {
            (Err(e1), Err(e2)) => BiErrorsSnafu {
                first: Box::new(e1),
                second: Box::new(e2),
            }
            .fail()?,

            (Ok(_), e @ Err(_)) => e?,
            (e @ Err(_), Ok(_)) => e.map(|_| ())?,
            (Ok(written_bytes), Ok(_)) => {
                return Ok(written_bytes);
            }
        }

        Ok(0)
    }

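    /// Returns the current memory usage of all creators.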
    pub fn memory_usage(&self) -> usize {
        self.global_memory_usage
            .load(std::sync::atomic::Ordering::Relaxed)
    }

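    /// Returns the ids of all indexed columns.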
    pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + use<'_> {
        self.creators.keys().copied()
    }
}

#[cfg(test)]
pub(crate) mod tests {

    use api::v1::SemanticType;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, SkippingIndexOptions};
    use datatypes::value::ValueRef;
    use datatypes::vectors::{UInt8Vector, UInt64Vector};
    use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
    use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
    use object_store::ObjectStore;
    use object_store::services::Memory;
    use puffin::puffin_manager::{PuffinManager, PuffinReader};
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::access_layer::FilePathProvider;
    use crate::read::BatchColumn;
    use crate::sst::file::RegionFileId;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;

    pub fn mock_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    pub async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
        IntermediateManager::init_fs(path).await.unwrap()
    }

    pub struct TestPathProvider;

    impl FilePathProvider for TestPathProvider {
        fn build_index_file_path(&self, file_id: RegionFileId) -> String {
            file_id.file_id().to_string()
        }

        fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
            file_id.file_id().to_string()
        }
    }

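    /// Builds region metadata with three columns:
    ///
    /// - `tag_str`: a string tag with a bloom filter index
    ///   (granularity 2, false positive rate 0.01),
    /// - `ts`: the millisecond timestamp column,
    /// - `field_u64`: a u64 field with a bloom filter index
    ///   (granularity 4, false positive rate 0.01).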
    pub fn mock_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_str",
                    ConcreteDataType::string_datatype(),
                    false,
                )
                .with_skipping_options(SkippingIndexOptions::new_unchecked(
                    2,
                    0.01,
                    SkippingIndexType::BloomFilter,
                ))
                .unwrap(),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_u64",
                    ConcreteDataType::uint64_datatype(),
                    false,
                )
                .with_skipping_options(SkippingIndexOptions::new_unchecked(
                    4,
                    0.01,
                    SkippingIndexType::BloomFilter,
                ))
                .unwrap(),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .primary_key(vec![1]);

        Arc::new(builder.build().unwrap())
    }

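    /// Builds a `Batch` whose rows all share the primary key `str_tag` and
    /// whose `field_u64` column holds the given values.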
    pub fn new_batch(str_tag: impl AsRef<str>, u64_field: impl IntoIterator<Item = u64>) -> Batch {
        let fields = vec![(0, SortField::new(ConcreteDataType::string_datatype()))];
        let codec = DensePrimaryKeyCodec::with_fields(fields);
        let row: [ValueRef; 1] = [str_tag.as_ref().into()];
        let primary_key = codec.encode(row.into_iter()).unwrap();

        let u64_field = BatchColumn {
            column_id: 3,
            data: Arc::new(UInt64Vector::from_iter_values(u64_field)),
        };
        let num_rows = u64_field.data.len();

        Batch::new(
            primary_key,
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
                1, num_rows,
            ))),
            vec![u64_field],
        )
        .unwrap()
    }

    #[tokio::test]
    async fn test_bloom_filter_indexer() {
        let prefix = "test_bloom_filter_indexer_";
        let tempdir = common_test_util::temp_dir::create_temp_dir(prefix);
        let object_store = mock_object_store();
        let intm_mgr = new_intm_mgr(tempdir.path().to_string_lossy()).await;
        let region_metadata = mock_region_metadata();
        let memory_usage_threshold = Some(1024);

        let file_id = FileId::random();
        let mut indexer =
            BloomFilterIndexer::new(file_id, &region_metadata, intm_mgr, memory_usage_threshold)
                .unwrap()
                .unwrap();

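        // Feed 20 rows: 10 with "tag1" and 10 with "tag2".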
        let mut batch = new_batch("tag1", 0..10);
        indexer.update(&mut batch).await.unwrap();

        let mut batch = new_batch("tag2", 10..20);
        indexer.update(&mut batch).await.unwrap();

        let (_d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let puffin_manager = factory.build(object_store, TestPathProvider);

        let file_id = RegionFileId::new(region_metadata.region_id, file_id);
        let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
        let (row_count, byte_count) = indexer.finish(&mut puffin_writer).await.unwrap();
        assert_eq!(row_count, 20);
        assert!(byte_count > 0);
        puffin_writer.finish().await.unwrap();

        let puffin_reader = puffin_manager.reader(&file_id).await.unwrap();

        {
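            // tag_str (column id 1): granularity 2 over 20 rows -> 10 segments,
            // the first 5 matching "tag1" and the last 5 matching "tag2".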
            let blob_guard = puffin_reader
                .blob("greptime-bloom-filter-v1-1")
                .await
                .unwrap();
            let reader = blob_guard.reader().await.unwrap();
            let bloom_filter = BloomFilterReaderImpl::new(reader);
            let metadata = bloom_filter.metadata().await.unwrap();

            assert_eq!(metadata.segment_count, 10);
            for i in 0..5 {
                let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize];
                let bf = bloom_filter.bloom_filter(loc).await.unwrap();
                assert!(bf.contains(b"tag1"));
            }
            for i in 5..10 {
                let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize];
                let bf = bloom_filter.bloom_filter(loc).await.unwrap();
                assert!(bf.contains(b"tag2"));
            }
        }

        {
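            // field_u64 (column id 3): granularity 4 over 20 rows -> 5 segments;
            // `i / 4` picks the segment that covers row `i`.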
            let sort_field = SortField::new(ConcreteDataType::uint64_datatype());

            let blob_guard = puffin_reader
                .blob("greptime-bloom-filter-v1-3")
                .await
                .unwrap();
            let reader = blob_guard.reader().await.unwrap();
            let bloom_filter = BloomFilterReaderImpl::new(reader);
            let metadata = bloom_filter.metadata().await.unwrap();

            assert_eq!(metadata.segment_count, 5);
            for i in 0u64..20 {
                let idx = i as usize / 4;
                let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[idx] as usize];
                let bf = bloom_filter.bloom_filter(loc).await.unwrap();
                let mut buf = vec![];
                IndexValueCodec::encode_nonnull_value(ValueRef::UInt64(i), &sort_field, &mut buf)
                    .unwrap();

                assert!(bf.contains(&buf));
            }
        }
    }
}