1pub(crate) mod bloom_filter;
16pub(crate) mod fulltext_index;
17mod indexer;
18pub mod intermediate;
19pub(crate) mod inverted_index;
20pub mod puffin_manager;
21mod statistics;
22pub(crate) mod store;
23
24use std::num::NonZeroUsize;
25
26use bloom_filter::creator::BloomFilterIndexer;
27use common_telemetry::{debug, warn};
28use puffin_manager::SstPuffinManager;
29use smallvec::SmallVec;
30use statistics::{ByteCount, RowCount};
31use store_api::metadata::RegionMetadataRef;
32use store_api::storage::{ColumnId, RegionId};
33
34use crate::access_layer::OperationType;
35use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
36use crate::metrics::INDEX_CREATE_MEMORY_USAGE;
37use crate::read::Batch;
38use crate::region::options::IndexOptions;
39use crate::sst::file::{FileId, IndexType};
40use crate::sst::index::fulltext_index::creator::FulltextIndexer;
41use crate::sst::index::intermediate::IntermediateManager;
42use crate::sst::index::inverted_index::creator::InvertedIndexer;
43
/// Metric label used for inverted index memory accounting.
pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index";
/// Metric label used for full-text index memory accounting.
pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index";
/// Metric label used for bloom filter index memory accounting.
pub(crate) const TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";

/// Row granularity handed to `FulltextIndexer::new` for its bloom filter.
///
/// NOTE(review): 8096 is not a power of two; if 8192 was intended, confirm
/// before changing (the value affects how newly written indexes are built).
const DEFAULT_FULLTEXT_BLOOM_ROW_GRANULARITY: usize = 8_096;
49
50#[derive(Debug, Clone, Default)]
52pub struct IndexOutput {
53 pub file_size: u64,
55 pub inverted_index: InvertedIndexOutput,
57 pub fulltext_index: FulltextIndexOutput,
59 pub bloom_filter: BloomFilterOutput,
61}
62
63impl IndexOutput {
64 pub fn build_available_indexes(&self) -> SmallVec<[IndexType; 4]> {
65 let mut indexes = SmallVec::new();
66 if self.inverted_index.is_available() {
67 indexes.push(IndexType::InvertedIndex);
68 }
69 if self.fulltext_index.is_available() {
70 indexes.push(IndexType::FulltextIndex);
71 }
72 if self.bloom_filter.is_available() {
73 indexes.push(IndexType::BloomFilterIndex);
74 }
75 indexes
76 }
77}
78
79#[derive(Debug, Clone, Default)]
81pub struct IndexBaseOutput {
82 pub index_size: ByteCount,
84 pub row_count: RowCount,
86 pub columns: Vec<ColumnId>,
88}
89
90impl IndexBaseOutput {
91 pub fn is_available(&self) -> bool {
92 self.index_size > 0
93 }
94}
95
96pub type InvertedIndexOutput = IndexBaseOutput;
98pub type FulltextIndexOutput = IndexBaseOutput;
100pub type BloomFilterOutput = IndexBaseOutput;
102
103#[derive(Default)]
105pub struct Indexer {
106 file_id: FileId,
107 region_id: RegionId,
108 puffin_manager: Option<SstPuffinManager>,
109 inverted_indexer: Option<InvertedIndexer>,
110 last_mem_inverted_index: usize,
111 fulltext_indexer: Option<FulltextIndexer>,
112 last_mem_fulltext_index: usize,
113 bloom_filter_indexer: Option<BloomFilterIndexer>,
114 last_mem_bloom_filter: usize,
115}
116
117impl Indexer {
118 pub async fn update(&mut self, batch: &mut Batch) {
120 self.do_update(batch).await;
121
122 self.flush_mem_metrics();
123 }
124
125 pub async fn finish(&mut self) -> IndexOutput {
127 let output = self.do_finish().await;
128
129 self.flush_mem_metrics();
130 output
131 }
132
133 pub async fn abort(&mut self) {
135 self.do_abort().await;
136
137 self.flush_mem_metrics();
138 }
139
140 fn flush_mem_metrics(&mut self) {
141 let inverted_mem = self
142 .inverted_indexer
143 .as_ref()
144 .map_or(0, |creator| creator.memory_usage());
145 INDEX_CREATE_MEMORY_USAGE
146 .with_label_values(&[TYPE_INVERTED_INDEX])
147 .add(inverted_mem as i64 - self.last_mem_inverted_index as i64);
148 self.last_mem_inverted_index = inverted_mem;
149
150 let fulltext_mem = self
151 .fulltext_indexer
152 .as_ref()
153 .map_or(0, |creator| creator.memory_usage());
154 INDEX_CREATE_MEMORY_USAGE
155 .with_label_values(&[TYPE_FULLTEXT_INDEX])
156 .add(fulltext_mem as i64 - self.last_mem_fulltext_index as i64);
157 self.last_mem_fulltext_index = fulltext_mem;
158
159 let bloom_filter_mem = self
160 .bloom_filter_indexer
161 .as_ref()
162 .map_or(0, |creator| creator.memory_usage());
163 INDEX_CREATE_MEMORY_USAGE
164 .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
165 .add(bloom_filter_mem as i64 - self.last_mem_bloom_filter as i64);
166 self.last_mem_bloom_filter = bloom_filter_mem;
167 }
168}
169
170#[async_trait::async_trait]
171pub trait IndexerBuilder {
172 async fn build(&self, file_id: FileId) -> Indexer;
174}
175
176pub(crate) struct IndexerBuilderImpl {
177 pub(crate) op_type: OperationType,
178 pub(crate) metadata: RegionMetadataRef,
179 pub(crate) row_group_size: usize,
180 pub(crate) puffin_manager: SstPuffinManager,
181 pub(crate) intermediate_manager: IntermediateManager,
182 pub(crate) index_options: IndexOptions,
183 pub(crate) inverted_index_config: InvertedIndexConfig,
184 pub(crate) fulltext_index_config: FulltextIndexConfig,
185 pub(crate) bloom_filter_index_config: BloomFilterConfig,
186}
187
188#[async_trait::async_trait]
189impl IndexerBuilder for IndexerBuilderImpl {
190 async fn build(&self, file_id: FileId) -> Indexer {
192 let mut indexer = Indexer {
193 file_id,
194 region_id: self.metadata.region_id,
195 ..Default::default()
196 };
197
198 indexer.inverted_indexer = self.build_inverted_indexer(file_id);
199 indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await;
200 indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id);
201 if indexer.inverted_indexer.is_none()
202 && indexer.fulltext_indexer.is_none()
203 && indexer.bloom_filter_indexer.is_none()
204 {
205 indexer.abort().await;
206 return Indexer::default();
207 }
208
209 indexer.puffin_manager = Some(self.puffin_manager.clone());
210 indexer
211 }
212}
213
214impl IndexerBuilderImpl {
215 fn build_inverted_indexer(&self, file_id: FileId) -> Option<InvertedIndexer> {
216 let create = match self.op_type {
217 OperationType::Flush => self.inverted_index_config.create_on_flush.auto(),
218 OperationType::Compact => self.inverted_index_config.create_on_compaction.auto(),
219 };
220
221 if !create {
222 debug!(
223 "Skip creating inverted index due to config, region_id: {}, file_id: {}",
224 self.metadata.region_id, file_id,
225 );
226 return None;
227 }
228
229 let indexed_column_ids = self.metadata.inverted_indexed_column_ids(
230 self.index_options.inverted_index.ignore_column_ids.iter(),
231 );
232 if indexed_column_ids.is_empty() {
233 debug!(
234 "No columns to be indexed, skip creating inverted index, region_id: {}, file_id: {}",
235 self.metadata.region_id, file_id,
236 );
237 return None;
238 }
239
240 let Some(mut segment_row_count) =
241 NonZeroUsize::new(self.index_options.inverted_index.segment_row_count)
242 else {
243 warn!(
244 "Segment row count is 0, skip creating index, region_id: {}, file_id: {}",
245 self.metadata.region_id, file_id,
246 );
247 return None;
248 };
249
250 let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else {
251 warn!(
252 "Row group size is 0, skip creating index, region_id: {}, file_id: {}",
253 self.metadata.region_id, file_id,
254 );
255 return None;
256 };
257
258 if row_group_size.get() % segment_row_count.get() != 0 {
260 segment_row_count = row_group_size;
261 }
262
263 let indexer = InvertedIndexer::new(
264 file_id,
265 &self.metadata,
266 self.intermediate_manager.clone(),
267 self.inverted_index_config.mem_threshold_on_create(),
268 segment_row_count,
269 indexed_column_ids,
270 );
271
272 Some(indexer)
273 }
274
275 async fn build_fulltext_indexer(&self, file_id: FileId) -> Option<FulltextIndexer> {
276 let create = match self.op_type {
277 OperationType::Flush => self.fulltext_index_config.create_on_flush.auto(),
278 OperationType::Compact => self.fulltext_index_config.create_on_compaction.auto(),
279 };
280
281 if !create {
282 debug!(
283 "Skip creating full-text index due to config, region_id: {}, file_id: {}",
284 self.metadata.region_id, file_id,
285 );
286 return None;
287 }
288
289 let mem_limit = self.fulltext_index_config.mem_threshold_on_create();
290 let creator = FulltextIndexer::new(
291 &self.metadata.region_id,
292 &file_id,
293 &self.intermediate_manager,
294 &self.metadata,
295 self.fulltext_index_config.compress,
296 DEFAULT_FULLTEXT_BLOOM_ROW_GRANULARITY,
297 mem_limit,
298 )
299 .await;
300
301 let err = match creator {
302 Ok(creator) => {
303 if creator.is_none() {
304 debug!(
305 "Skip creating full-text index due to no columns require indexing, region_id: {}, file_id: {}",
306 self.metadata.region_id, file_id,
307 );
308 }
309 return creator;
310 }
311 Err(err) => err,
312 };
313
314 if cfg!(any(test, feature = "test")) {
315 panic!(
316 "Failed to create full-text indexer, region_id: {}, file_id: {}, err: {:?}",
317 self.metadata.region_id, file_id, err
318 );
319 } else {
320 warn!(
321 err; "Failed to create full-text indexer, region_id: {}, file_id: {}",
322 self.metadata.region_id, file_id,
323 );
324 }
325
326 None
327 }
328
329 fn build_bloom_filter_indexer(&self, file_id: FileId) -> Option<BloomFilterIndexer> {
330 let create = match self.op_type {
331 OperationType::Flush => self.bloom_filter_index_config.create_on_flush.auto(),
332 OperationType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(),
333 };
334
335 if !create {
336 debug!(
337 "Skip creating bloom filter due to config, region_id: {}, file_id: {}",
338 self.metadata.region_id, file_id,
339 );
340 return None;
341 }
342
343 let mem_limit = self.bloom_filter_index_config.mem_threshold_on_create();
344 let indexer = BloomFilterIndexer::new(
345 file_id,
346 &self.metadata,
347 self.intermediate_manager.clone(),
348 mem_limit,
349 );
350
351 let err = match indexer {
352 Ok(indexer) => {
353 if indexer.is_none() {
354 debug!(
355 "Skip creating bloom filter due to no columns require indexing, region_id: {}, file_id: {}",
356 self.metadata.region_id, file_id,
357 );
358 }
359 return indexer;
360 }
361 Err(err) => err,
362 };
363
364 if cfg!(any(test, feature = "test")) {
365 panic!(
366 "Failed to create bloom filter, region_id: {}, file_id: {}, err: {:?}",
367 self.metadata.region_id, file_id, err
368 );
369 } else {
370 warn!(
371 err; "Failed to create bloom filter, region_id: {}, file_id: {}",
372 self.metadata.region_id, file_id,
373 );
374 }
375
376 None
377 }
378}
379
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{
        ColumnSchema, FulltextOptions, SkippingIndexOptions, SkippingIndexType,
    };
    use object_store::services::Memory;
    use object_store::ObjectStore;
    use puffin_manager::PuffinManagerFactory;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};

    use super::*;
    use crate::access_layer::FilePathProvider;
    use crate::config::{FulltextIndexConfig, Mode};

    /// Selects which index-bearing columns `mock_region_metadata` adds.
    struct MetaConfig {
        with_inverted: bool,
        with_fulltext: bool,
        with_skipping_bloom: bool,
    }

    /// Builds region metadata with columns `a` (optionally inverted-indexed),
    /// `b`, the time index `c`, and optionally a full-text column `text` and
    /// a bloom-filter skipping column `bloom`.
    fn mock_region_metadata(
        MetaConfig {
            with_inverted,
            with_fulltext,
            with_skipping_bloom,
        }: MetaConfig,
    ) -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        let mut column_schema = ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false);
        if with_inverted {
            column_schema = column_schema.with_inverted_index(true);
        }
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema,
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "c",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 3,
            });

        if with_fulltext {
            let column_schema =
                ColumnSchema::new("text", ConcreteDataType::string_datatype(), true)
                    .with_fulltext_options(FulltextOptions {
                        enable: true,
                        ..Default::default()
                    })
                    .unwrap();

            let column = ColumnMetadata {
                column_schema,
                semantic_type: SemanticType::Field,
                column_id: 4,
            };

            builder.push_column_metadata(column);
        }

        if with_skipping_bloom {
            let column_schema =
                ColumnSchema::new("bloom", ConcreteDataType::string_datatype(), false)
                    .with_skipping_options(SkippingIndexOptions {
                        granularity: 42,
                        index_type: SkippingIndexType::BloomFilter,
                    })
                    .unwrap();

            let column = ColumnMetadata {
                column_schema,
                semantic_type: SemanticType::Field,
                column_id: 5,
            };

            builder.push_column_metadata(column);
        }

        Arc::new(builder.build().unwrap())
    }

    fn mock_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    async fn mock_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
        IntermediateManager::init_fs(path).await.unwrap()
    }

    /// Path provider for tests that never resolve file paths.
    struct NoopPathProvider;

    impl FilePathProvider for NoopPathProvider {
        fn build_index_file_path(&self, _file_id: FileId) -> String {
            unreachable!()
        }

        fn build_sst_file_path(&self, _file_id: FileId) -> String {
            unreachable!()
        }
    }

    #[tokio::test]
    async fn test_build_indexer_basic() {
        let (dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_basic_").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;

        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: true,
            with_skipping_bloom: true,
        });
        let indexer = IndexerBuilderImpl {
            op_type: OperationType::Flush,
            metadata,
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_some());
        // A puffin manager is attached whenever at least one creator exists.
        assert!(indexer.puffin_manager.is_some());
    }

    #[tokio::test]
    async fn test_build_indexer_disable_create() {
        let (dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_disable_create_").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;

        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: true,
            with_skipping_bloom: true,
        });
        let indexer = IndexerBuilderImpl {
            op_type: OperationType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig {
                create_on_flush: Mode::Disable,
                ..Default::default()
            },
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_none());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_some());

        let indexer = IndexerBuilderImpl {
            op_type: OperationType::Compact,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig {
                create_on_compaction: Mode::Disable,
                ..Default::default()
            },
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_none());
        assert!(indexer.bloom_filter_indexer.is_some());

        let indexer = IndexerBuilderImpl {
            op_type: OperationType::Compact,
            metadata,
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig {
                create_on_compaction: Mode::Disable,
                ..Default::default()
            },
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_none());
    }

    /// When every index kind is disabled, the builder must abort and fall
    /// back to `Indexer::default()`, which carries no puffin manager.
    #[tokio::test]
    async fn test_build_indexer_all_disabled() {
        let (dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_all_disabled_").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;

        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: true,
            with_skipping_bloom: true,
        });
        let indexer = IndexerBuilderImpl {
            op_type: OperationType::Flush,
            metadata,
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig {
                create_on_flush: Mode::Disable,
                ..Default::default()
            },
            fulltext_index_config: FulltextIndexConfig {
                create_on_flush: Mode::Disable,
                ..Default::default()
            },
            bloom_filter_index_config: BloomFilterConfig {
                create_on_flush: Mode::Disable,
                ..Default::default()
            },
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_none());
        assert!(indexer.fulltext_indexer.is_none());
        assert!(indexer.bloom_filter_indexer.is_none());
        assert!(indexer.puffin_manager.is_none());
    }

    #[tokio::test]
    async fn test_build_indexer_no_required() {
        let (dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_no_required_").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;

        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: false,
            with_fulltext: true,
            with_skipping_bloom: true,
        });
        let indexer = IndexerBuilderImpl {
            op_type: OperationType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_none());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_some());

        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: false,
            with_skipping_bloom: true,
        });
        let indexer = IndexerBuilderImpl {
            op_type: OperationType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_none());
        assert!(indexer.bloom_filter_indexer.is_some());

        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: true,
            with_skipping_bloom: false,
        });
        let indexer = IndexerBuilderImpl {
            op_type: OperationType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_none());
    }

    /// A zero row group size only disables the inverted index; the other
    /// creators do not depend on it.
    #[tokio::test]
    async fn test_build_indexer_zero_row_group() {
        let (dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_zero_row_group_").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;

        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: true,
            with_skipping_bloom: true,
        });
        let indexer = IndexerBuilderImpl {
            op_type: OperationType::Flush,
            metadata,
            row_group_size: 0,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_none());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_some());
    }

    /// A zero segment row count only disables the inverted index.
    #[tokio::test]
    async fn test_build_indexer_zero_segment_row_count() {
        let (dir, factory) = PuffinManagerFactory::new_for_test_async(
            "test_build_indexer_zero_segment_row_count_",
        )
        .await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;

        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: true,
            with_skipping_bloom: true,
        });
        let mut index_options = IndexOptions::default();
        index_options.inverted_index.segment_row_count = 0;

        let indexer = IndexerBuilderImpl {
            op_type: OperationType::Flush,
            metadata,
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager,
            index_options,
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_none());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_some());
    }
}