1pub(crate) mod bloom_filter;
16pub(crate) mod fulltext_index;
17mod indexer;
18pub mod intermediate;
19pub(crate) mod inverted_index;
20pub mod puffin_manager;
21mod statistics;
22pub(crate) mod store;
23
24use std::num::NonZeroUsize;
25
26use bloom_filter::creator::BloomFilterIndexer;
27use common_telemetry::{debug, warn};
28use puffin_manager::SstPuffinManager;
29use smallvec::SmallVec;
30use statistics::{ByteCount, RowCount};
31use store_api::metadata::RegionMetadataRef;
32use store_api::storage::{ColumnId, RegionId};
33
34use crate::access_layer::OperationType;
35use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
36use crate::metrics::INDEX_CREATE_MEMORY_USAGE;
37use crate::read::Batch;
38use crate::region::options::IndexOptions;
39use crate::sst::file::{FileId, IndexType};
40use crate::sst::index::fulltext_index::creator::FulltextIndexer;
41use crate::sst::index::intermediate::IntermediateManager;
42use crate::sst::index::inverted_index::creator::InvertedIndexer;
43
/// Name of the inverted index type; used in this module as a metric label value.
pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index";
/// Name of the full-text index type; used in this module as a metric label value.
pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index";
/// Name of the bloom filter index type; used in this module as a metric label value.
pub(crate) const TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
47
48#[derive(Debug, Clone, Default)]
50pub struct IndexOutput {
51 pub file_size: u64,
53 pub inverted_index: InvertedIndexOutput,
55 pub fulltext_index: FulltextIndexOutput,
57 pub bloom_filter: BloomFilterOutput,
59}
60
61impl IndexOutput {
62 pub fn build_available_indexes(&self) -> SmallVec<[IndexType; 4]> {
63 let mut indexes = SmallVec::new();
64 if self.inverted_index.is_available() {
65 indexes.push(IndexType::InvertedIndex);
66 }
67 if self.fulltext_index.is_available() {
68 indexes.push(IndexType::FulltextIndex);
69 }
70 if self.bloom_filter.is_available() {
71 indexes.push(IndexType::BloomFilterIndex);
72 }
73 indexes
74 }
75}
76
77#[derive(Debug, Clone, Default)]
79pub struct IndexBaseOutput {
80 pub index_size: ByteCount,
82 pub row_count: RowCount,
84 pub columns: Vec<ColumnId>,
86}
87
88impl IndexBaseOutput {
89 pub fn is_available(&self) -> bool {
90 self.index_size > 0
91 }
92}
93
94pub type InvertedIndexOutput = IndexBaseOutput;
96pub type FulltextIndexOutput = IndexBaseOutput;
98pub type BloomFilterOutput = IndexBaseOutput;
100
101#[derive(Default)]
103pub struct Indexer {
104 file_id: FileId,
105 region_id: RegionId,
106 puffin_manager: Option<SstPuffinManager>,
107 inverted_indexer: Option<InvertedIndexer>,
108 last_mem_inverted_index: usize,
109 fulltext_indexer: Option<FulltextIndexer>,
110 last_mem_fulltext_index: usize,
111 bloom_filter_indexer: Option<BloomFilterIndexer>,
112 last_mem_bloom_filter: usize,
113}
114
115impl Indexer {
116 pub async fn update(&mut self, batch: &mut Batch) {
118 self.do_update(batch).await;
119
120 self.flush_mem_metrics();
121 }
122
123 pub async fn finish(&mut self) -> IndexOutput {
125 let output = self.do_finish().await;
126
127 self.flush_mem_metrics();
128 output
129 }
130
131 pub async fn abort(&mut self) {
133 self.do_abort().await;
134
135 self.flush_mem_metrics();
136 }
137
138 fn flush_mem_metrics(&mut self) {
139 let inverted_mem = self
140 .inverted_indexer
141 .as_ref()
142 .map_or(0, |creator| creator.memory_usage());
143 INDEX_CREATE_MEMORY_USAGE
144 .with_label_values(&[TYPE_INVERTED_INDEX])
145 .add(inverted_mem as i64 - self.last_mem_inverted_index as i64);
146 self.last_mem_inverted_index = inverted_mem;
147
148 let fulltext_mem = self
149 .fulltext_indexer
150 .as_ref()
151 .map_or(0, |creator| creator.memory_usage());
152 INDEX_CREATE_MEMORY_USAGE
153 .with_label_values(&[TYPE_FULLTEXT_INDEX])
154 .add(fulltext_mem as i64 - self.last_mem_fulltext_index as i64);
155 self.last_mem_fulltext_index = fulltext_mem;
156
157 let bloom_filter_mem = self
158 .bloom_filter_indexer
159 .as_ref()
160 .map_or(0, |creator| creator.memory_usage());
161 INDEX_CREATE_MEMORY_USAGE
162 .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
163 .add(bloom_filter_mem as i64 - self.last_mem_bloom_filter as i64);
164 self.last_mem_bloom_filter = bloom_filter_mem;
165 }
166}
167
168#[async_trait::async_trait]
169pub trait IndexerBuilder {
170 async fn build(&self, file_id: FileId) -> Indexer;
172}
173
174pub(crate) struct IndexerBuilderImpl {
175 pub(crate) op_type: OperationType,
176 pub(crate) metadata: RegionMetadataRef,
177 pub(crate) row_group_size: usize,
178 pub(crate) puffin_manager: SstPuffinManager,
179 pub(crate) intermediate_manager: IntermediateManager,
180 pub(crate) index_options: IndexOptions,
181 pub(crate) inverted_index_config: InvertedIndexConfig,
182 pub(crate) fulltext_index_config: FulltextIndexConfig,
183 pub(crate) bloom_filter_index_config: BloomFilterConfig,
184}
185
186#[async_trait::async_trait]
187impl IndexerBuilder for IndexerBuilderImpl {
188 async fn build(&self, file_id: FileId) -> Indexer {
190 let mut indexer = Indexer {
191 file_id,
192 region_id: self.metadata.region_id,
193 ..Default::default()
194 };
195
196 indexer.inverted_indexer = self.build_inverted_indexer(file_id);
197 indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await;
198 indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id);
199 if indexer.inverted_indexer.is_none()
200 && indexer.fulltext_indexer.is_none()
201 && indexer.bloom_filter_indexer.is_none()
202 {
203 indexer.abort().await;
204 return Indexer::default();
205 }
206
207 indexer.puffin_manager = Some(self.puffin_manager.clone());
208 indexer
209 }
210}
211
212impl IndexerBuilderImpl {
213 fn build_inverted_indexer(&self, file_id: FileId) -> Option<InvertedIndexer> {
214 let create = match self.op_type {
215 OperationType::Flush => self.inverted_index_config.create_on_flush.auto(),
216 OperationType::Compact => self.inverted_index_config.create_on_compaction.auto(),
217 };
218
219 if !create {
220 debug!(
221 "Skip creating inverted index due to config, region_id: {}, file_id: {}",
222 self.metadata.region_id, file_id,
223 );
224 return None;
225 }
226
227 let indexed_column_ids = self.metadata.inverted_indexed_column_ids(
228 self.index_options.inverted_index.ignore_column_ids.iter(),
229 );
230 if indexed_column_ids.is_empty() {
231 debug!(
232 "No columns to be indexed, skip creating inverted index, region_id: {}, file_id: {}",
233 self.metadata.region_id, file_id,
234 );
235 return None;
236 }
237
238 let Some(mut segment_row_count) =
239 NonZeroUsize::new(self.index_options.inverted_index.segment_row_count)
240 else {
241 warn!(
242 "Segment row count is 0, skip creating index, region_id: {}, file_id: {}",
243 self.metadata.region_id, file_id,
244 );
245 return None;
246 };
247
248 let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else {
249 warn!(
250 "Row group size is 0, skip creating index, region_id: {}, file_id: {}",
251 self.metadata.region_id, file_id,
252 );
253 return None;
254 };
255
256 if row_group_size.get() % segment_row_count.get() != 0 {
258 segment_row_count = row_group_size;
259 }
260
261 let indexer = InvertedIndexer::new(
262 file_id,
263 &self.metadata,
264 self.intermediate_manager.clone(),
265 self.inverted_index_config.mem_threshold_on_create(),
266 segment_row_count,
267 indexed_column_ids,
268 );
269
270 Some(indexer)
271 }
272
273 async fn build_fulltext_indexer(&self, file_id: FileId) -> Option<FulltextIndexer> {
274 let create = match self.op_type {
275 OperationType::Flush => self.fulltext_index_config.create_on_flush.auto(),
276 OperationType::Compact => self.fulltext_index_config.create_on_compaction.auto(),
277 };
278
279 if !create {
280 debug!(
281 "Skip creating full-text index due to config, region_id: {}, file_id: {}",
282 self.metadata.region_id, file_id,
283 );
284 return None;
285 }
286
287 let mem_limit = self.fulltext_index_config.mem_threshold_on_create();
288 let creator = FulltextIndexer::new(
289 &self.metadata.region_id,
290 &file_id,
291 &self.intermediate_manager,
292 &self.metadata,
293 self.fulltext_index_config.compress,
294 mem_limit,
295 )
296 .await;
297
298 let err = match creator {
299 Ok(creator) => {
300 if creator.is_none() {
301 debug!(
302 "Skip creating full-text index due to no columns require indexing, region_id: {}, file_id: {}",
303 self.metadata.region_id, file_id,
304 );
305 }
306 return creator;
307 }
308 Err(err) => err,
309 };
310
311 if cfg!(any(test, feature = "test")) {
312 panic!(
313 "Failed to create full-text indexer, region_id: {}, file_id: {}, err: {:?}",
314 self.metadata.region_id, file_id, err
315 );
316 } else {
317 warn!(
318 err; "Failed to create full-text indexer, region_id: {}, file_id: {}",
319 self.metadata.region_id, file_id,
320 );
321 }
322
323 None
324 }
325
326 fn build_bloom_filter_indexer(&self, file_id: FileId) -> Option<BloomFilterIndexer> {
327 let create = match self.op_type {
328 OperationType::Flush => self.bloom_filter_index_config.create_on_flush.auto(),
329 OperationType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(),
330 };
331
332 if !create {
333 debug!(
334 "Skip creating bloom filter due to config, region_id: {}, file_id: {}",
335 self.metadata.region_id, file_id,
336 );
337 return None;
338 }
339
340 let mem_limit = self.bloom_filter_index_config.mem_threshold_on_create();
341 let indexer = BloomFilterIndexer::new(
342 file_id,
343 &self.metadata,
344 self.intermediate_manager.clone(),
345 mem_limit,
346 );
347
348 let err = match indexer {
349 Ok(indexer) => {
350 if indexer.is_none() {
351 debug!(
352 "Skip creating bloom filter due to no columns require indexing, region_id: {}, file_id: {}",
353 self.metadata.region_id, file_id,
354 );
355 }
356 return indexer;
357 }
358 Err(err) => err,
359 };
360
361 if cfg!(any(test, feature = "test")) {
362 panic!(
363 "Failed to create bloom filter, region_id: {}, file_id: {}, err: {:?}",
364 self.metadata.region_id, file_id, err
365 );
366 } else {
367 warn!(
368 err; "Failed to create bloom filter, region_id: {}, file_id: {}",
369 self.metadata.region_id, file_id,
370 );
371 }
372
373 None
374 }
375}
376
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{
        ColumnSchema, FulltextOptions, SkippingIndexOptions, SkippingIndexType,
    };
    use object_store::services::Memory;
    use object_store::ObjectStore;
    use puffin_manager::PuffinManagerFactory;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};

    use super::*;
    use crate::access_layer::FilePathProvider;
    use crate::config::{FulltextIndexConfig, Mode};

    /// Selects which index-enabled columns the mock region schema contains.
    struct MetaConfig {
        with_inverted: bool,
        with_fulltext: bool,
        with_skipping_bloom: bool,
    }

    impl MetaConfig {
        /// Schema with an inverted, a full-text and a bloom filter column.
        const ALL_INDEXES: MetaConfig = MetaConfig {
            with_inverted: true,
            with_fulltext: true,
            with_skipping_bloom: true,
        };
    }

    /// Builds metadata for region (1, 2) with columns `a`, `b`, `c` plus the optional `text`
    /// (full-text) and `bloom` (bloom filter skipping index) columns.
    fn mock_region_metadata(config: MetaConfig) -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));

        let column_a = ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false);
        let column_a = if config.with_inverted {
            column_a.with_inverted_index(true)
        } else {
            column_a
        };

        builder.push_column_metadata(ColumnMetadata {
            column_schema: column_a,
            semantic_type: SemanticType::Field,
            column_id: 1,
        });
        builder.push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
            semantic_type: SemanticType::Field,
            column_id: 2,
        });
        builder.push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new(
                "c",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: 3,
        });

        if config.with_fulltext {
            let text_schema = ColumnSchema::new("text", ConcreteDataType::string_datatype(), true)
                .with_fulltext_options(FulltextOptions {
                    enable: true,
                    ..Default::default()
                })
                .unwrap();

            builder.push_column_metadata(ColumnMetadata {
                column_schema: text_schema,
                semantic_type: SemanticType::Field,
                column_id: 4,
            });
        }

        if config.with_skipping_bloom {
            let bloom_schema =
                ColumnSchema::new("bloom", ConcreteDataType::string_datatype(), false)
                    .with_skipping_options(SkippingIndexOptions::new_unchecked(
                        42,
                        0.01,
                        SkippingIndexType::BloomFilter,
                    ))
                    .unwrap();

            builder.push_column_metadata(ColumnMetadata {
                column_schema: bloom_schema,
                semantic_type: SemanticType::Field,
                column_id: 5,
            });
        }

        Arc::new(builder.build().unwrap())
    }

    /// Creates an object store backed by the `Memory` service.
    fn mock_object_store() -> ObjectStore {
        let store_builder = ObjectStore::new(Memory::default()).unwrap();
        store_builder.finish()
    }

    /// Initializes an intermediate manager via `init_fs` for `path`.
    async fn mock_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
        let manager = IntermediateManager::init_fs(path).await;
        manager.unwrap()
    }

    /// Path provider whose methods must never be called by these tests.
    struct NoopPathProvider;

    impl FilePathProvider for NoopPathProvider {
        fn build_index_file_path(&self, _file_id: FileId) -> String {
            unreachable!()
        }

        fn build_sst_file_path(&self, _file_id: FileId) -> String {
            unreachable!()
        }
    }

    /// Creates a builder whose index options and index configs are all defaults.
    fn default_builder(
        op_type: OperationType,
        metadata: RegionMetadataRef,
        row_group_size: usize,
        puffin_manager: SstPuffinManager,
        intermediate_manager: IntermediateManager,
    ) -> IndexerBuilderImpl {
        IndexerBuilderImpl {
            op_type,
            metadata,
            row_group_size,
            puffin_manager,
            intermediate_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
    }

    #[tokio::test]
    async fn test_build_indexer_basic() {
        let (dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_basic_").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;

        let indexer = default_builder(
            OperationType::Flush,
            mock_region_metadata(MetaConfig::ALL_INDEXES),
            1024,
            factory.build(mock_object_store(), NoopPathProvider),
            intm_manager,
        )
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_some());
    }

    #[tokio::test]
    async fn test_build_indexer_disable_create() {
        let (dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_disable_create_").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
        let metadata = mock_region_metadata(MetaConfig::ALL_INDEXES);

        // Inverted index disabled on flush.
        let mut builder = default_builder(
            OperationType::Flush,
            metadata.clone(),
            1024,
            factory.build(mock_object_store(), NoopPathProvider),
            intm_manager.clone(),
        );
        builder.inverted_index_config.create_on_flush = Mode::Disable;
        let indexer = builder.build(FileId::random()).await;

        assert!(indexer.inverted_indexer.is_none());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_some());

        // Full-text index disabled on compaction.
        let mut builder = default_builder(
            OperationType::Compact,
            metadata.clone(),
            1024,
            factory.build(mock_object_store(), NoopPathProvider),
            intm_manager.clone(),
        );
        builder.fulltext_index_config.create_on_compaction = Mode::Disable;
        let indexer = builder.build(FileId::random()).await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_none());
        assert!(indexer.bloom_filter_indexer.is_some());

        // Bloom filter index disabled on compaction.
        let mut builder = default_builder(
            OperationType::Compact,
            metadata,
            1024,
            factory.build(mock_object_store(), NoopPathProvider),
            intm_manager,
        );
        builder.bloom_filter_index_config.create_on_compaction = Mode::Disable;
        let indexer = builder.build(FileId::random()).await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_none());
    }

    #[tokio::test]
    async fn test_build_indexer_no_required() {
        let (dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_no_required_").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;

        // No column asks for an inverted index.
        let without_inverted = mock_region_metadata(MetaConfig {
            with_inverted: false,
            ..MetaConfig::ALL_INDEXES
        });
        let indexer = default_builder(
            OperationType::Flush,
            without_inverted,
            1024,
            factory.build(mock_object_store(), NoopPathProvider),
            intm_manager.clone(),
        )
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_none());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_some());

        // No column asks for a full-text index.
        let without_fulltext = mock_region_metadata(MetaConfig {
            with_fulltext: false,
            ..MetaConfig::ALL_INDEXES
        });
        let indexer = default_builder(
            OperationType::Flush,
            without_fulltext,
            1024,
            factory.build(mock_object_store(), NoopPathProvider),
            intm_manager.clone(),
        )
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_none());
        assert!(indexer.bloom_filter_indexer.is_some());

        // No column asks for a bloom filter index.
        let without_bloom = mock_region_metadata(MetaConfig {
            with_skipping_bloom: false,
            ..MetaConfig::ALL_INDEXES
        });
        let indexer = default_builder(
            OperationType::Flush,
            without_bloom,
            1024,
            factory.build(mock_object_store(), NoopPathProvider),
            intm_manager,
        )
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_none());
    }

    #[tokio::test]
    async fn test_build_indexer_zero_row_group() {
        let (dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_zero_row_group_").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;

        let indexer = default_builder(
            OperationType::Flush,
            mock_region_metadata(MetaConfig::ALL_INDEXES),
            0,
            factory.build(mock_object_store(), NoopPathProvider),
            intm_manager,
        )
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_none());
    }
}