mito2/sst/
index.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod bloom_filter;
16pub(crate) mod fulltext_index;
17mod indexer;
18pub mod intermediate;
19pub(crate) mod inverted_index;
20pub mod puffin_manager;
21mod statistics;
22pub(crate) mod store;
23
24use std::num::NonZeroUsize;
25
26use bloom_filter::creator::BloomFilterIndexer;
27use common_telemetry::{debug, warn};
28use puffin_manager::SstPuffinManager;
29use smallvec::SmallVec;
30use statistics::{ByteCount, RowCount};
31use store_api::metadata::RegionMetadataRef;
32use store_api::storage::{ColumnId, RegionId};
33
34use crate::access_layer::OperationType;
35use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
36use crate::metrics::INDEX_CREATE_MEMORY_USAGE;
37use crate::read::Batch;
38use crate::region::options::IndexOptions;
39use crate::sst::file::{FileId, IndexType};
40use crate::sst::index::fulltext_index::creator::FulltextIndexer;
41use crate::sst::index::intermediate::IntermediateManager;
42use crate::sst::index::inverted_index::creator::InvertedIndexer;
43
44pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index";
45pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index";
46pub(crate) const TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
47
48/// Output of the index creation.
49#[derive(Debug, Clone, Default)]
50pub struct IndexOutput {
51    /// Size of the file.
52    pub file_size: u64,
53    /// Inverted index output.
54    pub inverted_index: InvertedIndexOutput,
55    /// Fulltext index output.
56    pub fulltext_index: FulltextIndexOutput,
57    /// Bloom filter output.
58    pub bloom_filter: BloomFilterOutput,
59}
60
61impl IndexOutput {
62    pub fn build_available_indexes(&self) -> SmallVec<[IndexType; 4]> {
63        let mut indexes = SmallVec::new();
64        if self.inverted_index.is_available() {
65            indexes.push(IndexType::InvertedIndex);
66        }
67        if self.fulltext_index.is_available() {
68            indexes.push(IndexType::FulltextIndex);
69        }
70        if self.bloom_filter.is_available() {
71            indexes.push(IndexType::BloomFilterIndex);
72        }
73        indexes
74    }
75}
76
77/// Base output of the index creation.
78#[derive(Debug, Clone, Default)]
79pub struct IndexBaseOutput {
80    /// Size of the index.
81    pub index_size: ByteCount,
82    /// Number of rows in the index.
83    pub row_count: RowCount,
84    /// Available columns in the index.
85    pub columns: Vec<ColumnId>,
86}
87
88impl IndexBaseOutput {
89    pub fn is_available(&self) -> bool {
90        self.index_size > 0
91    }
92}
93
94/// Output of the inverted index creation.
95pub type InvertedIndexOutput = IndexBaseOutput;
96/// Output of the fulltext index creation.
97pub type FulltextIndexOutput = IndexBaseOutput;
98/// Output of the bloom filter creation.
99pub type BloomFilterOutput = IndexBaseOutput;
100
101/// The index creator that hides the error handling details.
102#[derive(Default)]
103pub struct Indexer {
104    file_id: FileId,
105    region_id: RegionId,
106    puffin_manager: Option<SstPuffinManager>,
107    inverted_indexer: Option<InvertedIndexer>,
108    last_mem_inverted_index: usize,
109    fulltext_indexer: Option<FulltextIndexer>,
110    last_mem_fulltext_index: usize,
111    bloom_filter_indexer: Option<BloomFilterIndexer>,
112    last_mem_bloom_filter: usize,
113}
114
115impl Indexer {
116    /// Updates the index with the given batch.
117    pub async fn update(&mut self, batch: &mut Batch) {
118        self.do_update(batch).await;
119
120        self.flush_mem_metrics();
121    }
122
123    /// Finalizes the index creation.
124    pub async fn finish(&mut self) -> IndexOutput {
125        let output = self.do_finish().await;
126
127        self.flush_mem_metrics();
128        output
129    }
130
131    /// Aborts the index creation.
132    pub async fn abort(&mut self) {
133        self.do_abort().await;
134
135        self.flush_mem_metrics();
136    }
137
138    fn flush_mem_metrics(&mut self) {
139        let inverted_mem = self
140            .inverted_indexer
141            .as_ref()
142            .map_or(0, |creator| creator.memory_usage());
143        INDEX_CREATE_MEMORY_USAGE
144            .with_label_values(&[TYPE_INVERTED_INDEX])
145            .add(inverted_mem as i64 - self.last_mem_inverted_index as i64);
146        self.last_mem_inverted_index = inverted_mem;
147
148        let fulltext_mem = self
149            .fulltext_indexer
150            .as_ref()
151            .map_or(0, |creator| creator.memory_usage());
152        INDEX_CREATE_MEMORY_USAGE
153            .with_label_values(&[TYPE_FULLTEXT_INDEX])
154            .add(fulltext_mem as i64 - self.last_mem_fulltext_index as i64);
155        self.last_mem_fulltext_index = fulltext_mem;
156
157        let bloom_filter_mem = self
158            .bloom_filter_indexer
159            .as_ref()
160            .map_or(0, |creator| creator.memory_usage());
161        INDEX_CREATE_MEMORY_USAGE
162            .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
163            .add(bloom_filter_mem as i64 - self.last_mem_bloom_filter as i64);
164        self.last_mem_bloom_filter = bloom_filter_mem;
165    }
166}
167
168#[async_trait::async_trait]
169pub trait IndexerBuilder {
170    /// Builds indexer of given file id to [index_file_path].
171    async fn build(&self, file_id: FileId) -> Indexer;
172}
173
174pub(crate) struct IndexerBuilderImpl {
175    pub(crate) op_type: OperationType,
176    pub(crate) metadata: RegionMetadataRef,
177    pub(crate) row_group_size: usize,
178    pub(crate) puffin_manager: SstPuffinManager,
179    pub(crate) intermediate_manager: IntermediateManager,
180    pub(crate) index_options: IndexOptions,
181    pub(crate) inverted_index_config: InvertedIndexConfig,
182    pub(crate) fulltext_index_config: FulltextIndexConfig,
183    pub(crate) bloom_filter_index_config: BloomFilterConfig,
184}
185
186#[async_trait::async_trait]
187impl IndexerBuilder for IndexerBuilderImpl {
188    /// Sanity check for arguments and create a new [Indexer] if arguments are valid.
189    async fn build(&self, file_id: FileId) -> Indexer {
190        let mut indexer = Indexer {
191            file_id,
192            region_id: self.metadata.region_id,
193            ..Default::default()
194        };
195
196        indexer.inverted_indexer = self.build_inverted_indexer(file_id);
197        indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await;
198        indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id);
199        if indexer.inverted_indexer.is_none()
200            && indexer.fulltext_indexer.is_none()
201            && indexer.bloom_filter_indexer.is_none()
202        {
203            indexer.abort().await;
204            return Indexer::default();
205        }
206
207        indexer.puffin_manager = Some(self.puffin_manager.clone());
208        indexer
209    }
210}
211
212impl IndexerBuilderImpl {
213    fn build_inverted_indexer(&self, file_id: FileId) -> Option<InvertedIndexer> {
214        let create = match self.op_type {
215            OperationType::Flush => self.inverted_index_config.create_on_flush.auto(),
216            OperationType::Compact => self.inverted_index_config.create_on_compaction.auto(),
217        };
218
219        if !create {
220            debug!(
221                "Skip creating inverted index due to config, region_id: {}, file_id: {}",
222                self.metadata.region_id, file_id,
223            );
224            return None;
225        }
226
227        let indexed_column_ids = self.metadata.inverted_indexed_column_ids(
228            self.index_options.inverted_index.ignore_column_ids.iter(),
229        );
230        if indexed_column_ids.is_empty() {
231            debug!(
232                "No columns to be indexed, skip creating inverted index, region_id: {}, file_id: {}",
233                self.metadata.region_id, file_id,
234            );
235            return None;
236        }
237
238        let Some(mut segment_row_count) =
239            NonZeroUsize::new(self.index_options.inverted_index.segment_row_count)
240        else {
241            warn!(
242                "Segment row count is 0, skip creating index, region_id: {}, file_id: {}",
243                self.metadata.region_id, file_id,
244            );
245            return None;
246        };
247
248        let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else {
249            warn!(
250                "Row group size is 0, skip creating index, region_id: {}, file_id: {}",
251                self.metadata.region_id, file_id,
252            );
253            return None;
254        };
255
256        // if segment row count not aligned with row group size, adjust it to be aligned.
257        if row_group_size.get() % segment_row_count.get() != 0 {
258            segment_row_count = row_group_size;
259        }
260
261        let indexer = InvertedIndexer::new(
262            file_id,
263            &self.metadata,
264            self.intermediate_manager.clone(),
265            self.inverted_index_config.mem_threshold_on_create(),
266            segment_row_count,
267            indexed_column_ids,
268        );
269
270        Some(indexer)
271    }
272
273    async fn build_fulltext_indexer(&self, file_id: FileId) -> Option<FulltextIndexer> {
274        let create = match self.op_type {
275            OperationType::Flush => self.fulltext_index_config.create_on_flush.auto(),
276            OperationType::Compact => self.fulltext_index_config.create_on_compaction.auto(),
277        };
278
279        if !create {
280            debug!(
281                "Skip creating full-text index due to config, region_id: {}, file_id: {}",
282                self.metadata.region_id, file_id,
283            );
284            return None;
285        }
286
287        let mem_limit = self.fulltext_index_config.mem_threshold_on_create();
288        let creator = FulltextIndexer::new(
289            &self.metadata.region_id,
290            &file_id,
291            &self.intermediate_manager,
292            &self.metadata,
293            self.fulltext_index_config.compress,
294            mem_limit,
295        )
296        .await;
297
298        let err = match creator {
299            Ok(creator) => {
300                if creator.is_none() {
301                    debug!(
302                        "Skip creating full-text index due to no columns require indexing, region_id: {}, file_id: {}",
303                        self.metadata.region_id, file_id,
304                    );
305                }
306                return creator;
307            }
308            Err(err) => err,
309        };
310
311        if cfg!(any(test, feature = "test")) {
312            panic!(
313                "Failed to create full-text indexer, region_id: {}, file_id: {}, err: {:?}",
314                self.metadata.region_id, file_id, err
315            );
316        } else {
317            warn!(
318                err; "Failed to create full-text indexer, region_id: {}, file_id: {}",
319                self.metadata.region_id, file_id,
320            );
321        }
322
323        None
324    }
325
326    fn build_bloom_filter_indexer(&self, file_id: FileId) -> Option<BloomFilterIndexer> {
327        let create = match self.op_type {
328            OperationType::Flush => self.bloom_filter_index_config.create_on_flush.auto(),
329            OperationType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(),
330        };
331
332        if !create {
333            debug!(
334                "Skip creating bloom filter due to config, region_id: {}, file_id: {}",
335                self.metadata.region_id, file_id,
336            );
337            return None;
338        }
339
340        let mem_limit = self.bloom_filter_index_config.mem_threshold_on_create();
341        let indexer = BloomFilterIndexer::new(
342            file_id,
343            &self.metadata,
344            self.intermediate_manager.clone(),
345            mem_limit,
346        );
347
348        let err = match indexer {
349            Ok(indexer) => {
350                if indexer.is_none() {
351                    debug!(
352                        "Skip creating bloom filter due to no columns require indexing, region_id: {}, file_id: {}",
353                        self.metadata.region_id, file_id,
354                    );
355                }
356                return indexer;
357            }
358            Err(err) => err,
359        };
360
361        if cfg!(any(test, feature = "test")) {
362            panic!(
363                "Failed to create bloom filter, region_id: {}, file_id: {}, err: {:?}",
364                self.metadata.region_id, file_id, err
365            );
366        } else {
367            warn!(
368                err; "Failed to create bloom filter, region_id: {}, file_id: {}",
369                self.metadata.region_id, file_id,
370            );
371        }
372
373        None
374    }
375}
376
377#[cfg(test)]
378mod tests {
379    use std::sync::Arc;
380
381    use api::v1::SemanticType;
382    use datatypes::data_type::ConcreteDataType;
383    use datatypes::schema::{
384        ColumnSchema, FulltextOptions, SkippingIndexOptions, SkippingIndexType,
385    };
386    use object_store::services::Memory;
387    use object_store::ObjectStore;
388    use puffin_manager::PuffinManagerFactory;
389    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
390
391    use super::*;
392    use crate::access_layer::FilePathProvider;
393    use crate::config::{FulltextIndexConfig, Mode};
394
395    struct MetaConfig {
396        with_inverted: bool,
397        with_fulltext: bool,
398        with_skipping_bloom: bool,
399    }
400
401    fn mock_region_metadata(
402        MetaConfig {
403            with_inverted,
404            with_fulltext,
405            with_skipping_bloom,
406        }: MetaConfig,
407    ) -> RegionMetadataRef {
408        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
409        let mut column_schema = ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false);
410        if with_inverted {
411            column_schema = column_schema.with_inverted_index(true);
412        }
413        builder
414            .push_column_metadata(ColumnMetadata {
415                column_schema,
416                semantic_type: SemanticType::Field,
417                column_id: 1,
418            })
419            .push_column_metadata(ColumnMetadata {
420                column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
421                semantic_type: SemanticType::Field,
422                column_id: 2,
423            })
424            .push_column_metadata(ColumnMetadata {
425                column_schema: ColumnSchema::new(
426                    "c",
427                    ConcreteDataType::timestamp_millisecond_datatype(),
428                    false,
429                ),
430                semantic_type: SemanticType::Timestamp,
431                column_id: 3,
432            });
433
434        if with_fulltext {
435            let column_schema =
436                ColumnSchema::new("text", ConcreteDataType::string_datatype(), true)
437                    .with_fulltext_options(FulltextOptions {
438                        enable: true,
439                        ..Default::default()
440                    })
441                    .unwrap();
442
443            let column = ColumnMetadata {
444                column_schema,
445                semantic_type: SemanticType::Field,
446                column_id: 4,
447            };
448
449            builder.push_column_metadata(column);
450        }
451
452        if with_skipping_bloom {
453            let column_schema =
454                ColumnSchema::new("bloom", ConcreteDataType::string_datatype(), false)
455                    .with_skipping_options(SkippingIndexOptions::new_unchecked(
456                        42,
457                        0.01,
458                        SkippingIndexType::BloomFilter,
459                    ))
460                    .unwrap();
461
462            let column = ColumnMetadata {
463                column_schema,
464                semantic_type: SemanticType::Field,
465                column_id: 5,
466            };
467
468            builder.push_column_metadata(column);
469        }
470
471        Arc::new(builder.build().unwrap())
472    }
473
474    fn mock_object_store() -> ObjectStore {
475        ObjectStore::new(Memory::default()).unwrap().finish()
476    }
477
478    async fn mock_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
479        IntermediateManager::init_fs(path).await.unwrap()
480    }
481
482    struct NoopPathProvider;
483
484    impl FilePathProvider for NoopPathProvider {
485        fn build_index_file_path(&self, _file_id: FileId) -> String {
486            unreachable!()
487        }
488
489        fn build_sst_file_path(&self, _file_id: FileId) -> String {
490            unreachable!()
491        }
492    }
493
494    #[tokio::test]
495    async fn test_build_indexer_basic() {
496        let (dir, factory) =
497            PuffinManagerFactory::new_for_test_async("test_build_indexer_basic_").await;
498        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
499
500        let metadata = mock_region_metadata(MetaConfig {
501            with_inverted: true,
502            with_fulltext: true,
503            with_skipping_bloom: true,
504        });
505        let indexer = IndexerBuilderImpl {
506            op_type: OperationType::Flush,
507            metadata,
508            row_group_size: 1024,
509            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
510            intermediate_manager: intm_manager,
511            index_options: IndexOptions::default(),
512            inverted_index_config: InvertedIndexConfig::default(),
513            fulltext_index_config: FulltextIndexConfig::default(),
514            bloom_filter_index_config: BloomFilterConfig::default(),
515        }
516        .build(FileId::random())
517        .await;
518
519        assert!(indexer.inverted_indexer.is_some());
520        assert!(indexer.fulltext_indexer.is_some());
521        assert!(indexer.bloom_filter_indexer.is_some());
522    }
523
524    #[tokio::test]
525    async fn test_build_indexer_disable_create() {
526        let (dir, factory) =
527            PuffinManagerFactory::new_for_test_async("test_build_indexer_disable_create_").await;
528        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
529
530        let metadata = mock_region_metadata(MetaConfig {
531            with_inverted: true,
532            with_fulltext: true,
533            with_skipping_bloom: true,
534        });
535        let indexer = IndexerBuilderImpl {
536            op_type: OperationType::Flush,
537            metadata: metadata.clone(),
538            row_group_size: 1024,
539            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
540            intermediate_manager: intm_manager.clone(),
541            index_options: IndexOptions::default(),
542            inverted_index_config: InvertedIndexConfig {
543                create_on_flush: Mode::Disable,
544                ..Default::default()
545            },
546            fulltext_index_config: FulltextIndexConfig::default(),
547            bloom_filter_index_config: BloomFilterConfig::default(),
548        }
549        .build(FileId::random())
550        .await;
551
552        assert!(indexer.inverted_indexer.is_none());
553        assert!(indexer.fulltext_indexer.is_some());
554        assert!(indexer.bloom_filter_indexer.is_some());
555
556        let indexer = IndexerBuilderImpl {
557            op_type: OperationType::Compact,
558            metadata: metadata.clone(),
559            row_group_size: 1024,
560            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
561            intermediate_manager: intm_manager.clone(),
562            index_options: IndexOptions::default(),
563            inverted_index_config: InvertedIndexConfig::default(),
564            fulltext_index_config: FulltextIndexConfig {
565                create_on_compaction: Mode::Disable,
566                ..Default::default()
567            },
568            bloom_filter_index_config: BloomFilterConfig::default(),
569        }
570        .build(FileId::random())
571        .await;
572
573        assert!(indexer.inverted_indexer.is_some());
574        assert!(indexer.fulltext_indexer.is_none());
575        assert!(indexer.bloom_filter_indexer.is_some());
576
577        let indexer = IndexerBuilderImpl {
578            op_type: OperationType::Compact,
579            metadata,
580            row_group_size: 1024,
581            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
582            intermediate_manager: intm_manager,
583            index_options: IndexOptions::default(),
584            inverted_index_config: InvertedIndexConfig::default(),
585            fulltext_index_config: FulltextIndexConfig::default(),
586            bloom_filter_index_config: BloomFilterConfig {
587                create_on_compaction: Mode::Disable,
588                ..Default::default()
589            },
590        }
591        .build(FileId::random())
592        .await;
593
594        assert!(indexer.inverted_indexer.is_some());
595        assert!(indexer.fulltext_indexer.is_some());
596        assert!(indexer.bloom_filter_indexer.is_none());
597    }
598
599    #[tokio::test]
600    async fn test_build_indexer_no_required() {
601        let (dir, factory) =
602            PuffinManagerFactory::new_for_test_async("test_build_indexer_no_required_").await;
603        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
604
605        let metadata = mock_region_metadata(MetaConfig {
606            with_inverted: false,
607            with_fulltext: true,
608            with_skipping_bloom: true,
609        });
610        let indexer = IndexerBuilderImpl {
611            op_type: OperationType::Flush,
612            metadata: metadata.clone(),
613            row_group_size: 1024,
614            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
615            intermediate_manager: intm_manager.clone(),
616            index_options: IndexOptions::default(),
617            inverted_index_config: InvertedIndexConfig::default(),
618            fulltext_index_config: FulltextIndexConfig::default(),
619            bloom_filter_index_config: BloomFilterConfig::default(),
620        }
621        .build(FileId::random())
622        .await;
623
624        assert!(indexer.inverted_indexer.is_none());
625        assert!(indexer.fulltext_indexer.is_some());
626        assert!(indexer.bloom_filter_indexer.is_some());
627
628        let metadata = mock_region_metadata(MetaConfig {
629            with_inverted: true,
630            with_fulltext: false,
631            with_skipping_bloom: true,
632        });
633        let indexer = IndexerBuilderImpl {
634            op_type: OperationType::Flush,
635            metadata: metadata.clone(),
636            row_group_size: 1024,
637            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
638            intermediate_manager: intm_manager.clone(),
639            index_options: IndexOptions::default(),
640            inverted_index_config: InvertedIndexConfig::default(),
641            fulltext_index_config: FulltextIndexConfig::default(),
642            bloom_filter_index_config: BloomFilterConfig::default(),
643        }
644        .build(FileId::random())
645        .await;
646
647        assert!(indexer.inverted_indexer.is_some());
648        assert!(indexer.fulltext_indexer.is_none());
649        assert!(indexer.bloom_filter_indexer.is_some());
650
651        let metadata = mock_region_metadata(MetaConfig {
652            with_inverted: true,
653            with_fulltext: true,
654            with_skipping_bloom: false,
655        });
656        let indexer = IndexerBuilderImpl {
657            op_type: OperationType::Flush,
658            metadata: metadata.clone(),
659            row_group_size: 1024,
660            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
661            intermediate_manager: intm_manager,
662            index_options: IndexOptions::default(),
663            inverted_index_config: InvertedIndexConfig::default(),
664            fulltext_index_config: FulltextIndexConfig::default(),
665            bloom_filter_index_config: BloomFilterConfig::default(),
666        }
667        .build(FileId::random())
668        .await;
669
670        assert!(indexer.inverted_indexer.is_some());
671        assert!(indexer.fulltext_indexer.is_some());
672        assert!(indexer.bloom_filter_indexer.is_none());
673    }
674
675    #[tokio::test]
676    async fn test_build_indexer_zero_row_group() {
677        let (dir, factory) =
678            PuffinManagerFactory::new_for_test_async("test_build_indexer_zero_row_group_").await;
679        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
680
681        let metadata = mock_region_metadata(MetaConfig {
682            with_inverted: true,
683            with_fulltext: true,
684            with_skipping_bloom: true,
685        });
686        let indexer = IndexerBuilderImpl {
687            op_type: OperationType::Flush,
688            metadata,
689            row_group_size: 0,
690            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
691            intermediate_manager: intm_manager,
692            index_options: IndexOptions::default(),
693            inverted_index_config: InvertedIndexConfig::default(),
694            fulltext_index_config: FulltextIndexConfig::default(),
695            bloom_filter_index_config: BloomFilterConfig::default(),
696        }
697        .build(FileId::random())
698        .await;
699
700        assert!(indexer.inverted_indexer.is_none());
701    }
702}