// mito2/sst/index/bloom_filter/creator.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::atomic::AtomicUsize;
17use std::sync::Arc;
18
19use common_telemetry::{debug, warn};
20use datatypes::arrow::record_batch::RecordBatch;
21use datatypes::schema::SkippingIndexType;
22use datatypes::vectors::Helper;
23use index::bloom_filter::creator::BloomFilterCreator;
24use mito_codec::index::{IndexValueCodec, IndexValuesCodec};
25use mito_codec::row_converter::SortField;
26use puffin::puffin_manager::{PuffinWriter, PutOptions};
27use snafu::{ensure, ResultExt};
28use store_api::metadata::RegionMetadataRef;
29use store_api::storage::ColumnId;
30use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
31
32use crate::error::{
33    BiErrorsSnafu, BloomFilterFinishSnafu, EncodeSnafu, IndexOptionsSnafu,
34    OperateAbortedIndexSnafu, PuffinAddBlobSnafu, PushBloomFilterValueSnafu, Result,
35};
36use crate::read::Batch;
37use crate::sst::file::FileId;
38use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
39use crate::sst::index::intermediate::{
40    IntermediateLocation, IntermediateManager, TempFileProvider,
41};
42use crate::sst::index::puffin_manager::SstPuffinWriter;
43use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
44use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
45
/// The buffer size (in bytes) of the in-memory duplex pipe used to stream
/// index data from a bloom filter creator to the puffin blob writer
/// (see `do_finish_single_creator`).
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;
48
/// The indexer for the bloom filter index.
///
/// Holds one [`BloomFilterCreator`] per indexed column; batches are fed to
/// every creator and the finished filters are written out as puffin blobs.
pub struct BloomFilterIndexer {
    /// The bloom filter creators, keyed by the id of the indexed column.
    creators: HashMap<ColumnId, BloomFilterCreator>,

    /// The provider for intermediate files; shared with every creator.
    temp_file_provider: Arc<TempFileProvider>,

    /// Codec for decoding primary keys (used to extract tag column values).
    codec: IndexValuesCodec,

    /// Whether the indexing process has been aborted. Once set, all
    /// mutating operations fail with `OperateAbortedIndex`.
    aborted: bool,

    /// The statistics (row/byte counts, timings) of the indexer.
    stats: Statistics,

    /// The global memory usage shared by all creators.
    global_memory_usage: Arc<AtomicUsize>,

    /// Region metadata for column lookups (id -> name) in flat-format updates.
    metadata: RegionMetadataRef,
}
72
73impl BloomFilterIndexer {
74    /// Creates a new bloom filter indexer.
75    pub fn new(
76        sst_file_id: FileId,
77        metadata: &RegionMetadataRef,
78        intermediate_manager: IntermediateManager,
79        memory_usage_threshold: Option<usize>,
80    ) -> Result<Option<Self>> {
81        let mut creators = HashMap::new();
82
83        let temp_file_provider = Arc::new(TempFileProvider::new(
84            IntermediateLocation::new(&metadata.region_id, &sst_file_id),
85            intermediate_manager,
86        ));
87        let global_memory_usage = Arc::new(AtomicUsize::new(0));
88
89        for column in &metadata.column_metadatas {
90            let options =
91                column
92                    .column_schema
93                    .skipping_index_options()
94                    .context(IndexOptionsSnafu {
95                        column_name: &column.column_schema.name,
96                    })?;
97
98            let options = match options {
99                Some(options) if options.index_type == SkippingIndexType::BloomFilter => options,
100                _ => continue,
101            };
102
103            let creator = BloomFilterCreator::new(
104                options.granularity as _,
105                options.false_positive_rate(),
106                temp_file_provider.clone(),
107                global_memory_usage.clone(),
108                memory_usage_threshold,
109            );
110            creators.insert(column.column_id, creator);
111        }
112
113        if creators.is_empty() {
114            return Ok(None);
115        }
116
117        let codec = IndexValuesCodec::from_tag_columns(
118            metadata.primary_key_encoding,
119            metadata.primary_key_columns(),
120        );
121        let indexer = Self {
122            creators,
123            temp_file_provider,
124            codec,
125            aborted: false,
126            stats: Statistics::new(TYPE_BLOOM_FILTER_INDEX),
127            global_memory_usage,
128            metadata: metadata.clone(),
129        };
130        Ok(Some(indexer))
131    }
132
133    /// Updates index with a batch of rows.
134    /// Garbage will be cleaned up if failed to update.
135    ///
136    /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator`
137    pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
138        ensure!(!self.aborted, OperateAbortedIndexSnafu);
139
140        if self.creators.is_empty() {
141            return Ok(());
142        }
143
144        if let Err(update_err) = self.do_update(batch).await {
145            // clean up garbage if failed to update
146            if let Err(err) = self.do_cleanup().await {
147                if cfg!(any(test, feature = "test")) {
148                    panic!("Failed to clean up index creator, err: {err:?}",);
149                } else {
150                    warn!(err; "Failed to clean up index creator");
151                }
152            }
153            return Err(update_err);
154        }
155
156        Ok(())
157    }
158
159    /// Updates the bloom filter index with the given flat format RecordBatch.
160    pub async fn update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
161        ensure!(!self.aborted, OperateAbortedIndexSnafu);
162
163        if self.creators.is_empty() || batch.num_rows() == 0 {
164            return Ok(());
165        }
166
167        if let Err(update_err) = self.do_update_flat(batch).await {
168            // clean up garbage if failed to update
169            if let Err(err) = self.do_cleanup().await {
170                if cfg!(any(test, feature = "test")) {
171                    panic!("Failed to clean up index creator, err: {err:?}",);
172                } else {
173                    warn!(err; "Failed to clean up index creator");
174                }
175            }
176            return Err(update_err);
177        }
178
179        Ok(())
180    }
181
182    /// Finishes index creation and cleans up garbage.
183    /// Returns the number of rows and bytes written.
184    ///
185    /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator`
186    pub async fn finish(
187        &mut self,
188        puffin_writer: &mut SstPuffinWriter,
189    ) -> Result<(RowCount, ByteCount)> {
190        ensure!(!self.aborted, OperateAbortedIndexSnafu);
191
192        if self.stats.row_count() == 0 {
193            // no IO is performed, no garbage to clean up, just return
194            return Ok((0, 0));
195        }
196
197        let finish_res = self.do_finish(puffin_writer).await;
198        // clean up garbage no matter finish successfully or not
199        if let Err(err) = self.do_cleanup().await {
200            if cfg!(any(test, feature = "test")) {
201                panic!("Failed to clean up index creator, err: {err:?}",);
202            } else {
203                warn!(err; "Failed to clean up index creator");
204            }
205        }
206
207        finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count()))
208    }
209
210    /// Aborts index creation and clean up garbage.
211    ///
212    /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator`
213    pub async fn abort(&mut self) -> Result<()> {
214        if self.aborted {
215            return Ok(());
216        }
217        self.aborted = true;
218
219        self.do_cleanup().await
220    }
221
222    async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
223        let mut guard = self.stats.record_update();
224
225        let n = batch.num_rows();
226        guard.inc_row_count(n);
227
228        for (col_id, creator) in &mut self.creators {
229            match self.codec.pk_col_info(*col_id) {
230                // tags
231                Some(col_info) => {
232                    let pk_idx = col_info.idx;
233                    let field = &col_info.field;
234                    let elems = batch
235                        .pk_col_value(self.codec.decoder(), pk_idx, *col_id)?
236                        .filter(|v| !v.is_null())
237                        .map(|v| {
238                            let mut buf = vec![];
239                            IndexValueCodec::encode_nonnull_value(
240                                v.as_value_ref(),
241                                field,
242                                &mut buf,
243                            )
244                            .context(EncodeSnafu)?;
245                            Ok(buf)
246                        })
247                        .transpose()?;
248                    creator
249                        .push_n_row_elems(n, elems)
250                        .await
251                        .context(PushBloomFilterValueSnafu)?;
252                }
253                // fields
254                None => {
255                    let Some(values) = batch.field_col_value(*col_id) else {
256                        debug!(
257                            "Column {} not found in the batch during building bloom filter index",
258                            col_id
259                        );
260                        continue;
261                    };
262                    let sort_field = SortField::new(values.data.data_type());
263                    for i in 0..n {
264                        let value = values.data.get_ref(i);
265                        let elems = (!value.is_null())
266                            .then(|| {
267                                let mut buf = vec![];
268                                IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut buf)
269                                    .context(EncodeSnafu)?;
270                                Ok(buf)
271                            })
272                            .transpose()?;
273
274                        creator
275                            .push_row_elems(elems)
276                            .await
277                            .context(PushBloomFilterValueSnafu)?;
278                    }
279                }
280            }
281        }
282
283        Ok(())
284    }
285
286    async fn do_update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
287        let mut guard = self.stats.record_update();
288
289        let n = batch.num_rows();
290        guard.inc_row_count(n);
291
292        for (col_id, creator) in &mut self.creators {
293            // Get the column name from metadata
294            if let Some(column_meta) = self.metadata.column_by_id(*col_id) {
295                let column_name = &column_meta.column_schema.name;
296
297                // Find the column in the RecordBatch by name
298                if let Some(column_array) = batch.column_by_name(column_name) {
299                    // Convert Arrow array to VectorRef
300                    let vector = Helper::try_into_vector(column_array.clone())
301                        .context(crate::error::ConvertVectorSnafu)?;
302                    let sort_field = SortField::new(vector.data_type());
303
304                    for i in 0..n {
305                        let value = vector.get_ref(i);
306                        let elems = (!value.is_null())
307                            .then(|| {
308                                let mut buf = vec![];
309                                IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut buf)
310                                    .context(EncodeSnafu)?;
311                                Ok(buf)
312                            })
313                            .transpose()?;
314
315                        creator
316                            .push_row_elems(elems)
317                            .await
318                            .context(PushBloomFilterValueSnafu)?;
319                    }
320                } else {
321                    debug!(
322                        "Column {} not found in the batch during building bloom filter index",
323                        column_name
324                    );
325                    // Push empty elements to maintain alignment
326                    for _ in 0..n {
327                        creator
328                            .push_row_elems(None)
329                            .await
330                            .context(PushBloomFilterValueSnafu)?;
331                    }
332                }
333            }
334        }
335
336        Ok(())
337    }
338
339    /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator`
340    async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
341        let mut guard = self.stats.record_finish();
342
343        for (id, creator) in &mut self.creators {
344            let written_bytes = Self::do_finish_single_creator(id, creator, puffin_writer).await?;
345            guard.inc_byte_count(written_bytes);
346        }
347
348        Ok(())
349    }
350
351    async fn do_cleanup(&mut self) -> Result<()> {
352        let mut _guard = self.stats.record_cleanup();
353
354        self.creators.clear();
355        self.temp_file_provider.cleanup().await
356    }
357
358    /// Data flow of finishing index:
359    ///
360    /// ```text
361    ///                               (In Memory Buffer)
362    ///                                    ┌──────┐
363    ///  ┌─────────────┐                   │ PIPE │
364    ///  │             │ write index data  │      │
365    ///  │ IndexWriter ├──────────────────►│ tx   │
366    ///  │             │                   │      │
367    ///  └─────────────┘                   │      │
368    ///                  ┌─────────────────┤ rx   │
369    ///  ┌─────────────┐ │ read as blob    └──────┘
370    ///  │             │ │
371    ///  │ PuffinWriter├─┤
372    ///  │             │ │ copy to file    ┌──────┐
373    ///  └─────────────┘ └────────────────►│ File │
374    ///                                    └──────┘
375    /// ```
376    ///
377    /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator`
378    async fn do_finish_single_creator(
379        col_id: &ColumnId,
380        creator: &mut BloomFilterCreator,
381        puffin_writer: &mut SstPuffinWriter,
382    ) -> Result<ByteCount> {
383        let (tx, rx) = tokio::io::duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
384
385        let blob_name = format!("{}-{}", INDEX_BLOB_TYPE, col_id);
386        let (index_finish, puffin_add_blob) = futures::join!(
387            creator.finish(tx.compat_write()),
388            puffin_writer.put_blob(
389                &blob_name,
390                rx.compat(),
391                PutOptions::default(),
392                Default::default(),
393            )
394        );
395
396        match (
397            puffin_add_blob.context(PuffinAddBlobSnafu),
398            index_finish.context(BloomFilterFinishSnafu),
399        ) {
400            (Err(e1), Err(e2)) => BiErrorsSnafu {
401                first: Box::new(e1),
402                second: Box::new(e2),
403            }
404            .fail()?,
405
406            (Ok(_), e @ Err(_)) => e?,
407            (e @ Err(_), Ok(_)) => e.map(|_| ())?,
408            (Ok(written_bytes), Ok(_)) => {
409                return Ok(written_bytes);
410            }
411        }
412
413        Ok(0)
414    }
415
416    /// Returns the memory usage of the indexer.
417    pub fn memory_usage(&self) -> usize {
418        self.global_memory_usage
419            .load(std::sync::atomic::Ordering::Relaxed)
420    }
421
422    /// Returns the column ids to be indexed.
423    pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + use<'_> {
424        self.creators.keys().copied()
425    }
426}
427
428#[cfg(test)]
429pub(crate) mod tests {
430
431    use api::v1::SemanticType;
432    use datatypes::data_type::ConcreteDataType;
433    use datatypes::schema::{ColumnSchema, SkippingIndexOptions};
434    use datatypes::value::ValueRef;
435    use datatypes::vectors::{UInt64Vector, UInt8Vector};
436    use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
437    use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
438    use object_store::services::Memory;
439    use object_store::ObjectStore;
440    use puffin::puffin_manager::{PuffinManager, PuffinReader};
441    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
442    use store_api::storage::RegionId;
443
444    use super::*;
445    use crate::access_layer::FilePathProvider;
446    use crate::read::BatchColumn;
447    use crate::sst::file::RegionFileId;
448    use crate::sst::index::puffin_manager::PuffinManagerFactory;
449
450    pub fn mock_object_store() -> ObjectStore {
451        ObjectStore::new(Memory::default()).unwrap().finish()
452    }
453
454    pub async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
455        IntermediateManager::init_fs(path).await.unwrap()
456    }
457
    /// Path provider that derives both SST and index file paths from the file id.
    pub struct TestPathProvider;
459
    impl FilePathProvider for TestPathProvider {
        // The index file path is simply the file id.
        fn build_index_file_path(&self, file_id: RegionFileId) -> String {
            file_id.file_id().to_string()
        }

        // The SST file path is simply the file id.
        fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
            file_id.file_id().to_string()
        }
    }
469
    /// Builds region metadata with two bloom-filter-indexed columns:
    ///
    /// tag_str:
    ///   - type: string
    ///   - index: bloom filter
    ///   - granularity: 2
    ///   - column_id: 1
    ///
    /// ts:
    ///   - type: timestamp
    ///   - index: time index
    ///   - column_id: 2
    ///
    /// field_u64:
    ///   - type: uint64
    ///   - index: bloom filter
    ///   - granularity: 4
    ///   - column_id: 3
    pub fn mock_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        builder
            // Tag column with bloom filter index: granularity 2, fpr 0.01.
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_str",
                    ConcreteDataType::string_datatype(),
                    false,
                )
                .with_skipping_options(SkippingIndexOptions::new_unchecked(
                    2,
                    0.01,
                    SkippingIndexType::BloomFilter,
                ))
                .unwrap(),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            // Time index column; no skipping index.
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            // Field column with bloom filter index: granularity 4, fpr 0.01.
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_u64",
                    ConcreteDataType::uint64_datatype(),
                    false,
                )
                .with_skipping_options(SkippingIndexOptions::new_unchecked(
                    4,
                    0.01,
                    SkippingIndexType::BloomFilter,
                ))
                .unwrap(),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .primary_key(vec![1]);

        Arc::new(builder.build().unwrap())
    }
532
533    pub fn new_batch(str_tag: impl AsRef<str>, u64_field: impl IntoIterator<Item = u64>) -> Batch {
534        let fields = vec![(0, SortField::new(ConcreteDataType::string_datatype()))];
535        let codec = DensePrimaryKeyCodec::with_fields(fields);
536        let row: [ValueRef; 1] = [str_tag.as_ref().into()];
537        let primary_key = codec.encode(row.into_iter()).unwrap();
538
539        let u64_field = BatchColumn {
540            column_id: 3,
541            data: Arc::new(UInt64Vector::from_iter_values(u64_field)),
542        };
543        let num_rows = u64_field.data.len();
544
545        Batch::new(
546            primary_key,
547            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
548                0, num_rows,
549            ))),
550            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
551                0, num_rows,
552            ))),
553            Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
554                1, num_rows,
555            ))),
556            vec![u64_field],
557        )
558        .unwrap()
559    }
560
    /// End-to-end test: index 20 rows across two tags, write the blobs through
    /// a puffin writer, then read them back and verify segment counts and
    /// bloom filter membership for both indexed columns.
    #[tokio::test]
    async fn test_bloom_filter_indexer() {
        let prefix = "test_bloom_filter_indexer_";
        let tempdir = common_test_util::temp_dir::create_temp_dir(prefix);
        let object_store = mock_object_store();
        let intm_mgr = new_intm_mgr(tempdir.path().to_string_lossy()).await;
        let region_metadata = mock_region_metadata();
        let memory_usage_threshold = Some(1024);

        let file_id = FileId::random();
        let mut indexer =
            BloomFilterIndexer::new(file_id, &region_metadata, intm_mgr, memory_usage_threshold)
                .unwrap()
                .unwrap();

        // push 20 rows: rows 0..10 have tag1, rows 10..20 have tag2
        let mut batch = new_batch("tag1", 0..10);
        indexer.update(&mut batch).await.unwrap();

        let mut batch = new_batch("tag2", 10..20);
        indexer.update(&mut batch).await.unwrap();

        let (_d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let puffin_manager = factory.build(object_store, TestPathProvider);

        let file_id = RegionFileId::new(region_metadata.region_id, file_id);
        let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
        let (row_count, byte_count) = indexer.finish(&mut puffin_writer).await.unwrap();
        assert_eq!(row_count, 20);
        assert!(byte_count > 0);
        puffin_writer.finish().await.unwrap();

        let puffin_reader = puffin_manager.reader(&file_id).await.unwrap();

        // tag_str (column id 1, granularity 2 → 20 rows / 2 = 10 segments)
        {
            let blob_guard = puffin_reader
                .blob("greptime-bloom-filter-v1-1")
                .await
                .unwrap();
            let reader = blob_guard.reader().await.unwrap();
            let bloom_filter = BloomFilterReaderImpl::new(reader);
            let metadata = bloom_filter.metadata().await.unwrap();

            assert_eq!(metadata.segment_count, 10);
            // Segments 0..5 cover rows 0..10 (tag1).
            for i in 0..5 {
                let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize];
                let bf = bloom_filter.bloom_filter(loc).await.unwrap();
                assert!(bf.contains(b"tag1"));
            }
            // Segments 5..10 cover rows 10..20 (tag2).
            for i in 5..10 {
                let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize];
                let bf = bloom_filter.bloom_filter(loc).await.unwrap();
                assert!(bf.contains(b"tag2"));
            }
        }

        // field_u64 (column id 3, granularity 4 → 20 rows / 4 = 5 segments)
        {
            let sort_field = SortField::new(ConcreteDataType::uint64_datatype());

            let blob_guard = puffin_reader
                .blob("greptime-bloom-filter-v1-3")
                .await
                .unwrap();
            let reader = blob_guard.reader().await.unwrap();
            let bloom_filter = BloomFilterReaderImpl::new(reader);
            let metadata = bloom_filter.metadata().await.unwrap();

            assert_eq!(metadata.segment_count, 5);
            // Each value i lands in segment i / 4; check membership of its
            // encoded form in that segment's filter.
            for i in 0u64..20 {
                let idx = i as usize / 4;
                let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[idx] as usize];
                let bf = bloom_filter.bloom_filter(loc).await.unwrap();
                let mut buf = vec![];
                IndexValueCodec::encode_nonnull_value(ValueRef::UInt64(i), &sort_field, &mut buf)
                    .unwrap();

                assert!(bf.contains(&buf));
            }
        }
    }
643}