mito2/sst/index/bloom_filter/creator.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Standard library.
use std::collections::HashMap;
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

// External crates.
use common_telemetry::{debug, warn};
use datatypes::schema::SkippingIndexType;
use index::bloom_filter::creator::BloomFilterCreator;
use mito_codec::index::{IndexValueCodec, IndexValuesCodec};
use mito_codec::row_converter::SortField;
use puffin::puffin_manager::{PuffinWriter, PutOptions};
use snafu::{ensure, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;
use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};

// Crate-local.
use crate::error::{
    BiErrorsSnafu, BloomFilterFinishSnafu, EncodeSnafu, IndexOptionsSnafu,
    OperateAbortedIndexSnafu, PuffinAddBlobSnafu, PushBloomFilterValueSnafu, Result,
};
use crate::read::Batch;
use crate::sst::file::FileId;
use crate::sst::index::bloom_filter::INDEX_BLOB_TYPE;
use crate::sst::index::intermediate::{
    IntermediateLocation, IntermediateManager, TempFileProvider,
};
use crate::sst::index::puffin_manager::SstPuffinWriter;
use crate::sst::index::statistics::{ByteCount, RowCount, Statistics};
use crate::sst::index::TYPE_BLOOM_FILTER_INDEX;
/// The buffer size (in bytes) for the in-memory duplex pipe used to stream
/// index data from the bloom filter creator to the puffin blob writer.
const PIPE_BUFFER_SIZE_FOR_SENDING_BLOB: usize = 8192;
/// The indexer for the bloom filter index.
pub struct BloomFilterIndexer {
    /// The bloom filter creators, keyed by the id of the column being indexed.
    creators: HashMap<ColumnId, BloomFilterCreator>,

    /// The provider for intermediate files (shared with every creator).
    temp_file_provider: Arc<TempFileProvider>,

    /// Codec for decoding primary keys.
    codec: IndexValuesCodec,

    /// Whether the indexing process has been aborted.
    aborted: bool,

    /// The statistics of the indexer (row/byte counts, timings).
    stats: Statistics,

    /// The global memory usage shared by all creators of this indexer.
    global_memory_usage: Arc<AtomicUsize>,
}
impl BloomFilterIndexer {
    /// Creates a new bloom filter indexer.
    ///
    /// Scans `metadata` for columns whose skipping-index options request a
    /// bloom filter and builds one [`BloomFilterCreator`] per such column.
    /// Returns `Ok(None)` when no column requires a bloom filter index.
    pub fn new(
        sst_file_id: FileId,
        metadata: &RegionMetadataRef,
        intermediate_manager: IntermediateManager,
        memory_usage_threshold: Option<usize>,
    ) -> Result<Option<Self>> {
        let mut creators = HashMap::new();

        // Intermediate files are scoped to this region/SST pair.
        let temp_file_provider = Arc::new(TempFileProvider::new(
            IntermediateLocation::new(&metadata.region_id, &sst_file_id),
            intermediate_manager,
        ));
        let global_memory_usage = Arc::new(AtomicUsize::new(0));

        for column in &metadata.column_metadatas {
            let options =
                column
                    .column_schema
                    .skipping_index_options()
                    .context(IndexOptionsSnafu {
                        column_name: &column.column_schema.name,
                    })?;

            // Only columns explicitly configured with a bloom filter
            // skipping index participate; everything else is skipped.
            let options = match options {
                Some(options) if options.index_type == SkippingIndexType::BloomFilter => options,
                _ => continue,
            };

            let creator = BloomFilterCreator::new(
                options.granularity as _,
                options.false_positive_rate(),
                temp_file_provider.clone(),
                global_memory_usage.clone(),
                memory_usage_threshold,
            );
            creators.insert(column.column_id, creator);
        }

        if creators.is_empty() {
            return Ok(None);
        }

        let codec = IndexValuesCodec::from_tag_columns(
            metadata.primary_key_encoding,
            metadata.primary_key_columns(),
        );
        let indexer = Self {
            creators,
            temp_file_provider,
            codec,
            aborted: false,
            stats: Statistics::new(TYPE_BLOOM_FILTER_INDEX),
            global_memory_usage,
        };
        Ok(Some(indexer))
    }

    /// Updates index with a batch of rows.
    /// Garbage will be cleaned up if failed to update.
    ///
    /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator`
    pub async fn update(&mut self, batch: &mut Batch) -> Result<()> {
        ensure!(!self.aborted, OperateAbortedIndexSnafu);

        if self.creators.is_empty() {
            return Ok(());
        }

        if let Err(update_err) = self.do_update(batch).await {
            // clean up garbage if failed to update
            if let Err(err) = self.do_cleanup().await {
                // In tests a failed cleanup is a hard error; in production it
                // is only logged since the original `update_err` matters more.
                if cfg!(any(test, feature = "test")) {
                    panic!("Failed to clean up index creator, err: {err:?}",);
                } else {
                    warn!(err; "Failed to clean up index creator");
                }
            }
            return Err(update_err);
        }

        Ok(())
    }

    /// Finishes index creation and cleans up garbage.
    /// Returns the number of rows and bytes written.
    ///
    /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator`
    pub async fn finish(
        &mut self,
        puffin_writer: &mut SstPuffinWriter,
    ) -> Result<(RowCount, ByteCount)> {
        ensure!(!self.aborted, OperateAbortedIndexSnafu);

        if self.stats.row_count() == 0 {
            // no IO is performed, no garbage to clean up, just return
            return Ok((0, 0));
        }

        let finish_res = self.do_finish(puffin_writer).await;
        // clean up garbage no matter finish successfully or not
        if let Err(err) = self.do_cleanup().await {
            if cfg!(any(test, feature = "test")) {
                panic!("Failed to clean up index creator, err: {err:?}",);
            } else {
                warn!(err; "Failed to clean up index creator");
            }
        }

        finish_res.map(|_| (self.stats.row_count(), self.stats.byte_count()))
    }

    /// Aborts index creation and clean up garbage.
    ///
    /// Idempotent: subsequent calls after the first are no-ops.
    ///
    /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator`
    pub async fn abort(&mut self) -> Result<()> {
        if self.aborted {
            return Ok(());
        }
        self.aborted = true;

        self.do_cleanup().await
    }

    /// Pushes one batch of rows into every column's bloom filter creator.
    ///
    /// Tag columns are decoded from the batch's primary key (one value for all
    /// `n` rows); field columns are read row by row from the batch.
    async fn do_update(&mut self, batch: &mut Batch) -> Result<()> {
        let mut guard = self.stats.record_update();

        let n = batch.num_rows();
        guard.inc_row_count(n);

        for (col_id, creator) in &mut self.creators {
            match self.codec.pk_col_info(*col_id) {
                // tags
                Some(col_info) => {
                    let pk_idx = col_info.idx;
                    let field = &col_info.field;
                    // A batch shares a single primary key, so the encoded tag
                    // value (if non-null) applies to all `n` rows at once.
                    let elems = batch
                        .pk_col_value(self.codec.decoder(), pk_idx, *col_id)?
                        .filter(|v| !v.is_null())
                        .map(|v| {
                            let mut buf = vec![];
                            IndexValueCodec::encode_nonnull_value(
                                v.as_value_ref(),
                                field,
                                &mut buf,
                            )
                            .context(EncodeSnafu)?;
                            Ok(buf)
                        })
                        .transpose()?;
                    creator
                        .push_n_row_elems(n, elems)
                        .await
                        .context(PushBloomFilterValueSnafu)?;
                }
                // fields
                None => {
                    let Some(values) = batch.field_col_value(*col_id) else {
                        debug!(
                            "Column {} not found in the batch during building bloom filter index",
                            col_id
                        );
                        continue;
                    };
                    let sort_field = SortField::new(values.data.data_type());
                    for i in 0..n {
                        let value = values.data.get_ref(i);
                        // Null values contribute no element for this row.
                        let elems = (!value.is_null())
                            .then(|| {
                                let mut buf = vec![];
                                IndexValueCodec::encode_nonnull_value(value, &sort_field, &mut buf)
                                    .context(EncodeSnafu)?;
                                Ok(buf)
                            })
                            .transpose()?;

                        creator
                            .push_row_elems(elems)
                            .await
                            .context(PushBloomFilterValueSnafu)?;
                    }
                }
            }
        }

        Ok(())
    }

    /// Finishes every creator, writing one puffin blob per indexed column and
    /// accumulating the written byte counts into the statistics.
    ///
    /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator`
    async fn do_finish(&mut self, puffin_writer: &mut SstPuffinWriter) -> Result<()> {
        let mut guard = self.stats.record_finish();

        for (id, creator) in &mut self.creators {
            let written_bytes = Self::do_finish_single_creator(id, creator, puffin_writer).await?;
            guard.inc_byte_count(written_bytes);
        }

        Ok(())
    }

    /// Drops all creators and removes their intermediate files.
    async fn do_cleanup(&mut self) -> Result<()> {
        let mut _guard = self.stats.record_cleanup();

        self.creators.clear();
        self.temp_file_provider.cleanup().await
    }

    /// Data flow of finishing index:
    ///
    /// ```text
    ///                               (In Memory Buffer)
    ///                                    ┌──────┐
    ///  ┌─────────────┐                   │ PIPE │
    ///  │             │ write index data  │      │
    ///  │ IndexWriter ├──────────────────►│ tx   │
    ///  │             │                   │      │
    ///  └─────────────┘                   │      │
    ///                  ┌─────────────────┤ rx   │
    ///  ┌─────────────┐ │ read as blob    └──────┘
    ///  │             │ │
    ///  │ PuffinWriter├─┤
    ///  │             │ │ copy to file    ┌──────┐
    ///  └─────────────┘ └────────────────►│ File │
    ///                                    └──────┘
    /// ```
    ///
    /// TODO(zhongzc): duplicate with `mito2::sst::index::inverted_index::creator::InvertedIndexCreator`
    async fn do_finish_single_creator(
        col_id: &ColumnId,
        creator: &mut BloomFilterCreator,
        puffin_writer: &mut SstPuffinWriter,
    ) -> Result<ByteCount> {
        let (tx, rx) = tokio::io::duplex(PIPE_BUFFER_SIZE_FOR_SENDING_BLOB);

        let blob_name = format!("{}-{}", INDEX_BLOB_TYPE, col_id);
        // Producer (creator.finish) and consumer (put_blob) must run
        // concurrently: the pipe's buffer is bounded, so running them
        // sequentially would deadlock once the buffer fills.
        let (index_finish, puffin_add_blob) = futures::join!(
            creator.finish(tx.compat_write()),
            puffin_writer.put_blob(
                &blob_name,
                rx.compat(),
                PutOptions::default(),
                Default::default(),
            )
        );

        // Surface both failures when the two halves fail together;
        // otherwise propagate whichever side failed.
        match (
            puffin_add_blob.context(PuffinAddBlobSnafu),
            index_finish.context(BloomFilterFinishSnafu),
        ) {
            (Err(e1), Err(e2)) => BiErrorsSnafu {
                first: Box::new(e1),
                second: Box::new(e2),
            }
            .fail()?,

            (Ok(_), e @ Err(_)) => e?,
            (e @ Err(_), Ok(_)) => e.map(|_| ())?,
            (Ok(written_bytes), Ok(_)) => {
                return Ok(written_bytes);
            }
        }

        // Unreachable in practice: every arm above either returns or
        // propagates with `?`. Kept to satisfy the function's return type.
        Ok(0)
    }

    /// Returns the memory usage of the indexer.
    pub fn memory_usage(&self) -> usize {
        self.global_memory_usage
            .load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Returns the column ids to be indexed.
    pub fn column_ids(&self) -> impl Iterator<Item = ColumnId> + use<'_> {
        self.creators.keys().copied()
    }
}
#[cfg(test)]
pub(crate) mod tests {

    use api::v1::SemanticType;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{ColumnSchema, SkippingIndexOptions};
    use datatypes::value::ValueRef;
    use datatypes::vectors::{UInt64Vector, UInt8Vector};
    use index::bloom_filter::reader::{BloomFilterReader, BloomFilterReaderImpl};
    use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt};
    use object_store::services::Memory;
    use object_store::ObjectStore;
    use puffin::puffin_manager::{PuffinManager, PuffinReader};
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::access_layer::FilePathProvider;
    use crate::read::BatchColumn;
    use crate::sst::file::RegionFileId;
    use crate::sst::index::puffin_manager::PuffinManagerFactory;

    /// Builds an in-memory object store for tests.
    pub fn mock_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    /// Builds an intermediate-file manager rooted at `path`.
    pub async fn new_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
        IntermediateManager::init_fs(path).await.unwrap()
    }

    /// Path provider that names both SST and index files after the file id.
    pub struct TestPathProvider;

    impl FilePathProvider for TestPathProvider {
        fn build_index_file_path(&self, file_id: RegionFileId) -> String {
            file_id.file_id().to_string()
        }

        fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
            file_id.file_id().to_string()
        }
    }

    /// tag_str:
    ///   - type: string
    ///   - index: bloom filter
    ///   - granularity: 2
    ///   - column_id: 1
    ///
    /// ts:
    ///   - type: timestamp
    ///   - index: time index
    ///   - column_id: 2
    ///
    /// field_u64:
    ///   - type: uint64
    ///   - index: bloom filter
    ///   - granularity: 4
    ///   - column_id: 3
    pub fn mock_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_str",
                    ConcreteDataType::string_datatype(),
                    false,
                )
                .with_skipping_options(SkippingIndexOptions::new_unchecked(
                    2,
                    0.01,
                    SkippingIndexType::BloomFilter,
                ))
                .unwrap(),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_u64",
                    ConcreteDataType::uint64_datatype(),
                    false,
                )
                .with_skipping_options(SkippingIndexOptions::new_unchecked(
                    4,
                    0.01,
                    SkippingIndexType::BloomFilter,
                ))
                .unwrap(),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .primary_key(vec![1]);

        Arc::new(builder.build().unwrap())
    }

    /// Builds a batch whose primary key encodes `str_tag` and whose field
    /// column (id 3) holds `u64_field`; timestamps/sequences are zeroed.
    pub fn new_batch(str_tag: impl AsRef<str>, u64_field: impl IntoIterator<Item = u64>) -> Batch {
        let fields = vec![(0, SortField::new(ConcreteDataType::string_datatype()))];
        let codec = DensePrimaryKeyCodec::with_fields(fields);
        let row: [ValueRef; 1] = [str_tag.as_ref().into()];
        let primary_key = codec.encode(row.into_iter()).unwrap();

        let u64_field = BatchColumn {
            column_id: 3,
            data: Arc::new(UInt64Vector::from_iter_values(u64_field)),
        };
        let num_rows = u64_field.data.len();

        Batch::new(
            primary_key,
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt64Vector::from_iter_values(std::iter::repeat_n(
                0, num_rows,
            ))),
            Arc::new(UInt8Vector::from_iter_values(std::iter::repeat_n(
                1, num_rows,
            ))),
            vec![u64_field],
        )
        .unwrap()
    }

    /// End-to-end test: index 20 rows across two tags, write the blobs to a
    /// puffin file, then read the blobs back and probe the bloom filters.
    #[tokio::test]
    async fn test_bloom_filter_indexer() {
        let prefix = "test_bloom_filter_indexer_";
        let tempdir = common_test_util::temp_dir::create_temp_dir(prefix);
        let object_store = mock_object_store();
        let intm_mgr = new_intm_mgr(tempdir.path().to_string_lossy()).await;
        let region_metadata = mock_region_metadata();
        let memory_usage_threshold = Some(1024);

        let file_id = FileId::random();
        let mut indexer =
            BloomFilterIndexer::new(file_id, &region_metadata, intm_mgr, memory_usage_threshold)
                .unwrap()
                .unwrap();

        // push 20 rows
        let mut batch = new_batch("tag1", 0..10);
        indexer.update(&mut batch).await.unwrap();

        let mut batch = new_batch("tag2", 10..20);
        indexer.update(&mut batch).await.unwrap();

        let (_d, factory) = PuffinManagerFactory::new_for_test_async(prefix).await;
        let puffin_manager = factory.build(object_store, TestPathProvider);

        let file_id = RegionFileId::new(region_metadata.region_id, file_id);
        let mut puffin_writer = puffin_manager.writer(&file_id).await.unwrap();
        let (row_count, byte_count) = indexer.finish(&mut puffin_writer).await.unwrap();
        assert_eq!(row_count, 20);
        assert!(byte_count > 0);
        puffin_writer.finish().await.unwrap();

        let puffin_reader = puffin_manager.reader(&file_id).await.unwrap();

        // tag_str: granularity 2 over 20 rows -> 10 segments;
        // first 10 rows carry "tag1", last 10 carry "tag2".
        {
            let blob_guard = puffin_reader
                .blob("greptime-bloom-filter-v1-1")
                .await
                .unwrap();
            let reader = blob_guard.reader().await.unwrap();
            let bloom_filter = BloomFilterReaderImpl::new(reader);
            let metadata = bloom_filter.metadata().await.unwrap();

            assert_eq!(metadata.segment_count, 10);
            for i in 0..5 {
                let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize];
                let bf = bloom_filter.bloom_filter(loc).await.unwrap();
                assert!(bf.contains(b"tag1"));
            }
            for i in 5..10 {
                let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[i] as usize];
                let bf = bloom_filter.bloom_filter(loc).await.unwrap();
                assert!(bf.contains(b"tag2"));
            }
        }

        // field_u64: granularity 4 over 20 rows -> 5 segments; row i holds
        // value i, so segment i/4 must contain the encoded value of i.
        {
            let sort_field = SortField::new(ConcreteDataType::uint64_datatype());

            let blob_guard = puffin_reader
                .blob("greptime-bloom-filter-v1-3")
                .await
                .unwrap();
            let reader = blob_guard.reader().await.unwrap();
            let bloom_filter = BloomFilterReaderImpl::new(reader);
            let metadata = bloom_filter.metadata().await.unwrap();

            assert_eq!(metadata.segment_count, 5);
            for i in 0u64..20 {
                let idx = i as usize / 4;
                let loc = &metadata.bloom_filter_locs[metadata.segment_loc_indices[idx] as usize];
                let bf = bloom_filter.bloom_filter(loc).await.unwrap();
                let mut buf = vec![];
                IndexValueCodec::encode_nonnull_value(ValueRef::UInt64(i), &sort_field, &mut buf)
                    .unwrap();

                assert!(bf.contains(&buf));
            }
        }
    }
}