// mito2/sst/index/bloom_filter/creator.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::collections::HashMap;
16use std::sync::Arc;
17use std::sync::atomic::AtomicUsize;
18
19use common_telemetry::{debug, warn};
20use datatypes::arrow::record_batch::RecordBatch;
21use datatypes::schema::SkippingIndexType;
22use datatypes::vectors::Helper;
23use index::bloom_filter::creator::BloomFilterCreator;
24use mito_codec::index::{IndexValueCodec, IndexValuesCodec};
25use mito_codec::row_converter::SortField;
26use puffin::puffin_manager::{PuffinWriter, PutOptions};
27use snafu::{ResultExt, ensure};
28use store_api::metadata::RegionMetadataRef;
29use store_api::storage::{ColumnId, FileId};
30use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
31
32use crate::error::{
33    BiErrorsSnafu, BloomFilterFinishSnafu, EncodeSnafu, IndexOptionsSnafu,
34    OperateAbortedIndexSnafu, PuffinAddBlobSnafu, PushBloomFilterValueSnafu, Result,
35};
36use crate::read::Batch;
37use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
38use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
39use crate::sst::index::intermediate::{
40    IntermediateLocation, IntermediateManager, TempFileProvider,
41};
42use crate::sst::index::puffin_manager::SstPuffinWriter;
43use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
45/// The buffer size for the pipe used to send index data to the puffin blob.
46const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;
48/// The indexer for the bloom filter index.
49pub struct BloomFilterIndexer {
50    /// The bloom filter creators.
51    creators: HashMap<ColumnId, BloomFilterCreator>,
52
53    /// The provider for intermediate files.
54    temp_file_provider: Arc<TempFileProvider>,
55
56    /// Codec for decoding primary keys.
57    codec: IndexValuesCodec,
58
59    /// Whether the indexing process has been aborted.
60    aborted: bool,
61
62    /// The statistics of the indexer.
63    stats: Statistics,
64
65    /// The global memory usage.
66    global_memory_usage: Arc<AtomicUsize>,
67
68    /// Region metadata for column lookups.
69    metadata: RegionMetadataRef,
70}
72impl BloomFilterIndexer {
73    /// Creates a new bloom filter indexer.
74    pub fn new(
75        sst_file_id: FileId,
76        metadata: &RegionMetadataRef,
77        intermediate_manager: IntermediateManager,
78        memory_usage_threshold: Option<usize>,
79    ) -> Result<Option<Self>> {
80        let mut creators = HashMap::new();
81
82        let temp_file_provider = Arc::new(TempFileProvider::new(
83            IntermediateLocation::new(&metadata.region_id, &sst_file_id),
84            intermediate_manager,
85        ));
86        let global_memory_usage = Arc::new(AtomicUsize::new(0));
87
88        for column in &metadata.column_metadatas {
89            let options =
90                column
91                    .column_schema
92                    .skipping_index_options()
93                    .context(IndexOptionsSnafu {
94                        column_name: &column.column_schema.name,
95                    })?;
96
97            let options = match options {
98                Some(options) if options.index_type == SkippingIndexType::BloomFilter => options,
99                _ => continue,
100            };
101
102            let creator = BloomFilterCreator::new(
103                options.granularity as _,
104                options.false_positive_rate(),
105                temp_file_provider.clone(),
106                global_memory_usage.clone(),
107                memory_usage_threshold,
108            );
109            creators.insert(column.column_id, creator);
110        }
111
112        if creators.is_empty() {
113            return Ok(None);
114        }
115
116        let codec = IndexValuesCodec::from_tag_columns(
117            metadata.primary_key_encoding,
118            metadata.primary_key_columns(),
119        );
120        let indexer = Self {
121            creators,
122            temp_file_provider,
123            codec,
124            aborted: false,
125            stats: Statistics::new(TYPE_BLOOM_FILTER_INDEX),
126            global_memory_usage,
127            metadata: metadata.clone(),
128        };
129        Ok(Some(indexer))
130    }
131
132    /// Updates index with a batch of rows.
133    /// Garbage will be cleaned up if failed to update.
134    ///
135    /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator`
136    pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
137        ensure!(!self.aborted, OperateAbortedIndexSnafu);
138
139        if self.creators.is_empty() {
140            return Ok(());
141        }
142
143        if let Err(update_err) = self.do_update(batch).await {
144            // clean up garbage if failed to update
145            if let Err(err) = self.do_cleanup().await {
146                if cfg!(any(test, feature = "test")) {
147                    panic!("Failed to clean up index creator, err: {err:?}",);
148                } else {
149                    warn!(err; "Failed to clean up index creator");
150                }
151            }
152            return Err(update_err);
153        }
154
155        Ok(())
156    }
157
158    /// Updates the bloom filter index with the given flat format RecordBatch.
159    pub async fn update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
160        ensure!(!self.aborted, OperateAbortedIndexSnafu);
161
162        if self.creators.is_empty() || batch.num_rows() == 0 {
163            return Ok(());
164        }
165
166        if let Err(update_err) = self.do_update_flat(batch).await {
167            // clean up garbage if failed to update
168            if let Err(err) = self.do_cleanup().await {
169                if cfg!(any(test, feature = "test")) {
170                    panic!("Failed to clean up index creator, err: {err:?}",);
171                } else {
172                    warn!(err; "Failed to clean up index creator");
173                }
174            }
175            return Err(update_err);
176        }
177
178        Ok(())
179    }
180
181    /// Finishes index creation and cleans up garbage.
182    /// Returns the number of rows and bytes written.
183    ///
184    /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator`
185    pub async fn finish(
186        &mut self,
187        puffin_writer: &mut SstPuffinWriter,
188    ) -> Result<(RowCount, ByteCount)> {
189        ensure!(!self.aborted, OperateAbortedIndexSnafu);
190
191        if self.stats.row_count() == 0 {
192            // no IO is performed, no garbage to clean up, just return
193            return Ok((0, 0));
194        }
195
196        let finish_res = self.do_finish(puffin_writer).await;
197        // clean up garbage no matter finish successfully or not
198        if let Err(err) = self.do_cleanup().await {
199            if cfg!(any(test, feature = "test")) {
200                panic!("Failed to clean up index creator, err: {err:?}",);
201            } else {
202                warn!(err; "Failed to clean up index creator");
203            }
204        }
205
206        finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count()))
207    }
208
209    /// Aborts index creation and clean up garbage.
210    ///
211    /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator`
212    pub async fn abort(&mut self) -> Result<()> {
213        if self.aborted {
214            return Ok(());
215        }
216        self.aborted = true;
217
218        self.do_cleanup().await
219    }
220
221    async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
222        let mut guard = self.stats.record_update();
223
224        let n = batch.num_rows();
225        guard.inc_row_count(n);
226
227        for (col_id, creator) in &mut self.creators {
228            match self.codec.pk_col_info(*col_id) {
229                // tags
230                Some(col_info) => {
231                    let pk_idx = col_info.idx;
232                    let field = &col_info.field;
233                    let elems = batch
234                        .pk_col_value(self.codec.decoder(), pk_idx, *col_id)?
235                        .filter(|v| !v.is_null())
236                        .map(|v| {
237                            let mut buf = vec![];
238                            IndexValueCodec::encode_nonnull_value(
239                                v.as_value_ref(),
240                                field,
241                                &mut buf,
242                            )
243                            .context(EncodeSnafu)?;
244                            Ok(buf)
245                        })
246                        .transpose()?;
247                    creator
248                        .push_n_row_elems(n, elems)
249                        .await
250                        .context(PushBloomFilterValueSnafu)?;
251                }
252                // fields
253                None => {
254                    let Some(values) = batch.field_col_value(*col_id) else {
255                        debug!(
256                            "Column {} not found in the batch during building bloom filter index",
257                            col_id
258                        );
259                        continue;
260                    };
261                    let sort_field = SortField::new(values.data.data_type());
262                    for i in 0..n {
263                        let value = values.data.get_ref(i);
264                        let elems = (!value.is_null())
265                            .then(|| {
266                                let mut buf = vec![];
267                                IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut buf)
268                                    .context(EncodeSnafu)?;
269                                Ok(buf)
270                            })
271                            .transpose()?;
272
273                        creator
274                            .push_row_elems(elems)
275                            .await
276                            .context(PushBloomFilterValueSnafu)?;
277                    }
278                }
279            }
280        }
281
282        Ok(())
283    }
284
285    async fn do_update_flat(&mut self, batch: &RecordBatch) -> Result<()> {
286        let mut guard = self.stats.record_update();
287
288        let n = batch.num_rows();
289        guard.inc_row_count(n);
290
291        for (col_id, creator) in &mut self.creators {
292            // Get the column name from metadata
293            if let Some(column_meta) = self.metadata.column_by_id(*col_id) {
294                let column_name = &column_meta.column_schema.name;
295
296                // Find the column in the RecordBatch by name
297                if let Some(column_array) = batch.column_by_name(column_name) {
298                    // Convert Arrow array to VectorRef
299                    let vector = Helper::try_into_vector(column_array.clone())
300                        .context(crate::error::ConvertVectorSnafu)?;
301                    let sort_field = SortField::new(vector.data_type());
302
303                    for i in 0..n {
304                        let value = vector.get_ref(i);
305                        let elems = (!value.is_null())
306                            .then(|| {
307                                let mut buf = vec![];
308                                IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut buf)
309                                    .context(EncodeSnafu)?;
310                                Ok(buf)
311                            })
312                            .transpose()?;
313
314                        creator
315                            .push_row_elems(elems)
316                            .await
317                            .context(PushBloomFilterValueSnafu)?;
318                    }
319                } else {
320                    debug!(
321                        "Column {} not found in the batch during building bloom filter index",
322                        column_name
323                    );
324                    // Push empty elements to maintain alignment
325                    for _ in 0..n {
326                        creator
327                            .push_row_elems(None)
328                            .await
329                            .context(PushBloomFilterValueSnafu)?;
330                    }
331                }
332            }
333        }
334
335        Ok(())
336    }
337
338    /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator`
339    async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
340        let mut guard = self.stats.record_finish();
341
342        for (id, creator) in &mut self.creators {
343            let written_bytes = Self::do_finish_single_creator(id, creator, puffin_writer).await?;
344            guard.inc_byte_count(written_bytes);
345        }
346
347        Ok(())
348    }
349
350    async fn do_cleanup(&mut self) -> Result<()> {
351        let mut _guard = self.stats.record_cleanup();
352
353        self.creators.clear();
354        self.temp_file_provider.cleanup().await
355    }
356
357    /// Data flow of finishing index:
358    ///
359    /// ```text
360    ///                               (In Memory Buffer)
361    ///                                    ┌──────┐
362    ///  ┌─────────────┐                   │ PIPE │
363    ///  │             │ write index data  │      │
364    ///  │ IndexWriter ├──────────────────►│ tx   │
365    ///  │             │                   │      │
366    ///  └─────────────┘                   │      │
367    ///                  ┌─────────────────┤ rx   │
368    ///  ┌─────────────┐ │ read as blob    └──────┘
369    ///  │             │ │
370    ///  │ PuffinWriter├─┤
371    ///  │             │ │ copy to file    ┌──────┐
372    ///  └─────────────┘ └────────────────►│ File │
373    ///                                    └──────┘
374    /// ```
375    ///
376    /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator`
377    async fn do_finish_single_creator(
378        col_id: &ColumnId,
379        creator: &mut BloomFilterCreator,
380        puffin_writer: &mut SstPuffinWriter,
381    ) -> Result<ByteCount> {
382        let (tx, rx) = tokio::io::duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);
383
384        let blob_name = format!("{}-{}", INDEX_BLOB_TYPE, col_id);
385        let (index_finish, puffin_add_blob) = futures::join!(
386            creator.finish(tx.compat_write()),
387            puffin_writer.put_blob(
388                &blob_name,
389                rx.compat(),
390                PutOptions::default(),
391                Default::default(),
392            )
393        );
394
395        match (
396            puffin_add_blob.context(PuffinAddBlobSnafu),
397            index_finish.context(BloomFilterFinishSnafu),
398        ) {
399            (Err(e1), Err(e2)) => BiErrorsSnafu {
400                first: Box::new(e1),
401                second: Box::new(e2),
402            }
403            .fail()?,
404
405            (Ok(_), e @ Err(_)) => e?,
406            (e @ Err(_), Ok(_)) => e.map(|_| ())?,
407            (Ok(written_bytes), Ok(_)) => {
408                return Ok(written_bytes);
409            }
410        }
411
412        Ok(0)
413    }
414
415    /// Returns the memory usage of the indexer.
416    pub fn memory_usage(&self) -> usize {
417        self.global_memory_usage
418            .load(std::sync::atomic::Ordering::Relaxed)
419    }
420
421    /// Returns the column ids to be indexed.
422    pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + use<'_> {
423        self.creators.keys().copied()
424    }
425}
427#[cfg(test)]
428pub(crate) mod tests {
429
430    use api::v1::SemanticType;
431    use datatypes::data_type::ConcreteDataType;
432    use datatypes::schema::{ColumnSchema, SkippingIndexOptions};
433    use datatypes::value::ValueRef;
434    use datatypes::vectors::{UInt8Vector, UInt64Vector};
435    use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
436    use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
437    use object_store::ObjectStore;
438    use object_store::services::Memory;
439    use puffin::puffin_manager::{PuffinManager, PuffinReader};
440    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
441    use store_api::storage::RegionId;
442
443    use super::*;
444    use crate::access_layer::FilePathProvider;
445    use crate::read::BatchColumn;
446    use crate::sst::file::RegionFileId;
447    use crate::sst::index::puffin_manager::PuffinManagerFactory;
448
449    pub fn mock_object_store() -> ObjectStore {
450        ObjectStore::new(Memory::default()).unwrap().finish()
451    }
452
453    pub async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
454        IntermediateManager::init_fs(path).await.unwrap()
455    }
456
457    pub struct TestPathProvider;
458
459    impl FilePathProvider for TestPathProvider {
460        fn build_index_file_path(&self, file_id: RegionFileId) -> String {
461            file_id.file_id().to_string()
462        }
463
464        fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
465            file_id.file_id().to_string()
466        }
467    }
468
469    /// tag_str:
470    ///   - type: string
471    ///   - index: bloom filter
472    ///   - granularity: 2
473    ///   - column_id: 1
474    ///
475    /// ts:
476    ///   - type: timestamp
477    ///   - index: time index
478    ///   - column_id: 2
479    ///
480    /// field_u64:
481    ///   - type: uint64
482    ///   - index: bloom filter
483    ///   - granularity: 4
484    ///   - column_id: 3
485    pub fn mock_region_metadata() -> RegionMetadataRef {
486        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
487        builder
488            .push_column_metadata(ColumnMetadata {
489                column_schema: ColumnSchema::new(
490                    "tag_str",
491                    ConcreteDataType::string_datatype(),
492                    false,
493                )
494                .with_skipping_options(SkippingIndexOptions::new_unchecked(
495                    2,
496                    0.01,
497                    SkippingIndexType::BloomFilter,
498                ))
499                .unwrap(),
500                semantic_type: SemanticType::Tag,
501                column_id: 1,
502            })
503            .push_column_metadata(ColumnMetadata {
504                column_schema: ColumnSchema::new(
505                    "ts",
506                    ConcreteDataType::timestamp_millisecond_datatype(),
507                    false,
508                ),
509                semantic_type: SemanticType::Timestamp,
510                column_id: 2,
511            })
512            .push_column_metadata(ColumnMetadata {
513                column_schema: ColumnSchema::new(
514                    "field_u64",
515                    ConcreteDataType::uint64_datatype(),
516                    false,
517                )
518                .with_skipping_options(SkippingIndexOptions::new_unchecked(
519                    4,
520                    0.01,
521                    SkippingIndexType::BloomFilter,
522                ))
523                .unwrap(),
524                semantic_type: SemanticType::Field,
525                column_id: 3,
526            })
527            .primary_key(vec![1]);
528
529        Arc::new(builder.build().unwrap())
530    }
531
532    pub fn new_batch(str_tag: impl AsRef<str>, u64_field: impl IntoIterator<Item = u64>) -> Batch {
533        let fields = vec![(0, SortField::new(ConcreteDataType::string_datatype()))];
534        let codec = DensePrimaryKeyCodec::with_fields(fields);
535        let row: [ValueRef; 1] = [str_tag.as_ref().into()];
536        let primary_key = codec.encode(row.into_iter()).unwrap();
537
538        let u64_field = BatchColumn {
539            column_id: 3,
540            data: Arc::new(UInt64Vector::from_iter_values(u64_field)),
541        };
542        let num_rows = u64_field.data.len();
543
544        Batch::new(
545            primary_key,
546            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
547                0, num_rows,
548            ))),
549            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
550                0, num_rows,
551            ))),
552            Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
553                1, num_rows,
554            ))),
555            vec![u64_field],
556        )
557        .unwrap()
558    }
559
560    #[tokio::test]
561    async fn test_bloom_filter_indexer() {
562        let prefix = "test_bloom_filter_indexer_";
563        let tempdir = common_test_util::temp_dir::create_temp_dir(prefix);
564        let object_store = mock_object_store();
565        let intm_mgr = new_intm_mgr(tempdir.path().to_string_lossy()).await;
566        let region_metadata = mock_region_metadata();
567        let memory_usage_threshold = Some(1024);
568
569        let file_id = FileId::random();
570        let mut indexer =
571            BloomFilterIndexer::new(file_id, &region_metadata, intm_mgr, memory_usage_threshold)
572                .unwrap()
573                .unwrap();
574
575        // push 20 rows
576        let mut batch = new_batch("tag1", 0..10);
577        indexer.update(&mut batch).await.unwrap();
578
579        let mut batch = new_batch("tag2", 10..20);
580        indexer.update(&mut batch).await.unwrap();
581
582        let (_d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
583        let puffin_manager = factory.build(object_store, TestPathProvider);
584
585        let file_id = RegionFileId::new(region_metadata.region_id, file_id);
586        let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
587        let (row_count, byte_count) = indexer.finish(&mut puffin_writer).await.unwrap();
588        assert_eq!(row_count, 20);
589        assert!(byte_count > 0);
590        puffin_writer.finish().await.unwrap();
591
592        let puffin_reader = puffin_manager.reader(&file_id).await.unwrap();
593
594        // tag_str
595        {
596            let blob_guard = puffin_reader
597                .blob("greptime-bloom-filter-v1-1")
598                .await
599                .unwrap();
600            let reader = blob_guard.reader().await.unwrap();
601            let bloom_filter = BloomFilterReaderImpl::new(reader);
602            let metadata = bloom_filter.metadata().await.unwrap();
603
604            assert_eq!(metadata.segment_count, 10);
605            for i in 0..5 {
606                let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize];
607                let bf = bloom_filter.bloom_filter(loc).await.unwrap();
608                assert!(bf.contains(b"tag1"));
609            }
610            for i in 5..10 {
611                let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize];
612                let bf = bloom_filter.bloom_filter(loc).await.unwrap();
613                assert!(bf.contains(b"tag2"));
614            }
615        }
616
617        // field_u64
618        {
619            let sort_field = SortField::new(ConcreteDataType::uint64_datatype());
620
621            let blob_guard = puffin_reader
622                .blob("greptime-bloom-filter-v1-3")
623                .await
624                .unwrap();
625            let reader = blob_guard.reader().await.unwrap();
626            let bloom_filter = BloomFilterReaderImpl::new(reader);
627            let metadata = bloom_filter.metadata().await.unwrap();
628
629            assert_eq!(metadata.segment_count, 5);
630            for i in 0u64..20 {
631                let idx = i as usize / 4;
632                let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[idx] as usize];
633                let bf = bloom_filter.bloom_filter(loc).await.unwrap();
634                let mut buf = vec![];
635                IndexValueCodec::encode_nonnull_value(ValueRef::UInt64(i), &sort_field, &mut buf)
636                    .unwrap();
637
638                assert!(bf.contains(&buf));
639            }
640        }
641    }
642}