1use std::collections::HashMap;
16use std::sync::atomic::AtomicUsize;
17use std::sync::Arc;
18
19use common_telemetry::{debug, warn};
20use datatypes::schema::SkippingIndexType;
21use index::bloom_filter::creator::BloomFilterCreator;
22use mito_codec::index::{IndexValueCodec, IndexValuesCodec};
23use mito_codec::row_converter::SortField;
24use puffin::puffin_manager::{PuffinWriter, PutOptions};
25use snafu::{ensure, ResultExt};
26use store_api::metadata::RegionMetadataRef;
27use store_api::storage::ColumnId;
28use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
29
30use crate::error::{
31 BiErrorsSnafu, BloomFilterFinishSnafu, EncodeSnafu, IndexOptionsSnafu,
32 OperateAbortedIndexSnafu, PuffinAddBlobSnafu, PushBloomFilterValueSnafu, Result,
33};
34use crate::read::Batch;
35use crate::sst::file::FileId;
36use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
37use crate::sst::index::intermediate::{
38 IntermediateLocation, IntermediateManager, TempFileProvider,
39};
40use crate::sst::index::puffin_manager::SstPuffinWriter;
41use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
42use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
43
/// Buffer size (in bytes) of the in-memory duplex pipe used to stream a
/// finished bloom filter blob from the creator to the puffin writer.
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;
46
/// Builds bloom filter indexes for all columns of an SST file that have a
/// bloom-filter skipping index configured.
pub struct BloomFilterIndexer {
    /// Bloom filter creators, keyed by the id of the indexed column.
    creators: HashMap<ColumnId, BloomFilterCreator>,

    /// Provider of temporary (intermediate) files shared by all creators.
    temp_file_provider: Arc<TempFileProvider>,

    /// Codec used to decode primary-key (tag) column values out of a `Batch`.
    codec: IndexValuesCodec,

    /// Whether the indexer has been aborted; once set, `update`/`finish` fail.
    aborted: bool,

    /// Statistics of index creation (row count, byte count, stage timings).
    stats: Statistics,

    /// Memory-usage counter shared with every creator.
    global_memory_usage: Arc<AtomicUsize>,
}
67
68impl BloomFilterIndexer {
69 pub fn new(
71 sst_file_id: FileId,
72 metadata: &RegionMetadataRef,
73 intermediate_manager: IntermediateManager,
74 memory_usage_threshold: Option<usize>,
75 ) -> Result<Option<Self>> {
76 let mut creators = HashMap::new();
77
78 let temp_file_provider = Arc::new(TempFileProvider::new(
79 IntermediateLocation::new(&metadata.region_id, &sst_file_id),
80 intermediate_manager,
81 ));
82 let global_memory_usage = Arc::new(AtomicUsize::new(0));
83
84 for column in &metadata.column_metadatas {
85 let options =
86 column
87 .column_schema
88 .skipping_index_options()
89 .context(IndexOptionsSnafu {
90 column_name: &column.column_schema.name,
91 })?;
92
93 let options = match options {
94 Some(options) if options.index_type == SkippingIndexType::BloomFilter => options,
95 _ => continue,
96 };
97
98 let creator = BloomFilterCreator::new(
99 options.granularity as _,
100 options.false_positive_rate(),
101 temp_file_provider.clone(),
102 global_memory_usage.clone(),
103 memory_usage_threshold,
104 );
105 creators.insert(column.column_id, creator);
106 }
107
108 if creators.is_empty() {
109 return Ok(None);
110 }
111
112 let codec = IndexValuesCodec::from_tag_columns(
113 metadata.primary_key_encoding,
114 metadata.primary_key_columns(),
115 );
116 let indexer = Self {
117 creators,
118 temp_file_provider,
119 codec,
120 aborted: false,
121 stats: Statistics::new(TYPE_BLOOM_FILTER_INDEX),
122 global_memory_usage,
123 };
124 Ok(Some(indexer))
125 }
126
127 pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
132 ensure!(!self.aborted, OperateAbortedIndexSnafu);
133
134 if self.creators.is_empty() {
135 return Ok(());
136 }
137
138 if let Err(update_err) = self.do_update(batch).await {
139 if let Err(err) = self.do_cleanup().await {
141 if cfg!(any(test, feature = "test")) {
142 panic!("Failed to clean up index creator, err: {err:?}",);
143 } else {
144 warn!(err; "Failed to clean up index creator");
145 }
146 }
147 return Err(update_err);
148 }
149
150 Ok(())
151 }
152
153 pub async fn finish(
158 &mut self,
159 puffin_writer: &mut SstPuffinWriter,
160 ) -> Result<(RowCount, ByteCount)> {
161 ensure!(!self.aborted, OperateAbortedIndexSnafu);
162
163 if self.stats.row_count() == 0 {
164 return Ok((0, 0));
166 }
167
168 let finish_res = self.do_finish(puffin_writer).await;
169 if let Err(err) = self.do_cleanup().await {
171 if cfg!(any(test, feature = "test")) {
172 panic!("Failed to clean up index creator, err: {err:?}",);
173 } else {
174 warn!(err; "Failed to clean up index creator");
175 }
176 }
177
178 finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count()))
179 }
180
181 pub async fn abort(&mut self) -> Result<()> {
185 if self.aborted {
186 return Ok(());
187 }
188 self.aborted = true;
189
190 self.do_cleanup().await
191 }
192
    /// Pushes the values of `batch` into every per-column bloom filter
    /// creator.
    ///
    /// Primary-key (tag) columns have a single value for the whole batch, so
    /// one encoded value is pushed covering all `n` rows; field columns are
    /// encoded and pushed row by row. Null values are pushed as `None`.
    async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
        let mut guard = self.stats.record_update();

        let n = batch.num_rows();
        guard.inc_row_count(n);

        for (col_id, creator) in &mut self.creators {
            match self.codec.pk_col_info(*col_id) {
                // Tag column: decode its value from the batch's primary key
                // and push it once for all `n` rows.
                Some(col_info) => {
                    let pk_idx = col_info.idx;
                    let field = &col_info.field;
                    // `elems` is `None` when the value is absent or null.
                    let elems = batch
                        .pk_col_value(self.codec.decoder(), pk_idx, *col_id)?
                        .filter(|v| !v.is_null())
                        .map(|v| {
                            let mut buf = vec![];
                            IndexValueCodec::encode_nonnull_value(
                                v.as_value_ref(),
                                field,
                                &mut buf,
                            )
                            .context(EncodeSnafu)?;
                            Ok(buf)
                        })
                        .transpose()?;
                    creator
                        .push_n_row_elems(n, elems)
                        .await
                        .context(PushBloomFilterValueSnafu)?;
                }
                // Field column: encode and push each row's value separately.
                None => {
                    let Some(values) = batch.field_col_value(*col_id) else {
                        debug!(
                            "Column {} not found in the batch during building bloom filter index",
                            col_id
                        );
                        continue;
                    };
                    let sort_field = SortField::new(values.data.data_type());
                    for i in 0..n {
                        let value = values.data.get_ref(i);
                        // Nulls are represented as `None` rather than skipped,
                        // keeping row positions aligned with segments.
                        let elems = (!value.is_null())
                            .then(|| {
                                let mut buf = vec![];
                                IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut buf)
                                    .context(EncodeSnafu)?;
                                Ok(buf)
                            })
                            .transpose()?;

                        creator
                            .push_row_elems(elems)
                            .await
                            .context(PushBloomFilterValueSnafu)?;
                    }
                }
            }
        }

        Ok(())
    }
256
257 async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
259 let mut guard = self.stats.record_finish();
260
261 for (id, creator) in &mut self.creators {
262 let written_bytes = Self::do_finish_single_creator(id, creator, puffin_writer).await?;
263 guard.inc_byte_count(written_bytes);
264 }
265
266 Ok(())
267 }
268
269 async fn do_cleanup(&mut self) -> Result<()> {
270 let mut _guard = self.stats.record_cleanup();
271
272 self.creators.clear();
273 self.temp_file_provider.cleanup().await
274 }
275
    /// Finishes a single creator and streams the serialized bloom filter into
    /// the puffin writer as a blob named `{INDEX_BLOB_TYPE}-{col_id}`.
    ///
    /// The creator writes into one end of a bounded in-memory duplex pipe
    /// while the puffin writer reads from the other, so both futures must be
    /// polled concurrently (`futures::join!`) — running them sequentially
    /// would deadlock once the pipe buffer fills up.
    ///
    /// Returns the number of bytes written into the puffin file.
    async fn do_finish_single_creator(
        col_id: &ColumnId,
        creator: &mut BloomFilterCreator,
        puffin_writer: &mut SstPuffinWriter,
    ) -> Result<ByteCount> {
        let (tx, rx) = tokio::io::duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);

        let blob_name = format!("{}-{}", INDEX_BLOB_TYPE, col_id);
        let (index_finish, puffin_add_blob) = futures::join!(
            creator.finish(tx.compat_write()),
            puffin_writer.put_blob(
                &blob_name,
                rx.compat(),
                PutOptions::default(),
                Default::default(),
            )
        );

        // Combine the two results: if both sides failed, report both errors;
        // if only one failed, propagate it; otherwise return the byte count.
        match (
            puffin_add_blob.context(PuffinAddBlobSnafu),
            index_finish.context(BloomFilterFinishSnafu),
        ) {
            (Err(e1), Err(e2)) => BiErrorsSnafu {
                first: Box::new(e1),
                second: Box::new(e2),
            }
            .fail()?,

            (Ok(_), e @ Err(_)) => e?,
            (e @ Err(_), Ok(_)) => e.map(|_| ())?,
            (Ok(written_bytes), Ok(_)) => {
                return Ok(written_bytes);
            }
        }

        // Unreachable in practice: every arm above either returned early or
        // propagated an error via `?`; kept to satisfy the return type.
        Ok(0)
    }
333
334 pub fn memory_usage(&self) -> usize {
336 self.global_memory_usage
337 .load(std::sync::atomic::Ordering::Relaxed)
338 }
339
340 pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + use<'_> {
342 self.creators.keys().copied()
343 }
344}
345
#[cfg(test)]
pub(crate) mod tests {

    use api::v1::SemanticType;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, SkippingIndexOptions};
    use datatypes::value::ValueRef;
    use datatypes::vectors::{UInt64Vector, UInt8Vector};
    use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
    use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
    use object_store::services::Memory;
    use object_store::ObjectStore;
    use puffin::puffin_manager::{PuffinManager, PuffinReader};
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::access_layer::FilePathProvider;
    use crate::read::BatchColumn;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;

    /// Returns an in-memory object store for tests.
    pub fn mock_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    /// Creates an `IntermediateManager` rooted at `path`.
    pub async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
        IntermediateManager::init_fs(path).await.unwrap()
    }

    /// Path provider that maps every file id to its string form.
    pub struct TestPathProvider;

    impl FilePathProvider for TestPathProvider {
        fn build_index_file_path(&self, file_id: FileId) -> String {
            file_id.to_string()
        }

        fn build_sst_file_path(&self, file_id: FileId) -> String {
            file_id.to_string()
        }
    }

    /// Builds region metadata with:
    /// - a string tag column (id 1) with a bloom filter index, granularity 2;
    /// - a millisecond timestamp column (id 2);
    /// - a u64 field column (id 3) with a bloom filter index, granularity 4.
    pub fn mock_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_str",
                    ConcreteDataType::string_datatype(),
                    false,
                )
                .with_skipping_options(SkippingIndexOptions::new_unchecked(
                    2,
                    0.01,
                    SkippingIndexType::BloomFilter,
                ))
                .unwrap(),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_u64",
                    ConcreteDataType::uint64_datatype(),
                    false,
                )
                .with_skipping_options(SkippingIndexOptions::new_unchecked(
                    4,
                    0.01,
                    SkippingIndexType::BloomFilter,
                ))
                .unwrap(),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .primary_key(vec![1]);

        Arc::new(builder.build().unwrap())
    }

    /// Builds a batch whose rows all share the primary key `str_tag` and
    /// whose field column (id 3) holds `u64_field`.
    pub fn new_batch(str_tag: impl AsRef<str>, u64_field: impl IntoIterator<Item = u64>) -> Batch {
        let fields = vec![(0, SortField::new(ConcreteDataType::string_datatype()))];
        let codec = DensePrimaryKeyCodec::with_fields(fields);
        let row: [ValueRef; 1] = [str_tag.as_ref().into()];
        let primary_key = codec.encode(row.into_iter()).unwrap();

        let u64_field = BatchColumn {
            column_id: 3,
            data: Arc::new(UInt64Vector::from_iter_values(u64_field)),
        };
        let num_rows = u64_field.data.len();

        Batch::new(
            primary_key,
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
                1, num_rows,
            ))),
            vec![u64_field],
        )
        .unwrap()
    }

    #[tokio::test]
    async fn test_bloom_filter_indexer() {
        let prefix = "test_bloom_filter_indexer_";
        let tempdir = common_test_util::temp_dir::create_temp_dir(prefix);
        let object_store = mock_object_store();
        let intm_mgr = new_intm_mgr(tempdir.path().to_string_lossy()).await;
        let region_metadata = mock_region_metadata();
        let memory_usage_threshold = Some(1024);

        let mut indexer = BloomFilterIndexer::new(
            FileId::random(),
            // Fixed: was mis-encoded as `®ion_metadata` (mojibake for `&r`).
            &region_metadata,
            intm_mgr,
            memory_usage_threshold,
        )
        .unwrap()
        .unwrap();

        let mut batch = new_batch("tag1", 0..10);
        indexer.update(&mut batch).await.unwrap();

        let mut batch = new_batch("tag2", 10..20);
        indexer.update(&mut batch).await.unwrap();

        let (_d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let puffin_manager = factory.build(object_store, TestPathProvider);

        let file_id = FileId::random();
        let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
        let (row_count, byte_count) = indexer.finish(&mut puffin_writer).await.unwrap();
        assert_eq!(row_count, 20);
        assert!(byte_count > 0);
        puffin_writer.finish().await.unwrap();

        let puffin_reader = puffin_manager.reader(&file_id).await.unwrap();

        // Tag column (id 1): granularity 2 over 20 rows -> 10 segments.
        {
            let blob_guard = puffin_reader
                .blob("greptime-bloom-filter-v1-1")
                .await
                .unwrap();
            let reader = blob_guard.reader().await.unwrap();
            let bloom_filter = BloomFilterReaderImpl::new(reader);
            let metadata = bloom_filter.metadata().await.unwrap();

            assert_eq!(metadata.segment_count, 10);
            for i in 0..5 {
                let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize];
                let bf = bloom_filter.bloom_filter(loc).await.unwrap();
                assert!(bf.contains(b"tag1"));
            }
            for i in 5..10 {
                let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize];
                let bf = bloom_filter.bloom_filter(loc).await.unwrap();
                assert!(bf.contains(b"tag2"));
            }
        }

        // Field column (id 3): granularity 4 over 20 rows -> 5 segments.
        {
            let sort_field = SortField::new(ConcreteDataType::uint64_datatype());

            let blob_guard = puffin_reader
                .blob("greptime-bloom-filter-v1-3")
                .await
                .unwrap();
            let reader = blob_guard.reader().await.unwrap();
            let bloom_filter = BloomFilterReaderImpl::new(reader);
            let metadata = bloom_filter.metadata().await.unwrap();

            assert_eq!(metadata.segment_count, 5);
            for i in 0u64..20 {
                let idx = i as usize / 4;
                let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[idx] as usize];
                let bf = bloom_filter.bloom_filter(loc).await.unwrap();
                let mut buf = vec![];
                IndexValueCodec::encode_nonnull_value(ValueRef::UInt64(i), &sort_field, &mut buf)
                    .unwrap();

                assert!(bf.contains(&buf));
            }
        }
    }
}