mito2/sst/index/bloom_filter/creator.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::AtomicUsize;

use api::v1::SemanticType;
use common_telemetry::{debug, warn};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::schema::SkippingIndexType;
use datatypes::vectors::Helper;
use index::bloom_filter::creator::BloomFilterCreator;
use index::target::IndexTarget;
use mito_codec::index::{IndexValueCodec, IndexValuesCodec};
use mito_codec::row_converter::{CompositeValues, SortField};
use puffin::puffin_manager::{PuffinWriter, PutOptions};
use snafu::{ResultExt, ensure};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, FileId};
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};

use crate::error::{
    BiErrorsSnafu, BloomFilterFinishSnafu, EncodeSnafu, IndexOptionsSnafu,
    OperateAbortedIndexSnafu, PuffinAddBlobSnafu, PushBloomFilterValueSnafu, Result,
};
use crate::read::Batch;
use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
use crate::sst::index::intermediate::{
    IntermediateLocation, IntermediateManager, TempFileProvider,
};
use crate::sst::index::puffin_manager::SstPuffinWriter;
use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
use crate::sst::index::{TYPE_BLOOM_FILTER_INDEX, decode_primary_keys_with_counts};

/// The buffer size for the pipe used to send index data to the puffin blob.
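///
/// `do_finish_single_creator` pairs the two ends of an in-memory duplex with
/// this capacity, so index data can be streamed to the puffin writer without
/// buffering an entire blob.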
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;

/// The indexer for the bloom filter index.
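///
/// Typical lifecycle (a sketch of the API below): feed row batches through
/// [`update`](Self::update) or [`update_flat`](Self::update_flat), then call
/// [`finish`](Self::finish) to write the blobs into the puffin file, or
/// [`abort`](Self::abort) to discard all intermediate state.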
pub struct BloomFilterIndexer {
    /// The bloom filter creators.
    creators: HashMap<ColumnId, BloomFilterCreator>,

    /// The provider for intermediate files.
    temp_file_provider: Arc<TempFileProvider>,

    /// Codec for decoding primary keys.
    codec: IndexValuesCodec,

    /// Whether the indexing process has been aborted.
    aborted: bool,

    /// The statistics of the indexer.
    stats: Statistics,

    /// The global memory usage.
    global_memory_usage: Arc<AtomicUsize>,

    /// Region metadata for column lookups.
    metadata: RegionMetadataRef,
}

impl BloomFilterIndexer {
    /// Creates a new bloom filter indexer.
    ///
    /// Returns `Ok(None)` if no column is configured with a bloom filter index.
    pub fn new(
        sst_file_id: FileId,
        metadata: &RegionMetadataRef,
        intermediate_manager: IntermediateManager,
        memory_usage_threshold: Option<usize>,
    ) -> Result<Option<Self>> {
        let mut creators = HashMap::new();

        let temp_file_provider = Arc::new(TempFileProvider::new(
            IntermediateLocation::new(&metadata.region_id, &sst_file_id),
            intermediate_manager,
        ));
        let global_memory_usage = Arc::new(AtomicUsize::new(0));

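        // Only columns configured with a bloom filter skipping index get a
        // creator; all creators share `global_memory_usage`, so the optional
        // `memory_usage_threshold` bounds their combined footprint.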
        for column in &metadata.column_metadatas {
            let options =
                column
                    .column_schema
                    .skipping_index_options()
                    .context(IndexOptionsSnafu {
                        column_name: &column.column_schema.name,
                    })?;

            let options = match options {
                Some(options) if options.index_type == SkippingIndexType::BloomFilter => options,
                _ => continue,
            };

            let creator = BloomFilterCreator::new(
                options.granularity as _,
                options.false_positive_rate(),
                temp_file_provider.clone(),
                global_memory_usage.clone(),
                memory_usage_threshold,
            );
            creators.insert(column.column_id, creator);
        }

        if creators.is_empty() {
            return Ok(None);
        }

        let codec = IndexValuesCodec::from_tag_columns(
            metadata.primary_key_encoding,
            metadata.primary_key_columns(),
        );
        let indexer = Self {
            creators,
            temp_file_provider,
            codec,
            aborted: false,
            stats: Statistics::new(TYPE_BLOOM_FILTER_INDEX),
            global_memory_usage,
            metadata: metadata.clone(),
        };
        Ok(Some(indexer))
    }

    /// Updates the index with a batch of rows.
    /// Garbage is cleaned up if the update fails.
    ///
    /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator`
    pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
        ensure!(!self.aborted, OperateAbortedIndexSnafu);

        if self.creators.is_empty() {
            return Ok(());
        }

        if let Err(update_err) = self.do_update(batch).await {
            // clean up garbage if the update failed
            if let Err(err) = self.do_cleanup().await {
                if cfg!(any(test, feature = "test")) {
                    panic!("Failed to clean up index creator, err: {err:?}");
                } else {
                    warn!(err; "Failed to clean up index creator");
                }
            }
            return Err(update_err);
        }

        Ok(())
    }

    /// Updates the bloom filter index with the given flat-format `RecordBatch`.
    /// Garbage is cleaned up if the update fails.
    pub async fn update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
        ensure!(!self.aborted, OperateAbortedIndexSnafu);

        if self.creators.is_empty() || batch.num_rows() == 0 {
            return Ok(());
        }

        if let Err(update_err) = self.do_update_flat(batch).await {
            // clean up garbage if the update failed
            if let Err(err) = self.do_cleanup().await {
                if cfg!(any(test, feature = "test")) {
                    panic!("Failed to clean up index creator, err: {err:?}");
                } else {
                    warn!(err; "Failed to clean up index creator");
                }
            }
            return Err(update_err);
        }

        Ok(())
    }

    /// Finishes index creation and cleans up garbage.
    /// Returns the number of rows and bytes written.
    ///
    /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator`
    pub async fn finish(
        &mut self,
        puffin_writer: &mut SstPuffinWriter,
    ) -> Result<(RowCount, ByteCount)> {
        ensure!(!self.aborted, OperateAbortedIndexSnafu);

        if self.stats.row_count() == 0 {
            // no IO is performed, no garbage to clean up, just return
            return Ok((0, 0));
        }

        let finish_res = self.do_finish(puffin_writer).await;
        // clean up garbage whether finishing succeeded or not
        if let Err(err) = self.do_cleanup().await {
            if cfg!(any(test, feature = "test")) {
                panic!("Failed to clean up index creator, err: {err:?}");
            } else {
                warn!(err; "Failed to clean up index creator");
            }
        }

        finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count()))
    }

    /// Aborts index creation and cleans up garbage.
    ///
    /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator`
    pub async fn abort(&mut self) -> Result<()> {
        if self.aborted {
            return Ok(());
        }
        self.aborted = true;

        self.do_cleanup().await
    }

    async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
        let mut guard = self.stats.record_update();

        let n = batch.num_rows();
        guard.inc_row_count(n);

        for (col_id, creator) in &mut self.creators {
            match self.codec.pk_col_info(*col_id) {
                // tags
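                // All rows in a `Batch` share a single primary key, so the
                // encoded tag value is pushed once for all `n` rows.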
                Some(col_info) => {
                    let pk_idx = col_info.idx;
                    let field = &col_info.field;
                    let elems = batch
                        .pk_col_value(self.codec.decoder(), pk_idx, *col_id)?
                        .filter(|v| !v.is_null())
                        .map(|v| {
                            let mut buf = vec![];
                            IndexValueCodec::encode_nonnull_value(
                                v.as_value_ref(),
                                field,
                                &mut buf,
                            )
                            .context(EncodeSnafu)?;
                            Ok(buf)
                        })
                        .transpose()?;
                    creator
                        .push_n_row_elems(n, elems)
                        .await
                        .context(PushBloomFilterValueSnafu)?;
                }
                // fields
                None => {
                    let Some(values) = batch.field_col_value(*col_id) else {
                        debug!(
                            "Column {} not found in the batch during building bloom filter index",
                            col_id
                        );
                        continue;
                    };
                    let sort_field = SortField::new(values.data.data_type());
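                    // Field values differ per row, so each row is encoded and
                    // pushed individually.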
                    for i in 0..n {
                        let value = values.data.get_ref(i);
                        let elems = (!value.is_null())
                            .then(|| {
                                let mut buf = vec![];
                                IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut buf)
                                    .context(EncodeSnafu)?;
                                Ok(buf)
                            })
                            .transpose()?;

                        creator
                            .push_row_elems(elems)
                            .await
                            .context(PushBloomFilterValueSnafu)?;
                    }
                }
            }
        }

        Ok(())
    }

    async fn do_update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
        let mut guard = self.stats.record_update();

        let n = batch.num_rows();
        guard.inc_row_count(n);

        let is_sparse = self.metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse;
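        // With sparse primary key encoding, tag columns may exist only inside
        // the encoded primary keys. Decode them lazily, at most once, and
        // reuse the result for every tag column.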
        let mut decoded_pks: Option<Vec<(CompositeValues, usize)>> = None;

        for (col_id, creator) in &mut self.creators {
            // Safety: `creators` are created from the metadata, so the lookup won't return `None`.
            let column_meta = self.metadata.column_by_id(*col_id).unwrap();
            let column_name = &column_meta.column_schema.name;
            if let Some(column_array) = batch.column_by_name(column_name) {
                // Convert the Arrow array to a `VectorRef`
                let vector = Helper::try_into_vector(column_array.clone())
                    .context(crate::error::ConvertVectorSnafu)?;
                let sort_field = SortField::new(vector.data_type());

                for i in 0..n {
                    let value = vector.get_ref(i);
                    let elems = (!value.is_null())
                        .then(|| {
                            let mut buf = vec![];
                            IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut buf)
                                .context(EncodeSnafu)?;
                            Ok(buf)
                        })
                        .transpose()?;

                    creator
                        .push_row_elems(elems)
                        .await
                        .context(PushBloomFilterValueSnafu)?;
                }
            } else if is_sparse && column_meta.semantic_type == SemanticType::Tag {
                // Column not found in the batch; try to decode it from the primary keys for sparse encoding.
                if decoded_pks.is_none() {
                    decoded_pks = Some(decode_primary_keys_with_counts(batch, &self.codec)?);
                }

                let pk_values_with_counts = decoded_pks.as_ref().unwrap();
                let Some(col_info) = self.codec.pk_col_info(*col_id) else {
                    debug!(
                        "Column {} not found in primary key during building bloom filter index",
                        column_name
                    );
                    continue;
                };
                let pk_index = col_info.idx;
                let field = &col_info.field;
                for (decoded, count) in pk_values_with_counts {
                    let value = match decoded {
                        CompositeValues::Dense(dense) => dense.get(pk_index).map(|v| &v.1),
                        CompositeValues::Sparse(sparse) => sparse.get(col_id),
                    };

                    let elems = value
                        .filter(|v| !v.is_null())
                        .map(|v| {
                            let mut buf = vec![];
                            IndexValueCodec::encode_nonnull_value(
                                v.as_value_ref(),
                                field,
                                &mut buf,
                            )
                            .context(EncodeSnafu)?;
                            Ok(buf)
                        })
                        .transpose()?;

                    creator
                        .push_n_row_elems(*count, elems)
                        .await
                        .context(PushBloomFilterValueSnafu)?;
                }
            } else {
                debug!(
                    "Column {} not found in the batch during building bloom filter index",
                    column_name
                );
            }
        }

        Ok(())
    }

    /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator`
    async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
        let mut guard = self.stats.record_finish();

        for (id, creator) in &mut self.creators {
            let written_bytes = Self::do_finish_single_creator(id, creator, puffin_writer).await?;
            guard.inc_byte_count(written_bytes);
        }

        Ok(())
    }

    async fn do_cleanup(&mut self) -> Result<()> {
        let mut _guard = self.stats.record_cleanup();

        self.creators.clear();
        self.temp_file_provider.cleanup().await
    }

    /// Data flow of finishing index:
    ///
    /// ```text
    ///                               (In Memory Buffer)
    ///                                    ┌──────┐
    ///  ┌─────────────┐                   │ PIPE │
    ///  │             │ write index data  │      │
    ///  │ IndexWriter ├──────────────────►│ tx   │
    ///  │             │                   │      │
    ///  └─────────────┘                   │      │
    ///                  ┌─────────────────┤ rx   │
    ///  ┌─────────────┐ │ read as blob    └──────┘
    ///  │             │ │
    ///  │ PuffinWriter├─┤
    ///  │             │ │ copy to file    ┌──────┐
    ///  └─────────────┘ └────────────────►│ File │
    ///                                    └──────┘
    /// ```
    ///
    /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator`
    async fn do_finish_single_creator(
        col_id: &ColumnId,
        creator: &mut BloomFilterCreator,
        puffin_writer: &mut SstPuffinWriter,
    ) -> Result<ByteCount> {
        let (tx, rx) = tokio::io::duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);

        let target_key = IndexTarget::ColumnId(*col_id);
        let blob_name = format!("{INDEX_BLOB_TYPE}-{target_key}");
        let (index_finish, puffin_add_blob) = futures::join!(
            creator.finish(tx.compat_write()),
            puffin_writer.put_blob(
                &blob_name,
                rx.compat(),
                PutOptions::default(),
                Default::default(),
            )
        );

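        // Either side of the pipe may fail independently; when both do, report
        // the two errors together.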
        match (
            puffin_add_blob.context(PuffinAddBlobSnafu),
            index_finish.context(BloomFilterFinishSnafu),
        ) {
            (Err(e1), Err(e2)) => BiErrorsSnafu {
                first: Box::new(e1),
                second: Box::new(e2),
            }
            .fail()?,

            (Ok(_), e @ Err(_)) => e?,
            (e @ Err(_), Ok(_)) => e.map(|_| ())?,
            (Ok(written_bytes), Ok(_)) => {
                return Ok(written_bytes);
            }
        }

        Ok(0)
    }

    /// Returns the memory usage of the indexer.
    pub fn memory_usage(&self) -> usize {
        self.global_memory_usage
            .load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Returns the column ids to be indexed.
    pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + use<'_> {
        self.creators.keys().copied()
    }
}

#[cfg(test)]
pub(crate) mod tests {

    use api::v1::SemanticType;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, SkippingIndexOptions};
    use datatypes::value::ValueRef;
    use datatypes::vectors::{UInt8Vector, UInt64Vector};
    use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
    use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
    use object_store::ObjectStore;
    use object_store::services::Memory;
    use puffin::puffin_manager::{PuffinManager, PuffinReader};
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::access_layer::FilePathProvider;
    use crate::read::BatchColumn;
    use crate::sst::file::RegionFileId;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;

    pub fn mock_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    pub async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
        IntermediateManager::init_fs(path).await.unwrap()
    }

    pub struct TestPathProvider;

    impl FilePathProvider for TestPathProvider {
        fn build_index_file_path(&self, file_id: RegionFileId) -> String {
            file_id.file_id().to_string()
        }

        fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
            file_id.file_id().to_string()
        }
    }

    /// tag_str:
    ///   - type: string
    ///   - index: bloom filter
    ///   - granularity: 2
    ///   - column_id: 1
    ///
    /// ts:
    ///   - type: timestamp
    ///   - index: time index
    ///   - column_id: 2
    ///
    /// field_u64:
    ///   - type: uint64
    ///   - index: bloom filter
    ///   - granularity: 4
    ///   - column_id: 3
    pub fn mock_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_str",
                    ConcreteDataType::string_datatype(),
                    false,
                )
                .with_skipping_options(SkippingIndexOptions::new_unchecked(
                    2,
                    0.01,
                    SkippingIndexType::BloomFilter,
                ))
                .unwrap(),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_u64",
                    ConcreteDataType::uint64_datatype(),
                    false,
                )
                .with_skipping_options(SkippingIndexOptions::new_unchecked(
                    4,
                    0.01,
                    SkippingIndexType::BloomFilter,
                ))
                .unwrap(),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .primary_key(vec![1]);

        Arc::new(builder.build().unwrap())
    }

    pub fn new_batch(str_tag: impl AsRef<str>, u64_field: impl IntoIterator<Item = u64>) -> Batch {
        let fields = vec![(0, SortField::new(ConcreteDataType::string_datatype()))];
        let codec = DensePrimaryKeyCodec::with_fields(fields);
        let row: [ValueRef; 1] = [str_tag.as_ref().into()];
        let primary_key = codec.encode(row.into_iter()).unwrap();

        let u64_field = BatchColumn {
            column_id: 3,
            data: Arc::new(UInt64Vector::from_iter_values(u64_field)),
        };
        let num_rows = u64_field.data.len();

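        // The timestamp, sequence, and op type vectors below are placeholders;
        // only the primary key and the u64 field matter to the bloom filter index.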
        Batch::new(
            primary_key,
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
                1, num_rows,
            ))),
            vec![u64_field],
        )
        .unwrap()
    }

    #[tokio::test]
    async fn test_bloom_filter_indexer() {
        let prefix = "test_bloom_filter_indexer_";
        let tempdir = common_test_util::temp_dir::create_temp_dir(prefix);
        let object_store = mock_object_store();
        let intm_mgr = new_intm_mgr(tempdir.path().to_string_lossy()).await;
        let region_metadata = mock_region_metadata();
        let memory_usage_threshold = Some(1024);

        let file_id = FileId::random();
        let mut indexer =
            BloomFilterIndexer::new(file_id, &region_metadata, intm_mgr, memory_usage_threshold)
                .unwrap()
                .unwrap();

        // push 20 rows
        let mut batch = new_batch("tag1", 0..10);
        indexer.update(&mut batch).await.unwrap();

        let mut batch = new_batch("tag2", 10..20);
        indexer.update(&mut batch).await.unwrap();

        let (_d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let puffin_manager = factory.build(object_store, TestPathProvider);

        let file_id = RegionFileId::new(region_metadata.region_id, file_id);
        let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
        let (row_count, byte_count) = indexer.finish(&mut puffin_writer).await.unwrap();
        assert_eq!(row_count, 20);
        assert!(byte_count > 0);
        puffin_writer.finish().await.unwrap();

        let puffin_reader = puffin_manager.reader(&file_id).await.unwrap();

        // tag_str
        {
            let blob_guard = puffin_reader
                .blob("greptime-bloom-filter-v1-1")
                .await
                .unwrap();
            let reader = blob_guard.reader().await.unwrap();
            let bloom_filter = BloomFilterReaderImpl::new(reader);
            let metadata = bloom_filter.metadata().await.unwrap();

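            // 20 rows at granularity 2 -> 10 segments; segments 0..5 cover the
            // "tag1" rows, segments 5..10 the "tag2" rows.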
            assert_eq!(metadata.segment_count, 10);
            for i in 0..5 {
                let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize];
                let bf = bloom_filter.bloom_filter(loc).await.unwrap();
                assert!(bf.contains(b"tag1"));
            }
            for i in 5..10 {
                let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize];
                let bf = bloom_filter.bloom_filter(loc).await.unwrap();
                assert!(bf.contains(b"tag2"));
            }
        }

        // field_u64
        {
            let sort_field = SortField::new(ConcreteDataType::uint64_datatype());

            let blob_guard = puffin_reader
                .blob("greptime-bloom-filter-v1-3")
                .await
                .unwrap();
            let reader = blob_guard.reader().await.unwrap();
            let bloom_filter = BloomFilterReaderImpl::new(reader);
            let metadata = bloom_filter.metadata().await.unwrap();

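            // 20 rows at granularity 4 -> 5 segments; value `i` falls in
            // segment `i / 4`.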
            assert_eq!(metadata.segment_count, 5);
            for i in 0u64..20 {
                let idx = i as usize / 4;
                let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[idx] as usize];
                let bf = bloom_filter.bloom_filter(loc).await.unwrap();
                let mut buf = vec![];
                IndexValueCodec::encode_nonnull_value(ValueRef::UInt64(i), &sort_field, &mut buf)
                    .unwrap();

                assert!(bf.contains(&buf));
            }
        }
    }
}