1pub(crate) mod bloom_filter;
16pub(crate) mod fulltext_index;
17mod indexer;
18pub mod intermediate;
19pub(crate) mod inverted_index;
20pub mod puffin_manager;
21mod statistics;
22pub(crate) mod store;
23
24use std::num::NonZeroUsize;
25
26use bloom_filter::creator::BloomFilterIndexer;
27use common_telemetry::{debug, warn};
28use datatypes::arrow::record_batch::RecordBatch;
29use puffin_manager::SstPuffinManager;
30use smallvec::SmallVec;
31use statistics::{ByteCount, RowCount};
32use store_api::metadata::RegionMetadataRef;
33use store_api::storage::{ColumnId, RegionId};
34
35use crate::access_layer::OperationType;
36use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
37use crate::metrics::INDEX_CREATE_MEMORY_USAGE;
38use crate::read::Batch;
39use crate::region::options::IndexOptions;
40use crate::sst::file::{FileId, IndexType};
41use crate::sst::index::fulltext_index::creator::FulltextIndexer;
42use crate::sst::index::intermediate::IntermediateManager;
43use crate::sst::index::inverted_index::creator::InvertedIndexer;
44
/// Label identifying the inverted index type (e.g. the `INDEX_CREATE_MEMORY_USAGE` metric label).
pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index";
/// Label identifying the full-text index type (e.g. the `INDEX_CREATE_MEMORY_USAGE` metric label).
pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index";
/// Label identifying the bloom filter index type (e.g. the `INDEX_CREATE_MEMORY_USAGE` metric label).
pub(crate) const TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
48
#[derive(Debug, Clone, Default)]
/// Combined result of creating every index type for one SST file.
pub struct IndexOutput {
    /// Size of the index file.
    /// NOTE(review): presumably in bytes; confirm against the code that fills it in.
    pub file_size: u64,
    /// Result of the inverted index creator.
    pub inverted_index: InvertedIndexOutput,
    /// Result of the full-text index creator.
    pub fulltext_index: FulltextIndexOutput,
    /// Result of the bloom filter index creator.
    pub bloom_filter: BloomFilterOutput,
}
61
62impl IndexOutput {
63 pub fn build_available_indexes(&self) -> SmallVec<[IndexType; 4]> {
64 let mut indexes = SmallVec::new();
65 if self.inverted_index.is_available() {
66 indexes.push(IndexType::InvertedIndex);
67 }
68 if self.fulltext_index.is_available() {
69 indexes.push(IndexType::FulltextIndex);
70 }
71 if self.bloom_filter.is_available() {
72 indexes.push(IndexType::BloomFilterIndex);
73 }
74 indexes
75 }
76}
77
#[derive(Debug, Clone, Default)]
/// Per-index-type creation result, shared by the inverted, full-text and
/// bloom filter outputs.
pub struct IndexBaseOutput {
    /// Size of this index's data; zero means the index was not produced.
    pub index_size: ByteCount,
    /// Number of rows covered by the index.
    /// NOTE(review): inferred from the field name and type; confirm against the creators.
    pub row_count: RowCount,
    /// IDs of the columns this index was built on.
    /// NOTE(review): inferred from the field name and type; confirm against the creators.
    pub columns: Vec<ColumnId>,
}
88
impl IndexBaseOutput {
    /// Returns `true` if the index produced any data, i.e. `index_size` is non-zero.
    pub fn is_available(&self) -> bool {
        self.index_size > 0
    }
}
94
/// Output of the inverted index creator.
pub type InvertedIndexOutput = IndexBaseOutput;
/// Output of the full-text index creator.
pub type FulltextIndexOutput = IndexBaseOutput;
/// Output of the bloom filter index creator.
pub type BloomFilterOutput = IndexBaseOutput;
101
#[derive(Default)]
/// Creates the indexes (inverted, full-text, bloom filter) for a single SST file.
///
/// Every creator is optional. `Indexer::default()` holds no creators and no
/// managers.
pub struct Indexer {
    /// ID of the SST file being indexed.
    file_id: FileId,
    /// ID of the region the SST file belongs to.
    region_id: RegionId,
    /// Puffin manager for the index file. `IndexerBuilderImpl::build` only sets
    /// it when at least one creator was built.
    puffin_manager: Option<SstPuffinManager>,
    /// Inverted index creator, if one was built.
    inverted_indexer: Option<InvertedIndexer>,
    /// Inverted-index memory usage last added to `INDEX_CREATE_MEMORY_USAGE`.
    last_mem_inverted_index: usize,
    /// Full-text index creator, if one was built.
    fulltext_indexer: Option<FulltextIndexer>,
    /// Full-text-index memory usage last added to `INDEX_CREATE_MEMORY_USAGE`.
    last_mem_fulltext_index: usize,
    /// Bloom filter index creator, if one was built.
    bloom_filter_indexer: Option<BloomFilterIndexer>,
    /// Bloom-filter memory usage last added to `INDEX_CREATE_MEMORY_USAGE`.
    last_mem_bloom_filter: usize,
    /// Intermediate manager shared with the creators.
    /// NOTE(review): presumably manages temporary files; confirm in `intermediate`.
    intermediate_manager: Option<IntermediateManager>,
}
116
117impl Indexer {
118 pub async fn update(&mut self, batch: &mut Batch) {
120 self.do_update(batch).await;
121
122 self.flush_mem_metrics();
123 }
124
125 pub async fn update_flat(&mut self, batch: &RecordBatch) {
127 self.do_update_flat(batch).await;
128
129 self.flush_mem_metrics();
130 }
131
132 pub async fn finish(&mut self) -> IndexOutput {
134 let output = self.do_finish().await;
135
136 self.flush_mem_metrics();
137 output
138 }
139
140 pub async fn abort(&mut self) {
142 self.do_abort().await;
143
144 self.flush_mem_metrics();
145 }
146
147 fn flush_mem_metrics(&mut self) {
148 let inverted_mem = self
149 .inverted_indexer
150 .as_ref()
151 .map_or(0, |creator| creator.memory_usage());
152 INDEX_CREATE_MEMORY_USAGE
153 .with_label_values(&[TYPE_INVERTED_INDEX])
154 .add(inverted_mem as i64 - self.last_mem_inverted_index as i64);
155 self.last_mem_inverted_index = inverted_mem;
156
157 let fulltext_mem = self
158 .fulltext_indexer
159 .as_ref()
160 .map_or(0, |creator| creator.memory_usage());
161 INDEX_CREATE_MEMORY_USAGE
162 .with_label_values(&[TYPE_FULLTEXT_INDEX])
163 .add(fulltext_mem as i64 - self.last_mem_fulltext_index as i64);
164 self.last_mem_fulltext_index = fulltext_mem;
165
166 let bloom_filter_mem = self
167 .bloom_filter_indexer
168 .as_ref()
169 .map_or(0, |creator| creator.memory_usage());
170 INDEX_CREATE_MEMORY_USAGE
171 .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
172 .add(bloom_filter_mem as i64 - self.last_mem_bloom_filter as i64);
173 self.last_mem_bloom_filter = bloom_filter_mem;
174 }
175}
176
#[async_trait::async_trait]
/// Factory for the [`Indexer`] used when writing an SST file.
pub trait IndexerBuilder {
    /// Builds an [`Indexer`] for the SST file identified by `file_id`.
    async fn build(&self, file_id: FileId) -> Indexer;
}
182
/// Default [`IndexerBuilder`]. For each index type it decides whether to create
/// that index, based on the operation type, the region metadata, the region
/// index options and the per-index configs.
pub(crate) struct IndexerBuilderImpl {
    /// Operation producing the SST. Selects between each config's
    /// `create_on_flush` and `create_on_compaction` switch.
    pub(crate) op_type: OperationType,
    /// Metadata of the region, used to find the columns that require indexing.
    pub(crate) metadata: RegionMetadataRef,
    /// Row group size of the SST. Used to derive the inverted index segment row
    /// count; a value of 0 disables the inverted index.
    pub(crate) row_group_size: usize,
    /// Puffin manager handed to the built `Indexer` when any creator exists.
    pub(crate) puffin_manager: SstPuffinManager,
    /// Intermediate manager shared with the index creators.
    pub(crate) intermediate_manager: IntermediateManager,
    /// Region index options (inverted index ignored columns and segment row count).
    pub(crate) index_options: IndexOptions,
    /// Inverted index configuration.
    pub(crate) inverted_index_config: InvertedIndexConfig,
    /// Full-text index configuration.
    pub(crate) fulltext_index_config: FulltextIndexConfig,
    /// Bloom filter index configuration.
    pub(crate) bloom_filter_index_config: BloomFilterConfig,
}
194
195#[async_trait::async_trait]
196impl IndexerBuilder for IndexerBuilderImpl {
197 async fn build(&self, file_id: FileId) -> Indexer {
199 let mut indexer = Indexer {
200 file_id,
201 region_id: self.metadata.region_id,
202 ..Default::default()
203 };
204
205 indexer.inverted_indexer = self.build_inverted_indexer(file_id);
206 indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await;
207 indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id);
208 indexer.intermediate_manager = Some(self.intermediate_manager.clone());
209 if indexer.inverted_indexer.is_none()
210 && indexer.fulltext_indexer.is_none()
211 && indexer.bloom_filter_indexer.is_none()
212 {
213 indexer.abort().await;
214 return Indexer::default();
215 }
216
217 indexer.puffin_manager = Some(self.puffin_manager.clone());
218 indexer
219 }
220}
221
222impl IndexerBuilderImpl {
223 fn build_inverted_indexer(&self, file_id: FileId) -> Option<InvertedIndexer> {
224 let create = match self.op_type {
225 OperationType::Flush => self.inverted_index_config.create_on_flush.auto(),
226 OperationType::Compact => self.inverted_index_config.create_on_compaction.auto(),
227 };
228
229 if !create {
230 debug!(
231 "Skip creating inverted index due to config, region_id: {}, file_id: {}",
232 self.metadata.region_id, file_id,
233 );
234 return None;
235 }
236
237 let indexed_column_ids = self.metadata.inverted_indexed_column_ids(
238 self.index_options.inverted_index.ignore_column_ids.iter(),
239 );
240 if indexed_column_ids.is_empty() {
241 debug!(
242 "No columns to be indexed, skip creating inverted index, region_id: {}, file_id: {}",
243 self.metadata.region_id, file_id,
244 );
245 return None;
246 }
247
248 let Some(mut segment_row_count) =
249 NonZeroUsize::new(self.index_options.inverted_index.segment_row_count)
250 else {
251 warn!(
252 "Segment row count is 0, skip creating index, region_id: {}, file_id: {}",
253 self.metadata.region_id, file_id,
254 );
255 return None;
256 };
257
258 let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else {
259 warn!(
260 "Row group size is 0, skip creating index, region_id: {}, file_id: {}",
261 self.metadata.region_id, file_id,
262 );
263 return None;
264 };
265
266 if row_group_size.get() % segment_row_count.get() != 0 {
268 segment_row_count = row_group_size;
269 }
270
271 let indexer = InvertedIndexer::new(
272 file_id,
273 &self.metadata,
274 self.intermediate_manager.clone(),
275 self.inverted_index_config.mem_threshold_on_create(),
276 segment_row_count,
277 indexed_column_ids,
278 );
279
280 Some(indexer)
281 }
282
283 async fn build_fulltext_indexer(&self, file_id: FileId) -> Option<FulltextIndexer> {
284 let create = match self.op_type {
285 OperationType::Flush => self.fulltext_index_config.create_on_flush.auto(),
286 OperationType::Compact => self.fulltext_index_config.create_on_compaction.auto(),
287 };
288
289 if !create {
290 debug!(
291 "Skip creating full-text index due to config, region_id: {}, file_id: {}",
292 self.metadata.region_id, file_id,
293 );
294 return None;
295 }
296
297 let mem_limit = self.fulltext_index_config.mem_threshold_on_create();
298 let creator = FulltextIndexer::new(
299 &self.metadata.region_id,
300 &file_id,
301 &self.intermediate_manager,
302 &self.metadata,
303 self.fulltext_index_config.compress,
304 mem_limit,
305 )
306 .await;
307
308 let err = match creator {
309 Ok(creator) => {
310 if creator.is_none() {
311 debug!(
312 "Skip creating full-text index due to no columns require indexing, region_id: {}, file_id: {}",
313 self.metadata.region_id, file_id,
314 );
315 }
316 return creator;
317 }
318 Err(err) => err,
319 };
320
321 if cfg!(any(test, feature = "test")) {
322 panic!(
323 "Failed to create full-text indexer, region_id: {}, file_id: {}, err: {:?}",
324 self.metadata.region_id, file_id, err
325 );
326 } else {
327 warn!(
328 err; "Failed to create full-text indexer, region_id: {}, file_id: {}",
329 self.metadata.region_id, file_id,
330 );
331 }
332
333 None
334 }
335
336 fn build_bloom_filter_indexer(&self, file_id: FileId) -> Option<BloomFilterIndexer> {
337 let create = match self.op_type {
338 OperationType::Flush => self.bloom_filter_index_config.create_on_flush.auto(),
339 OperationType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(),
340 };
341
342 if !create {
343 debug!(
344 "Skip creating bloom filter due to config, region_id: {}, file_id: {}",
345 self.metadata.region_id, file_id,
346 );
347 return None;
348 }
349
350 let mem_limit = self.bloom_filter_index_config.mem_threshold_on_create();
351 let indexer = BloomFilterIndexer::new(
352 file_id,
353 &self.metadata,
354 self.intermediate_manager.clone(),
355 mem_limit,
356 );
357
358 let err = match indexer {
359 Ok(indexer) => {
360 if indexer.is_none() {
361 debug!(
362 "Skip creating bloom filter due to no columns require indexing, region_id: {}, file_id: {}",
363 self.metadata.region_id, file_id,
364 );
365 }
366 return indexer;
367 }
368 Err(err) => err,
369 };
370
371 if cfg!(any(test, feature = "test")) {
372 panic!(
373 "Failed to create bloom filter, region_id: {}, file_id: {}, err: {:?}",
374 self.metadata.region_id, file_id, err
375 );
376 } else {
377 warn!(
378 err; "Failed to create bloom filter, region_id: {}, file_id: {}",
379 self.metadata.region_id, file_id,
380 );
381 }
382
383 None
384 }
385}
386
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::{
        ColumnSchema, FulltextOptions, SkippingIndexOptions, SkippingIndexType,
    };
    use object_store::services::Memory;
    use object_store::ObjectStore;
    use puffin_manager::PuffinManagerFactory;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};

    use super::*;
    use crate::access_layer::FilePathProvider;
    use crate::config::{FulltextIndexConfig, Mode};
    use crate::sst::file::RegionFileId;

    /// Selects which index-enabled columns `mock_region_metadata` includes.
    struct MetaConfig {
        /// Enable the inverted index on field column `a` (id 1).
        with_inverted: bool,
        /// Add string field column `text` (id 4) with full-text indexing enabled.
        with_fulltext: bool,
        /// Add string field column `bloom` (id 5) with a bloom filter skipping index.
        with_skipping_bloom: bool,
    }

    /// Builds metadata for region (1, 2). It always contains field `a` (int64),
    /// field `b` (float64) and timestamp `c` (milliseconds), plus the optional
    /// index-enabled columns selected by `MetaConfig`.
    fn mock_region_metadata(
        MetaConfig {
            with_inverted,
            with_fulltext,
            with_skipping_bloom,
        }: MetaConfig,
    ) -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        let mut column_schema = ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false);
        if with_inverted {
            column_schema = column_schema.with_inverted_index(true);
        }
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema,
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "c",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 3,
            });

        if with_fulltext {
            let column_schema =
                ColumnSchema::new("text", ConcreteDataType::string_datatype(), true)
                    .with_fulltext_options(FulltextOptions {
                        enable: true,
                        ..Default::default()
                    })
                    .unwrap();

            let column = ColumnMetadata {
                column_schema,
                semantic_type: SemanticType::Field,
                column_id: 4,
            };

            builder.push_column_metadata(column);
        }

        if with_skipping_bloom {
            let column_schema =
                ColumnSchema::new("bloom", ConcreteDataType::string_datatype(), false)
                    .with_skipping_options(SkippingIndexOptions::new_unchecked(
                        42,
                        0.01,
                        SkippingIndexType::BloomFilter,
                    ))
                    .unwrap();

            let column = ColumnMetadata {
                column_schema,
                semantic_type: SemanticType::Field,
                column_id: 5,
            };

            builder.push_column_metadata(column);
        }

        Arc::new(builder.build().unwrap())
    }

    /// In-memory object store for the puffin manager under test.
    fn mock_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }

    /// Intermediate manager rooted at `path` on the local filesystem.
    async fn mock_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
        IntermediateManager::init_fs(path).await.unwrap()
    }

    /// Path provider for tests that only build indexers and never resolve file
    /// paths; any call is a test bug.
    struct NoopPathProvider;

    impl FilePathProvider for NoopPathProvider {
        fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
            unreachable!()
        }

        fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
            unreachable!()
        }
    }

    /// With every index column present and default configs, all three creators
    /// are built.
    #[tokio::test]
    async fn test_build_indexer_basic() {
        // `dir` must stay alive for the whole test: it backs the intermediate manager.
        let (dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_basic_").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;

        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: true,
            with_skipping_bloom: true,
        });
        let indexer = IndexerBuilderImpl {
            op_type: OperationType::Flush,
            metadata,
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_some());
    }

    /// Setting the matching `create_on_flush` / `create_on_compaction` switch to
    /// `Mode::Disable` suppresses only that index type.
    #[tokio::test]
    async fn test_build_indexer_disable_create() {
        let (dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_disable_create_").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;

        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: true,
            with_skipping_bloom: true,
        });
        // Flush with the inverted index disabled on flush.
        let indexer = IndexerBuilderImpl {
            op_type: OperationType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig {
                create_on_flush: Mode::Disable,
                ..Default::default()
            },
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_none());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_some());

        // Compaction with the full-text index disabled on compaction.
        let indexer = IndexerBuilderImpl {
            op_type: OperationType::Compact,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig {
                create_on_compaction: Mode::Disable,
                ..Default::default()
            },
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_none());
        assert!(indexer.bloom_filter_indexer.is_some());

        // Compaction with the bloom filter disabled on compaction.
        let indexer = IndexerBuilderImpl {
            op_type: OperationType::Compact,
            metadata,
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig {
                create_on_compaction: Mode::Disable,
                ..Default::default()
            },
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_none());
    }

    /// An index type is skipped when the region has no column requiring it,
    /// while the others are still built.
    #[tokio::test]
    async fn test_build_indexer_no_required() {
        let (dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_no_required_").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;

        // No inverted-indexed column.
        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: false,
            with_fulltext: true,
            with_skipping_bloom: true,
        });
        let indexer = IndexerBuilderImpl {
            op_type: OperationType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_none());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_some());

        // No full-text column.
        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: false,
            with_skipping_bloom: true,
        });
        let indexer = IndexerBuilderImpl {
            op_type: OperationType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_none());
        assert!(indexer.bloom_filter_indexer.is_some());

        // No bloom filter skipping column.
        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: true,
            with_skipping_bloom: false,
        });
        let indexer = IndexerBuilderImpl {
            op_type: OperationType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_none());
    }

    /// A zero row group size disables the inverted index; only that creator is
    /// checked here because the other builders do not read `row_group_size`.
    #[tokio::test]
    async fn test_build_indexer_zero_row_group() {
        let (dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_zero_row_group_").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;

        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: true,
            with_skipping_bloom: true,
        });
        let indexer = IndexerBuilderImpl {
            op_type: OperationType::Flush,
            metadata,
            row_group_size: 0,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_none());
    }
}