1pub(crate) mod bloom_filter;
16mod codec;
17pub(crate) mod fulltext_index;
18mod indexer;
19pub mod intermediate;
20pub(crate) mod inverted_index;
21pub mod puffin_manager;
22mod statistics;
23pub(crate) mod store;
24
25use std::num::NonZeroUsize;
26
27use bloom_filter::creator::BloomFilterIndexer;
28use common_telemetry::{debug, warn};
29use puffin_manager::SstPuffinManager;
30use smallvec::SmallVec;
31use statistics::{ByteCount, RowCount};
32use store_api::metadata::RegionMetadataRef;
33use store_api::storage::{ColumnId, RegionId};
34
35use crate::access_layer::OperationType;
36use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
37use crate::metrics::INDEX_CREATE_MEMORY_USAGE;
38use crate::read::Batch;
39use crate::region::options::IndexOptions;
40use crate::sst::file::{FileId, IndexType};
41use crate::sst::index::fulltext_index::creator::FulltextIndexer;
42use crate::sst::index::intermediate::IntermediateManager;
43use crate::sst::index::inverted_index::creator::InvertedIndexer;
44
/// Label value identifying the inverted index (used as the `INDEX_CREATE_MEMORY_USAGE` label).
pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index";
/// Label value identifying the full-text index (used as the `INDEX_CREATE_MEMORY_USAGE` label).
pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index";
/// Label value identifying the bloom filter index (used as the `INDEX_CREATE_MEMORY_USAGE` label).
pub(crate) const TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";

/// Row granularity passed to `FulltextIndexer::new` when the full-text indexer is built.
// NOTE(review): 8096 is not a power of two; if 8192 was intended this may be a typo.
// Confirm against existing index files before changing it.
const DEFAULT_FULLTEXT_BLOOM_ROW_GRANULARITY: usize = 8096;
50
/// Combined result of index creation, holding one output per index kind.
#[derive(Debug, Clone, Default)]
pub struct IndexOutput {
    /// Size of the file (presumably the index file, in bytes; confirm against the producer).
    pub file_size: u64,
    /// Output of the inverted index.
    pub inverted_index: InvertedIndexOutput,
    /// Output of the full-text index.
    pub fulltext_index: FulltextIndexOutput,
    /// Output of the bloom filter index.
    pub bloom_filter: BloomFilterOutput,
}
63
64impl IndexOutput {
65 pub fn build_available_indexes(&self) -> SmallVec<[IndexType; 4]> {
66 let mut indexes = SmallVec::new();
67 if self.inverted_index.is_available() {
68 indexes.push(IndexType::InvertedIndex);
69 }
70 if self.fulltext_index.is_available() {
71 indexes.push(IndexType::FulltextIndex);
72 }
73 if self.bloom_filter.is_available() {
74 indexes.push(IndexType::BloomFilterIndex);
75 }
76 indexes
77 }
78}
79
/// Output shared by every index kind.
#[derive(Debug, Clone, Default)]
pub struct IndexBaseOutput {
    /// Size of the index; a value of zero means the index is not available.
    pub index_size: ByteCount,
    /// Row count recorded for the index (presumably the number of indexed rows; confirm).
    pub row_count: RowCount,
    /// Column ids recorded for the index (presumably the indexed columns; confirm).
    pub columns: Vec<ColumnId>,
}
90
91impl IndexBaseOutput {
92 pub fn is_available(&self) -> bool {
93 self.index_size > 0
94 }
95}
96
/// Output of the inverted index creation.
pub type InvertedIndexOutput = IndexBaseOutput;
/// Output of the full-text index creation.
pub type FulltextIndexOutput = IndexBaseOutput;
/// Output of the bloom filter creation.
pub type BloomFilterOutput = IndexBaseOutput;
103
/// Drives the per-file indexers and tracks the memory usage they report.
///
/// A default-constructed value has no indexers and no puffin manager.
#[derive(Default)]
pub struct Indexer {
    /// Id of the file this indexer was built for.
    file_id: FileId,
    /// Id of the region taken from the builder's metadata.
    region_id: RegionId,
    /// Set by the builder only when at least one indexer was created.
    puffin_manager: Option<SstPuffinManager>,
    /// Inverted indexer, if one was created.
    inverted_indexer: Option<InvertedIndexer>,
    /// Inverted index memory usage last published to `INDEX_CREATE_MEMORY_USAGE`.
    last_mem_inverted_index: usize,
    /// Full-text indexer, if one was created.
    fulltext_indexer: Option<FulltextIndexer>,
    /// Full-text index memory usage last published to `INDEX_CREATE_MEMORY_USAGE`.
    last_mem_fulltext_index: usize,
    /// Bloom filter indexer, if one was created.
    bloom_filter_indexer: Option<BloomFilterIndexer>,
    /// Bloom filter memory usage last published to `INDEX_CREATE_MEMORY_USAGE`.
    last_mem_bloom_filter: usize,
}
117
118impl Indexer {
119 pub async fn update(&mut self, batch: &mut Batch) {
121 self.do_update(batch).await;
122
123 self.flush_mem_metrics();
124 }
125
126 pub async fn finish(&mut self) -> IndexOutput {
128 let output = self.do_finish().await;
129
130 self.flush_mem_metrics();
131 output
132 }
133
134 pub async fn abort(&mut self) {
136 self.do_abort().await;
137
138 self.flush_mem_metrics();
139 }
140
141 fn flush_mem_metrics(&mut self) {
142 let inverted_mem = self
143 .inverted_indexer
144 .as_ref()
145 .map_or(0, |creator| creator.memory_usage());
146 INDEX_CREATE_MEMORY_USAGE
147 .with_label_values(&[TYPE_INVERTED_INDEX])
148 .add(inverted_mem as i64 - self.last_mem_inverted_index as i64);
149 self.last_mem_inverted_index = inverted_mem;
150
151 let fulltext_mem = self
152 .fulltext_indexer
153 .as_ref()
154 .map_or(0, |creator| creator.memory_usage());
155 INDEX_CREATE_MEMORY_USAGE
156 .with_label_values(&[TYPE_FULLTEXT_INDEX])
157 .add(fulltext_mem as i64 - self.last_mem_fulltext_index as i64);
158 self.last_mem_fulltext_index = fulltext_mem;
159
160 let bloom_filter_mem = self
161 .bloom_filter_indexer
162 .as_ref()
163 .map_or(0, |creator| creator.memory_usage());
164 INDEX_CREATE_MEMORY_USAGE
165 .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
166 .add(bloom_filter_mem as i64 - self.last_mem_bloom_filter as i64);
167 self.last_mem_bloom_filter = bloom_filter_mem;
168 }
169}
170
/// Asynchronous factory for [`Indexer`]s.
#[async_trait::async_trait]
pub trait IndexerBuilder {
    /// Builds an [`Indexer`] for the file identified by `file_id`.
    async fn build(&self, file_id: FileId) -> Indexer;
}
176
/// Default [`IndexerBuilder`] that creates the inverted, full-text and bloom filter indexers.
pub(crate) struct IndexerBuilderImpl {
    /// Operation being performed; selects the `create_on_flush` or `create_on_compaction` setting.
    pub(crate) op_type: OperationType,
    /// Region metadata; supplies the region id and is handed to every indexer.
    pub(crate) metadata: RegionMetadataRef,
    /// Row group size; the inverted indexer is skipped when this is 0.
    pub(crate) row_group_size: usize,
    /// Puffin manager cloned into the built [`Indexer`] when at least one indexer exists.
    pub(crate) puffin_manager: SstPuffinManager,
    /// Intermediate manager handed to every indexer.
    pub(crate) intermediate_manager: IntermediateManager,
    /// Region index options; the inverted index reads its ignored columns and segment row count.
    pub(crate) index_options: IndexOptions,
    /// Configuration of the inverted index.
    pub(crate) inverted_index_config: InvertedIndexConfig,
    /// Configuration of the full-text index.
    pub(crate) fulltext_index_config: FulltextIndexConfig,
    /// Configuration of the bloom filter index.
    pub(crate) bloom_filter_index_config: BloomFilterConfig,
}
188
189#[async_trait::async_trait]
190impl IndexerBuilder for IndexerBuilderImpl {
191 async fn build(&self, file_id: FileId) -> Indexer {
193 let mut indexer = Indexer {
194 file_id,
195 region_id: self.metadata.region_id,
196 ..Default::default()
197 };
198
199 indexer.inverted_indexer = self.build_inverted_indexer(file_id);
200 indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await;
201 indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id);
202 if indexer.inverted_indexer.is_none()
203 && indexer.fulltext_indexer.is_none()
204 && indexer.bloom_filter_indexer.is_none()
205 {
206 indexer.abort().await;
207 return Indexer::default();
208 }
209
210 indexer.puffin_manager = Some(self.puffin_manager.clone());
211 indexer
212 }
213}
214
215impl IndexerBuilderImpl {
216 fn build_inverted_indexer(&self, file_id: FileId) -> Option<InvertedIndexer> {
217 let create = match self.op_type {
218 OperationType::Flush => self.inverted_index_config.create_on_flush.auto(),
219 OperationType::Compact => self.inverted_index_config.create_on_compaction.auto(),
220 };
221
222 if !create {
223 debug!(
224 "Skip creating inverted index due to config, region_id: {}, file_id: {}",
225 self.metadata.region_id, file_id,
226 );
227 return None;
228 }
229
230 let indexed_column_ids = self.metadata.inverted_indexed_column_ids(
231 self.index_options.inverted_index.ignore_column_ids.iter(),
232 );
233 if indexed_column_ids.is_empty() {
234 debug!(
235 "No columns to be indexed, skip creating inverted index, region_id: {}, file_id: {}",
236 self.metadata.region_id, file_id,
237 );
238 return None;
239 }
240
241 let Some(mut segment_row_count) =
242 NonZeroUsize::new(self.index_options.inverted_index.segment_row_count)
243 else {
244 warn!(
245 "Segment row count is 0, skip creating index, region_id: {}, file_id: {}",
246 self.metadata.region_id, file_id,
247 );
248 return None;
249 };
250
251 let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else {
252 warn!(
253 "Row group size is 0, skip creating index, region_id: {}, file_id: {}",
254 self.metadata.region_id, file_id,
255 );
256 return None;
257 };
258
259 if row_group_size.get() % segment_row_count.get() != 0 {
261 segment_row_count = row_group_size;
262 }
263
264 let indexer = InvertedIndexer::new(
265 file_id,
266 &self.metadata,
267 self.intermediate_manager.clone(),
268 self.inverted_index_config.mem_threshold_on_create(),
269 segment_row_count,
270 indexed_column_ids,
271 );
272
273 Some(indexer)
274 }
275
276 async fn build_fulltext_indexer(&self, file_id: FileId) -> Option<FulltextIndexer> {
277 let create = match self.op_type {
278 OperationType::Flush => self.fulltext_index_config.create_on_flush.auto(),
279 OperationType::Compact => self.fulltext_index_config.create_on_compaction.auto(),
280 };
281
282 if !create {
283 debug!(
284 "Skip creating full-text index due to config, region_id: {}, file_id: {}",
285 self.metadata.region_id, file_id,
286 );
287 return None;
288 }
289
290 let mem_limit = self.fulltext_index_config.mem_threshold_on_create();
291 let creator = FulltextIndexer::new(
292 &self.metadata.region_id,
293 &file_id,
294 &self.intermediate_manager,
295 &self.metadata,
296 self.fulltext_index_config.compress,
297 DEFAULT_FULLTEXT_BLOOM_ROW_GRANULARITY,
298 mem_limit,
299 )
300 .await;
301
302 let err = match creator {
303 Ok(creator) => {
304 if creator.is_none() {
305 debug!(
306 "Skip creating full-text index due to no columns require indexing, region_id: {}, file_id: {}",
307 self.metadata.region_id, file_id,
308 );
309 }
310 return creator;
311 }
312 Err(err) => err,
313 };
314
315 if cfg!(any(test, feature = "test")) {
316 panic!(
317 "Failed to create full-text indexer, region_id: {}, file_id: {}, err: {:?}",
318 self.metadata.region_id, file_id, err
319 );
320 } else {
321 warn!(
322 err; "Failed to create full-text indexer, region_id: {}, file_id: {}",
323 self.metadata.region_id, file_id,
324 );
325 }
326
327 None
328 }
329
330 fn build_bloom_filter_indexer(&self, file_id: FileId) -> Option<BloomFilterIndexer> {
331 let create = match self.op_type {
332 OperationType::Flush => self.bloom_filter_index_config.create_on_flush.auto(),
333 OperationType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(),
334 };
335
336 if !create {
337 debug!(
338 "Skip creating bloom filter due to config, region_id: {}, file_id: {}",
339 self.metadata.region_id, file_id,
340 );
341 return None;
342 }
343
344 let mem_limit = self.bloom_filter_index_config.mem_threshold_on_create();
345 let indexer = BloomFilterIndexer::new(
346 file_id,
347 &self.metadata,
348 self.intermediate_manager.clone(),
349 mem_limit,
350 );
351
352 let err = match indexer {
353 Ok(indexer) => {
354 if indexer.is_none() {
355 debug!(
356 "Skip creating bloom filter due to no columns require indexing, region_id: {}, file_id: {}",
357 self.metadata.region_id, file_id,
358 );
359 }
360 return indexer;
361 }
362 Err(err) => err,
363 };
364
365 if cfg!(any(test, feature = "test")) {
366 panic!(
367 "Failed to create bloom filter, region_id: {}, file_id: {}, err: {:?}",
368 self.metadata.region_id, file_id, err
369 );
370 } else {
371 warn!(
372 err; "Failed to create bloom filter, region_id: {}, file_id: {}",
373 self.metadata.region_id, file_id,
374 );
375 }
376
377 None
378 }
379}
380
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{
        ColumnSchema, FulltextOptions, SkippingIndexOptions, SkippingIndexType,
    };
    use object_store::services::Memory;
    use object_store::ObjectStore;
    use puffin_manager::PuffinManagerFactory;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};

    use super::*;
    use crate::access_layer::FilePathProvider;
    use crate::config::{FulltextIndexConfig, Mode};

    /// Selects which index definitions the mocked region metadata carries.
    struct MetaConfig {
        with_inverted: bool,
        with_fulltext: bool,
        with_skipping_bloom: bool,
    }

    /// Builds region metadata with columns `a`, `b`, `c` plus the optional indexed columns.
    fn mock_region_metadata(config: MetaConfig) -> RegionMetadataRef {
        let MetaConfig {
            with_inverted,
            with_fulltext,
            with_skipping_bloom,
        } = config;

        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));

        let schema_a = ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false);
        let schema_a = if with_inverted {
            schema_a.with_inverted_index(true)
        } else {
            schema_a
        };
        builder.push_column_metadata(ColumnMetadata {
            column_schema: schema_a,
            semantic_type: SemanticType::Field,
            column_id: 1,
        });
        builder.push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
            semantic_type: SemanticType::Field,
            column_id: 2,
        });
        builder.push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new(
                "c",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: 3,
        });

        if with_fulltext {
            let text_schema = ColumnSchema::new("text", ConcreteDataType::string_datatype(), true)
                .with_fulltext_options(FulltextOptions {
                    enable: true,
                    ..Default::default()
                })
                .unwrap();
            builder.push_column_metadata(ColumnMetadata {
                column_schema: text_schema,
                semantic_type: SemanticType::Field,
                column_id: 4,
            });
        }

        if with_skipping_bloom {
            let bloom_schema =
                ColumnSchema::new("bloom", ConcreteDataType::string_datatype(), false)
                    .with_skipping_options(SkippingIndexOptions {
                        granularity: 42,
                        index_type: SkippingIndexType::BloomFilter,
                    })
                    .unwrap();
            builder.push_column_metadata(ColumnMetadata {
                column_schema: bloom_schema,
                semantic_type: SemanticType::Field,
                column_id: 5,
            });
        }

        Arc::new(builder.build().unwrap())
    }

    /// Returns an in-memory object store.
    fn mock_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    /// Initializes an intermediate manager rooted at `path`.
    async fn mock_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
        IntermediateManager::init_fs(path).await.unwrap()
    }

    /// Path provider whose methods must never be reached by these tests.
    struct NoopPathProvider;

    impl FilePathProvider for NoopPathProvider {
        fn build_index_file_path(&self, _file_id: FileId) -> String {
            unreachable!()
        }

        fn build_sst_file_path(&self, _file_id: FileId) -> String {
            unreachable!()
        }
    }

    /// Assembles a builder with default index options and default index configs.
    fn default_builder(
        op_type: OperationType,
        metadata: RegionMetadataRef,
        row_group_size: usize,
        puffin_manager: SstPuffinManager,
        intermediate_manager: IntermediateManager,
    ) -> IndexerBuilderImpl {
        IndexerBuilderImpl {
            op_type,
            metadata,
            row_group_size,
            puffin_manager,
            intermediate_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
    }

    #[tokio::test]
    async fn test_build_indexer_basic() {
        let (tmp_dir, puffin_factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_basic_").await;
        let intermediate = mock_intm_mgr(tmp_dir.path().to_string_lossy()).await;

        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: true,
            with_skipping_bloom: true,
        });
        let indexer = default_builder(
            OperationType::Flush,
            metadata,
            1024,
            puffin_factory.build(mock_object_store(), NoopPathProvider),
            intermediate,
        )
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_some());
    }

    #[tokio::test]
    async fn test_build_indexer_disable_create() {
        let (tmp_dir, puffin_factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_disable_create_").await;
        let intermediate = mock_intm_mgr(tmp_dir.path().to_string_lossy()).await;

        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: true,
            with_skipping_bloom: true,
        });

        // Inverted index disabled for flush.
        let mut builder = default_builder(
            OperationType::Flush,
            metadata.clone(),
            1024,
            puffin_factory.build(mock_object_store(), NoopPathProvider),
            intermediate.clone(),
        );
        builder.inverted_index_config.create_on_flush = Mode::Disable;
        let indexer = builder.build(FileId::random()).await;

        assert!(indexer.inverted_indexer.is_none());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_some());

        // Full-text index disabled for compaction.
        let mut builder = default_builder(
            OperationType::Compact,
            metadata.clone(),
            1024,
            puffin_factory.build(mock_object_store(), NoopPathProvider),
            intermediate.clone(),
        );
        builder.fulltext_index_config.create_on_compaction = Mode::Disable;
        let indexer = builder.build(FileId::random()).await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_none());
        assert!(indexer.bloom_filter_indexer.is_some());

        // Bloom filter disabled for compaction.
        let mut builder = default_builder(
            OperationType::Compact,
            metadata,
            1024,
            puffin_factory.build(mock_object_store(), NoopPathProvider),
            intermediate,
        );
        builder.bloom_filter_index_config.create_on_compaction = Mode::Disable;
        let indexer = builder.build(FileId::random()).await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_none());
    }

    #[tokio::test]
    async fn test_build_indexer_no_required() {
        let (tmp_dir, puffin_factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_no_required_").await;
        let intermediate = mock_intm_mgr(tmp_dir.path().to_string_lossy()).await;

        // Each case leaves exactly one index kind out of the metadata; only that
        // indexer is expected to be missing.
        let cases = [
            (false, true, true),
            (true, false, true),
            (true, true, false),
        ];
        for (with_inverted, with_fulltext, with_skipping_bloom) in cases {
            let metadata = mock_region_metadata(MetaConfig {
                with_inverted,
                with_fulltext,
                with_skipping_bloom,
            });
            let indexer = default_builder(
                OperationType::Flush,
                metadata,
                1024,
                puffin_factory.build(mock_object_store(), NoopPathProvider),
                intermediate.clone(),
            )
            .build(FileId::random())
            .await;

            assert_eq!(indexer.inverted_indexer.is_some(), with_inverted);
            assert_eq!(indexer.fulltext_indexer.is_some(), with_fulltext);
            assert_eq!(indexer.bloom_filter_indexer.is_some(), with_skipping_bloom);
        }
    }

    #[tokio::test]
    async fn test_build_indexer_zero_row_group() {
        let (tmp_dir, puffin_factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_zero_row_group_").await;
        let intermediate = mock_intm_mgr(tmp_dir.path().to_string_lossy()).await;

        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: true,
            with_skipping_bloom: true,
        });
        let indexer = default_builder(
            OperationType::Flush,
            metadata,
            0,
            puffin_factory.build(mock_object_store(), NoopPathProvider),
            intermediate,
        )
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_none());
    }
}