use std::collections::HashMap;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

use common_telemetry::{debug, warn};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::schema::SkippingIndexType;
use datatypes::vectors::Helper;
use index::bloom_filter::creator::BloomFilterCreator;
use mito_codec::index::{IndexValueCodec, IndexValuesCodec};
use mito_codec::row_converter::SortField;
use puffin::puffin_manager::{PuffinWriter, PutOptions};
use snafu::{ensure, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};

use crate::error::{
    BiErrorsSnafu, BloomFilterFinishSnafu, ConvertVectorSnafu, EncodeSnafu, IndexOptionsSnafu,
    OperateAbortedIndexSnafu, PuffinAddBlobSnafu, PushBloomFilterValueSnafu, Result,
};
use crate::read::Batch;
use crate::sst::file::FileId;
use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
use crate::sst::index::intermediate::{
    IntermediateLocation, IntermediateManager, TempFileProvider,
};
use crate::sst::index::puffin_manager::SstPuffinWriter;
use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;

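/// Size of the in-memory pipe buffer used to stream finished bloom filter
/// blobs from the creator to the puffin writer.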
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;

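/// Indexer that builds bloom filter indexes for the columns of an SST file
/// that declare a bloom filter skipping index.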
pub struct BloomFilterIndexer {
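    /// Bloom filter creators, one per indexed column.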
    creators: HashMap<ColumnId, BloomFilterCreator>,

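    /// Provider of intermediate files backing the creators when memory usage
    /// exceeds the threshold.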
    temp_file_provider: Arc<TempFileProvider>,

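    /// Codec for decoding primary key values from batches.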
    codec: IndexValuesCodec,

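    /// Whether the indexer has been aborted.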
    aborted: bool,

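    /// Statistics of the index creation.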
    stats: Statistics,

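    /// Memory usage counter shared across all creators.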
    global_memory_usage: Arc<AtomicUsize>,

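    /// Region metadata, used to resolve column names by column id.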
    metadata: RegionMetadataRef,
}

impl BloomFilterIndexer {
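    /// Creates a new bloom filter indexer for the given SST file.
    ///
    /// Returns `Ok(None)` if none of the columns has a bloom filter
    /// skipping index configured.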
    pub fn new(
        sst_file_id: FileId,
        metadata: &RegionMetadataRef,
        intermediate_manager: IntermediateManager,
        memory_usage_threshold: Option<usize>,
    ) -> Result<Option<Self>> {
        let mut creators = HashMap::new();

        let temp_file_provider = Arc::new(TempFileProvider::new(
            IntermediateLocation::new(&metadata.region_id, &sst_file_id),
            intermediate_manager,
        ));
        let global_memory_usage = Arc::new(AtomicUsize::new(0));

        for column in &metadata.column_metadatas {
            let options =
                column
                    .column_schema
                    .skipping_index_options()
                    .context(IndexOptionsSnafu {
                        column_name: &column.column_schema.name,
                    })?;

            let options = match options {
                Some(options) if options.index_type == SkippingIndexType::BloomFilter => options,
                _ => continue,
            };

            let creator = BloomFilterCreator::new(
                options.granularity as _,
                options.false_positive_rate(),
                temp_file_provider.clone(),
                global_memory_usage.clone(),
                memory_usage_threshold,
            );
            creators.insert(column.column_id, creator);
        }

        if creators.is_empty() {
            return Ok(None);
        }

        let codec = IndexValuesCodec::from_tag_columns(
            metadata.primary_key_encoding,
            metadata.primary_key_columns(),
        );
        let indexer = Self {
            creators,
            temp_file_provider,
            codec,
            aborted: false,
            stats: Statistics::new(TYPE_BLOOM_FILTER_INDEX),
            global_memory_usage,
            metadata: metadata.clone(),
        };
        Ok(Some(indexer))
    }

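    /// Updates the index with a batch of rows.
    ///
    /// Intermediate state is cleaned up if the update fails.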
    pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
        ensure!(!self.aborted, OperateAbortedIndexSnafu);

        if self.creators.is_empty() {
            return Ok(());
        }

        if let Err(update_err) = self.do_update(batch).await {
            if let Err(err) = self.do_cleanup().await {
                if cfg!(any(test, feature = "test")) {
                    panic!("Failed to clean up index creator, err: {err:?}");
                } else {
                    warn!(err; "Failed to clean up index creator");
                }
            }
            return Err(update_err);
        }

        Ok(())
    }

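    /// Updates the index with a flat format [`RecordBatch`].
    ///
    /// Intermediate state is cleaned up if the update fails.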
    pub async fn update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
        ensure!(!self.aborted, OperateAbortedIndexSnafu);

        if self.creators.is_empty() || batch.num_rows() == 0 {
            return Ok(());
        }

        if let Err(update_err) = self.do_update_flat(batch).await {
            if let Err(err) = self.do_cleanup().await {
                if cfg!(any(test, feature = "test")) {
                    panic!("Failed to clean up index creator, err: {err:?}");
                } else {
                    warn!(err; "Failed to clean up index creator");
                }
            }
            return Err(update_err);
        }

        Ok(())
    }

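    /// Finishes index creation, writing the bloom filters into the puffin
    /// file. Returns the number of indexed rows and the number of bytes written.
    ///
    /// Intermediate state is cleaned up regardless of success or failure.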
    pub async fn finish(
        &mut self,
        puffin_writer: &mut SstPuffinWriter,
    ) -> Result<(RowCount, ByteCount)> {
        ensure!(!self.aborted, OperateAbortedIndexSnafu);

        if self.stats.row_count() == 0 {
            return Ok((0, 0));
        }

        let finish_res = self.do_finish(puffin_writer).await;
        if let Err(err) = self.do_cleanup().await {
            if cfg!(any(test, feature = "test")) {
                panic!("Failed to clean up index creator, err: {err:?}");
            } else {
                warn!(err; "Failed to clean up index creator");
            }
        }

        finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count()))
    }

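    /// Aborts index creation and cleans up intermediate state.
    /// Does nothing if the indexer is already aborted.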
    pub async fn abort(&mut self) -> Result<()> {
        if self.aborted {
            return Ok(());
        }
        self.aborted = true;

        self.do_cleanup().await
    }

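    /// Feeds one [`Batch`] to every creator: tag columns are decoded from the
    /// primary key, field columns are read directly from the batch.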
    async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
        let mut guard = self.stats.record_update();

        let n = batch.num_rows();
        guard.inc_row_count(n);

        for (col_id, creator) in &mut self.creators {
            match self.codec.pk_col_info(*col_id) {
                Some(col_info) => {
                    // Tag column: all rows in the batch share the same value.
                    let pk_idx = col_info.idx;
                    let field = &col_info.field;
                    let elems = batch
                        .pk_col_value(self.codec.decoder(), pk_idx, *col_id)?
                        .filter(|v| !v.is_null())
                        .map(|v| {
                            let mut buf = vec![];
                            IndexValueCodec::encode_nonnull_value(
                                v.as_value_ref(),
                                field,
                                &mut buf,
                            )
                            .context(EncodeSnafu)?;
                            Ok(buf)
                        })
                        .transpose()?;
                    creator
                        .push_n_row_elems(n, elems)
                        .await
                        .context(PushBloomFilterValueSnafu)?;
                }
                None => {
                    // Field column: push the value of each row.
                    let Some(values) = batch.field_col_value(*col_id) else {
                        debug!(
                            "Column {} not found in the batch during building bloom filter index",
                            col_id
                        );
                        continue;
                    };
                    let sort_field = SortField::new(values.data.data_type());
                    for i in 0..n {
                        let value = values.data.get_ref(i);
                        let elems = (!value.is_null())
                            .then(|| {
                                let mut buf = vec![];
                                IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut buf)
                                    .context(EncodeSnafu)?;
                                Ok(buf)
                            })
                            .transpose()?;

                        creator
                            .push_row_elems(elems)
                            .await
                            .context(PushBloomFilterValueSnafu)?;
                    }
                }
            }
        }

        Ok(())
    }

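    /// Feeds one flat [`RecordBatch`] to every creator, looking up columns
    /// by name via the region metadata.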
    async fn do_update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
        let mut guard = self.stats.record_update();

        let n = batch.num_rows();
        guard.inc_row_count(n);

        for (col_id, creator) in &mut self.creators {
            if let Some(column_meta) = self.metadata.column_by_id(*col_id) {
                // Resolve the column in the record batch by its name.
                let column_name = &column_meta.column_schema.name;

                if let Some(column_array) = batch.column_by_name(column_name) {
                    // Convert the arrow array into a vector to read values.
                    let vector = Helper::try_into_vector(column_array.clone())
                        .context(ConvertVectorSnafu)?;
                    let sort_field = SortField::new(vector.data_type());

                    for i in 0..n {
                        let value = vector.get_ref(i);
                        let elems = (!value.is_null())
                            .then(|| {
                                let mut buf = vec![];
                                IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut buf)
                                    .context(EncodeSnafu)?;
                                Ok(buf)
                            })
                            .transpose()?;

                        creator
                            .push_row_elems(elems)
                            .await
                            .context(PushBloomFilterValueSnafu)?;
                    }
                } else {
                    debug!(
                        "Column {} not found in the batch during building bloom filter index",
                        column_name
                    );
                    for _ in 0..n {
                        // Keep row alignment by pushing empty elements.
                        creator
                            .push_row_elems(None)
                            .await
                            .context(PushBloomFilterValueSnafu)?;
                    }
                }
            }
        }

        Ok(())
    }

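    /// Finishes all creators and writes each bloom filter as a blob into the
    /// puffin file, accumulating the written bytes into the statistics.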
    async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
        let mut guard = self.stats.record_finish();

        for (id, creator) in &mut self.creators {
            let written_bytes = Self::do_finish_single_creator(id, creator, puffin_writer).await?;
            guard.inc_byte_count(written_bytes);
        }

        Ok(())
    }

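    /// Drops all creators and removes the intermediate files.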
    async fn do_cleanup(&mut self) -> Result<()> {
        let mut _guard = self.stats.record_cleanup();

        self.creators.clear();
        self.temp_file_provider.cleanup().await
    }

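    /// Finishes a single creator and writes its bloom filter into the puffin
    /// file as a blob named `{INDEX_BLOB_TYPE}-{col_id}`.
    ///
    /// The creator and the puffin writer are connected through an in-memory
    /// duplex pipe and driven concurrently: the creator serializes the bloom
    /// filter into the write half (`tx`) while the puffin writer streams the
    /// read half (`rx`) into the blob, so the index is never fully buffered
    /// in memory.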
    async fn do_finish_single_creator(
        col_id: &ColumnId,
        creator: &mut BloomFilterCreator,
        puffin_writer: &mut SstPuffinWriter,
    ) -> Result<ByteCount> {
        let (tx, rx) = tokio::io::duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);

        let blob_name = format!("{}-{}", INDEX_BLOB_TYPE, col_id);
        let (index_finish, puffin_add_blob) = futures::join!(
            creator.finish(tx.compat_write()),
            puffin_writer.put_blob(
                &blob_name,
                rx.compat(),
                PutOptions::default(),
                Default::default(),
            )
        );

        match (
            puffin_add_blob.context(PuffinAddBlobSnafu),
            index_finish.context(BloomFilterFinishSnafu),
        ) {
            (Err(e1), Err(e2)) => BiErrorsSnafu {
                first: Box::new(e1),
                second: Box::new(e2),
            }
            .fail()?,

            (Ok(_), e @ Err(_)) => e?,
            (e @ Err(_), Ok(_)) => e.map(|_| ())?,
            (Ok(written_bytes), Ok(_)) => {
                return Ok(written_bytes);
            }
        }

        Ok(0)
    }

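    /// Returns the current total memory usage of all creators.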
    pub fn memory_usage(&self) -> usize {
        self.global_memory_usage
            .load(std::sync::atomic::Ordering::Relaxed)
    }

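    /// Returns the ids of all columns being indexed.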
    pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + use<'_> {
        self.creators.keys().copied()
    }
}

#[cfg(test)]
pub(crate) mod tests {
    use api::v1::SemanticType;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, SkippingIndexOptions};
    use datatypes::value::ValueRef;
    use datatypes::vectors::{UInt64Vector, UInt8Vector};
    use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
    use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
    use object_store::services::Memory;
    use object_store::ObjectStore;
    use puffin::puffin_manager::{PuffinManager, PuffinReader};
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::access_layer::FilePathProvider;
    use crate::read::BatchColumn;
    use crate::sst::file::RegionFileId;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;

    pub fn mock_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    pub async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
        IntermediateManager::init_fs(path).await.unwrap()
    }

    pub struct TestPathProvider;

    impl FilePathProvider for TestPathProvider {
        fn build_index_file_path(&self, file_id: RegionFileId) -> String {
            file_id.file_id().to_string()
        }

        fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
            file_id.file_id().to_string()
        }
    }

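    /// Builds region metadata with three columns:
    /// - `tag_str`: string tag with a bloom filter index (granularity 2),
    /// - `ts`: millisecond timestamp,
    /// - `field_u64`: u64 field with a bloom filter index (granularity 4).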
    pub fn mock_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_str",
                    ConcreteDataType::string_datatype(),
                    false,
                )
                .with_skipping_options(SkippingIndexOptions::new_unchecked(
                    2,
                    0.01,
                    SkippingIndexType::BloomFilter,
                ))
                .unwrap(),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_u64",
                    ConcreteDataType::uint64_datatype(),
                    false,
                )
                .with_skipping_options(SkippingIndexOptions::new_unchecked(
                    4,
                    0.01,
                    SkippingIndexType::BloomFilter,
                ))
                .unwrap(),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .primary_key(vec![1]);

        Arc::new(builder.build().unwrap())
    }

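    /// Builds a [`Batch`] whose primary key encodes `str_tag` and whose only
    /// field column (id 3) holds the given u64 values; timestamps and
    /// sequences are zeroed and all op types are set to 1 (put).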
    pub fn new_batch(str_tag: impl AsRef<str>, u64_field: impl IntoIterator<Item = u64>) -> Batch {
        let fields = vec![(0, SortField::new(ConcreteDataType::string_datatype()))];
        let codec = DensePrimaryKeyCodec::with_fields(fields);
        let row: [ValueRef; 1] = [str_tag.as_ref().into()];
        let primary_key = codec.encode(row.into_iter()).unwrap();

        let u64_field = BatchColumn {
            column_id: 3,
            data: Arc::new(UInt64Vector::from_iter_values(u64_field)),
        };
        let num_rows = u64_field.data.len();

        Batch::new(
            primary_key,
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
                1, num_rows,
            ))),
            vec![u64_field],
        )
        .unwrap()
    }

    #[tokio::test]
    async fn test_bloom_filter_indexer() {
        let prefix = "test_bloom_filter_indexer_";
        let tempdir = common_test_util::temp_dir::create_temp_dir(prefix);
        let object_store = mock_object_store();
        let intm_mgr = new_intm_mgr(tempdir.path().to_string_lossy()).await;
        let region_metadata = mock_region_metadata();
        let memory_usage_threshold = Some(1024);

        let file_id = FileId::random();
        let mut indexer =
            BloomFilterIndexer::new(file_id, &region_metadata, intm_mgr, memory_usage_threshold)
                .unwrap()
                .unwrap();

        let mut batch = new_batch("tag1", 0..10);
        indexer.update(&mut batch).await.unwrap();

        let mut batch = new_batch("tag2", 10..20);
        indexer.update(&mut batch).await.unwrap();

        let (_d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let puffin_manager = factory.build(object_store, TestPathProvider);

        let file_id = RegionFileId::new(region_metadata.region_id, file_id);
        let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
        let (row_count, byte_count) = indexer.finish(&mut puffin_writer).await.unwrap();
        assert_eq!(row_count, 20);
        assert!(byte_count > 0);
        puffin_writer.finish().await.unwrap();

        let puffin_reader = puffin_manager.reader(&file_id).await.unwrap();

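        // Verify the bloom filter of the tag column (column id 1): each batch
        // of 10 rows with granularity 2 yields 5 segments, all containing the
        // batch's tag value.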
        {
            let blob_guard = puffin_reader
                .blob("greptime-bloom-filter-v1-1")
                .await
                .unwrap();
            let reader = blob_guard.reader().await.unwrap();
            let bloom_filter = BloomFilterReaderImpl::new(reader);
            let metadata = bloom_filter.metadata().await.unwrap();

            assert_eq!(metadata.segment_count, 10);
            for i in 0..5 {
                let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize];
                let bf = bloom_filter.bloom_filter(loc).await.unwrap();
                assert!(bf.contains(b"tag1"));
            }
            for i in 5..10 {
                let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize];
                let bf = bloom_filter.bloom_filter(loc).await.unwrap();
                assert!(bf.contains(b"tag2"));
            }
        }

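        // Verify the bloom filter of the field column (column id 3): 20 rows
        // with granularity 4 yield 5 segments; value `i` falls in segment
        // `i / 4`.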
        {
            let sort_field = SortField::new(ConcreteDataType::uint64_datatype());

            let blob_guard = puffin_reader
                .blob("greptime-bloom-filter-v1-3")
                .await
                .unwrap();
            let reader = blob_guard.reader().await.unwrap();
            let bloom_filter = BloomFilterReaderImpl::new(reader);
            let metadata = bloom_filter.metadata().await.unwrap();

            assert_eq!(metadata.segment_count, 5);
            for i in 0u64..20 {
                let idx = i as usize / 4;
                let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[idx] as usize];
                let bf = bloom_filter.bloom_filter(loc).await.unwrap();
                let mut buf = vec![];
                IndexValueCodec::encode_nonnull_value(ValueRef::UInt64(i), &sort_field, &mut buf)
                    .unwrap();

                assert!(bf.contains(&buf));
            }
        }
    }
}