1pub(crate) mod bloom_filter;
16pub(crate) mod fulltext_index;
17mod indexer;
18pub mod intermediate;
19pub(crate) mod inverted_index;
20pub mod puffin_manager;
21mod statistics;
22pub(crate) mod store;
23
24use std::num::NonZeroUsize;
25use std::sync::Arc;
26
27use bloom_filter::creator::BloomFilterIndexer;
28use common_telemetry::{debug, info, warn};
29use datatypes::arrow::record_batch::RecordBatch;
30use puffin_manager::SstPuffinManager;
31use smallvec::{SmallVec, smallvec};
32use statistics::{ByteCount, RowCount};
33use store_api::metadata::RegionMetadataRef;
34use store_api::storage::{ColumnId, FileId, RegionId};
35use strum::IntoStaticStr;
36use tokio::sync::{mpsc, oneshot};
37
38use crate::access_layer::{AccessLayerRef, FilePathProvider, OperationType, RegionFilePathFactory};
39use crate::cache::file_cache::{FileType, IndexKey};
40use crate::cache::write_cache::{UploadTracker, WriteCacheRef};
41use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
42use crate::error::Result;
43use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
44use crate::metrics::INDEX_CREATE_MEMORY_USAGE;
45use crate::read::{Batch, BatchReader};
46use crate::region::options::IndexOptions;
47use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
48use crate::region::{ManifestContextRef, RegionLeaderState};
49use crate::request::{
50 BackgroundNotify, IndexBuildFailed, IndexBuildFinished, WorkerRequest, WorkerRequestWithTime,
51};
52use crate::schedule::scheduler::{Job, SchedulerRef};
53use crate::sst::file::{FileHandle, FileMeta, IndexType, RegionFileId};
54use crate::sst::file_purger::FilePurgerRef;
55use crate::sst::index::fulltext_index::creator::FulltextIndexer;
56use crate::sst::index::intermediate::IntermediateManager;
57use crate::sst::index::inverted_index::creator::InvertedIndexer;
58use crate::sst::parquet::SstInfo;
59
/// Metric label used for the inverted index.
pub(crate) const TYPE_INVERTED_INDEX: &'static str = "inverted_index";
/// Metric label used for the full-text index.
pub(crate) const TYPE_FULLTEXT_INDEX: &'static str = "fulltext_index";
/// Metric label used for the bloom filter index.
pub(crate) const TYPE_BLOOM_FILTER_INDEX: &'static str = "bloom_filter_index";
63
64#[derive(Debug, Clone, Default)]
66pub struct IndexOutput {
67 pub file_size: u64,
69 pub inverted_index: InvertedIndexOutput,
71 pub fulltext_index: FulltextIndexOutput,
73 pub bloom_filter: BloomFilterOutput,
75}
76
77impl IndexOutput {
78 pub fn build_available_indexes(&self) -> SmallVec<[IndexType; 4]> {
79 let mut indexes = SmallVec::new();
80 if self.inverted_index.is_available() {
81 indexes.push(IndexType::InvertedIndex);
82 }
83 if self.fulltext_index.is_available() {
84 indexes.push(IndexType::FulltextIndex);
85 }
86 if self.bloom_filter.is_available() {
87 indexes.push(IndexType::BloomFilterIndex);
88 }
89 indexes
90 }
91}
92
93#[derive(Debug, Clone, Default)]
95pub struct IndexBaseOutput {
96 pub index_size: ByteCount,
98 pub row_count: RowCount,
100 pub columns: Vec<ColumnId>,
102}
103
104impl IndexBaseOutput {
105 pub fn is_available(&self) -> bool {
106 self.index_size > 0
107 }
108}
109
110pub type InvertedIndexOutput = IndexBaseOutput;
112pub type FulltextIndexOutput = IndexBaseOutput;
114pub type BloomFilterOutput = IndexBaseOutput;
116
117#[derive(Default)]
119pub struct Indexer {
120 file_id: FileId,
121 region_id: RegionId,
122 puffin_manager: Option<SstPuffinManager>,
123 inverted_indexer: Option<InvertedIndexer>,
124 last_mem_inverted_index: usize,
125 fulltext_indexer: Option<FulltextIndexer>,
126 last_mem_fulltext_index: usize,
127 bloom_filter_indexer: Option<BloomFilterIndexer>,
128 last_mem_bloom_filter: usize,
129 intermediate_manager: Option<IntermediateManager>,
130}
131
132impl Indexer {
133 pub async fn update(&mut self, batch: &mut Batch) {
135 self.do_update(batch).await;
136
137 self.flush_mem_metrics();
138 }
139
140 pub async fn update_flat(&mut self, batch: &RecordBatch) {
142 self.do_update_flat(batch).await;
143
144 self.flush_mem_metrics();
145 }
146
147 pub async fn finish(&mut self) -> IndexOutput {
149 let output = self.do_finish().await;
150
151 self.flush_mem_metrics();
152 output
153 }
154
155 pub async fn abort(&mut self) {
157 self.do_abort().await;
158
159 self.flush_mem_metrics();
160 }
161
162 fn flush_mem_metrics(&mut self) {
163 let inverted_mem = self
164 .inverted_indexer
165 .as_ref()
166 .map_or(0, |creator| creator.memory_usage());
167 INDEX_CREATE_MEMORY_USAGE
168 .with_label_values(&[TYPE_INVERTED_INDEX])
169 .add(inverted_mem as i64 - self.last_mem_inverted_index as i64);
170 self.last_mem_inverted_index = inverted_mem;
171
172 let fulltext_mem = self
173 .fulltext_indexer
174 .as_ref()
175 .map_or(0, |creator| creator.memory_usage());
176 INDEX_CREATE_MEMORY_USAGE
177 .with_label_values(&[TYPE_FULLTEXT_INDEX])
178 .add(fulltext_mem as i64 - self.last_mem_fulltext_index as i64);
179 self.last_mem_fulltext_index = fulltext_mem;
180
181 let bloom_filter_mem = self
182 .bloom_filter_indexer
183 .as_ref()
184 .map_or(0, |creator| creator.memory_usage());
185 INDEX_CREATE_MEMORY_USAGE
186 .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
187 .add(bloom_filter_mem as i64 - self.last_mem_bloom_filter as i64);
188 self.last_mem_bloom_filter = bloom_filter_mem;
189 }
190}
191
192#[async_trait::async_trait]
193pub trait IndexerBuilder {
194 async fn build(&self, file_id: FileId) -> Indexer;
196}
197#[derive(Clone)]
198pub(crate) struct IndexerBuilderImpl {
199 pub(crate) build_type: IndexBuildType,
200 pub(crate) metadata: RegionMetadataRef,
201 pub(crate) row_group_size: usize,
202 pub(crate) puffin_manager: SstPuffinManager,
203 pub(crate) intermediate_manager: IntermediateManager,
204 pub(crate) index_options: IndexOptions,
205 pub(crate) inverted_index_config: InvertedIndexConfig,
206 pub(crate) fulltext_index_config: FulltextIndexConfig,
207 pub(crate) bloom_filter_index_config: BloomFilterConfig,
208}
209
210#[async_trait::async_trait]
211impl IndexerBuilder for IndexerBuilderImpl {
212 async fn build(&self, file_id: FileId) -> Indexer {
214 let mut indexer = Indexer {
215 file_id,
216 region_id: self.metadata.region_id,
217 ..Default::default()
218 };
219
220 indexer.inverted_indexer = self.build_inverted_indexer(file_id);
221 indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await;
222 indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id);
223 indexer.intermediate_manager = Some(self.intermediate_manager.clone());
224 if indexer.inverted_indexer.is_none()
225 && indexer.fulltext_indexer.is_none()
226 && indexer.bloom_filter_indexer.is_none()
227 {
228 indexer.abort().await;
229 return Indexer::default();
230 }
231
232 indexer.puffin_manager = Some(self.puffin_manager.clone());
233 indexer
234 }
235}
236
237impl IndexerBuilderImpl {
238 fn build_inverted_indexer(&self, file_id: FileId) -> Option<InvertedIndexer> {
239 let create = match self.build_type {
240 IndexBuildType::Flush => self.inverted_index_config.create_on_flush.auto(),
241 IndexBuildType::Compact => self.inverted_index_config.create_on_compaction.auto(),
242 _ => true,
243 };
244
245 if !create {
246 debug!(
247 "Skip creating inverted index due to config, region_id: {}, file_id: {}",
248 self.metadata.region_id, file_id,
249 );
250 return None;
251 }
252
253 let indexed_column_ids = self.metadata.inverted_indexed_column_ids(
254 self.index_options.inverted_index.ignore_column_ids.iter(),
255 );
256 if indexed_column_ids.is_empty() {
257 debug!(
258 "No columns to be indexed, skip creating inverted index, region_id: {}, file_id: {}",
259 self.metadata.region_id, file_id,
260 );
261 return None;
262 }
263
264 let Some(mut segment_row_count) =
265 NonZeroUsize::new(self.index_options.inverted_index.segment_row_count)
266 else {
267 warn!(
268 "Segment row count is 0, skip creating index, region_id: {}, file_id: {}",
269 self.metadata.region_id, file_id,
270 );
271 return None;
272 };
273
274 let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else {
275 warn!(
276 "Row group size is 0, skip creating index, region_id: {}, file_id: {}",
277 self.metadata.region_id, file_id,
278 );
279 return None;
280 };
281
282 if row_group_size.get() % segment_row_count.get() != 0 {
284 segment_row_count = row_group_size;
285 }
286
287 let indexer = InvertedIndexer::new(
288 file_id,
289 &self.metadata,
290 self.intermediate_manager.clone(),
291 self.inverted_index_config.mem_threshold_on_create(),
292 segment_row_count,
293 indexed_column_ids,
294 );
295
296 Some(indexer)
297 }
298
299 async fn build_fulltext_indexer(&self, file_id: FileId) -> Option<FulltextIndexer> {
300 let create = match self.build_type {
301 IndexBuildType::Flush => self.fulltext_index_config.create_on_flush.auto(),
302 IndexBuildType::Compact => self.fulltext_index_config.create_on_compaction.auto(),
303 _ => true,
304 };
305
306 if !create {
307 debug!(
308 "Skip creating full-text index due to config, region_id: {}, file_id: {}",
309 self.metadata.region_id, file_id,
310 );
311 return None;
312 }
313
314 let mem_limit = self.fulltext_index_config.mem_threshold_on_create();
315 let creator = FulltextIndexer::new(
316 &self.metadata.region_id,
317 &file_id,
318 &self.intermediate_manager,
319 &self.metadata,
320 self.fulltext_index_config.compress,
321 mem_limit,
322 )
323 .await;
324
325 let err = match creator {
326 Ok(creator) => {
327 if creator.is_none() {
328 debug!(
329 "Skip creating full-text index due to no columns require indexing, region_id: {}, file_id: {}",
330 self.metadata.region_id, file_id,
331 );
332 }
333 return creator;
334 }
335 Err(err) => err,
336 };
337
338 if cfg!(any(test, feature = "test")) {
339 panic!(
340 "Failed to create full-text indexer, region_id: {}, file_id: {}, err: {:?}",
341 self.metadata.region_id, file_id, err
342 );
343 } else {
344 warn!(
345 err; "Failed to create full-text indexer, region_id: {}, file_id: {}",
346 self.metadata.region_id, file_id,
347 );
348 }
349
350 None
351 }
352
353 fn build_bloom_filter_indexer(&self, file_id: FileId) -> Option<BloomFilterIndexer> {
354 let create = match self.build_type {
355 IndexBuildType::Flush => self.bloom_filter_index_config.create_on_flush.auto(),
356 IndexBuildType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(),
357 _ => true,
358 };
359
360 if !create {
361 debug!(
362 "Skip creating bloom filter due to config, region_id: {}, file_id: {}",
363 self.metadata.region_id, file_id,
364 );
365 return None;
366 }
367
368 let mem_limit = self.bloom_filter_index_config.mem_threshold_on_create();
369 let indexer = BloomFilterIndexer::new(
370 file_id,
371 &self.metadata,
372 self.intermediate_manager.clone(),
373 mem_limit,
374 );
375
376 let err = match indexer {
377 Ok(indexer) => {
378 if indexer.is_none() {
379 debug!(
380 "Skip creating bloom filter due to no columns require indexing, region_id: {}, file_id: {}",
381 self.metadata.region_id, file_id,
382 );
383 }
384 return indexer;
385 }
386 Err(err) => err,
387 };
388
389 if cfg!(any(test, feature = "test")) {
390 panic!(
391 "Failed to create bloom filter, region_id: {}, file_id: {}, err: {:?}",
392 self.metadata.region_id, file_id, err
393 );
394 } else {
395 warn!(
396 err; "Failed to create bloom filter, region_id: {}, file_id: {}",
397 self.metadata.region_id, file_id,
398 );
399 }
400
401 None
402 }
403}
404
405#[derive(Debug, Clone, PartialEq, IntoStaticStr)]
407pub enum IndexBuildType {
408 SchemaChange,
410 Flush,
412 Compact,
414 Manual,
416}
417
418impl IndexBuildType {
419 fn as_str(&self) -> &'static str {
420 self.into()
421 }
422}
423
424impl From<OperationType> for IndexBuildType {
425 fn from(op_type: OperationType) -> Self {
426 match op_type {
427 OperationType::Flush => IndexBuildType::Flush,
428 OperationType::Compact => IndexBuildType::Compact,
429 }
430 }
431}
432
/// Final outcome of an index build task, reported through its result channel.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum IndexBuildOutcome {
    /// The build ran to completion.
    Finished,
    /// The build stopped early; the payload explains why.
    Aborted(String),
}
439
440pub struct IndexBuildTask {
441 pub file_meta: FileMeta,
443 pub reason: IndexBuildType,
444 pub access_layer: AccessLayerRef,
445 pub(crate) manifest_ctx: ManifestContextRef,
446 pub write_cache: Option<WriteCacheRef>,
447 pub file_purger: FilePurgerRef,
448 pub indexer_builder: Arc<dyn IndexerBuilder + Send + Sync>,
451 pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
453 pub result_sender: Option<oneshot::Sender<IndexBuildOutcome>>,
455}
456
457impl IndexBuildTask {
458 fn into_index_build_job(mut self, version_control: &VersionControlRef) -> Job {
459 let version_data = version_control.current();
460
461 Box::pin(async move {
462 self.do_index_build(version_data).await;
463 })
464 }
465
466 async fn do_index_build(&mut self, version_data: VersionControlData) {
467 let outcome = match self.index_build(&version_data).await {
468 Ok(outcome) => outcome,
469 Err(e) => {
470 warn!(
471 e; "Index build task failed, region: {}, file_id: {}",
472 self.file_meta.region_id, self.file_meta.file_id,
473 );
474 IndexBuildOutcome::Aborted(format!("Index build failed: {}", e))
475 }
476 };
477 if let Some(sender) = self.result_sender.take() {
478 let _ = sender.send(outcome);
479 }
480 }
481
482 async fn check_sst_file_exists(&self, version: &VersionRef) -> bool {
484 let region_id = self.file_meta.region_id;
485 let file_id = self.file_meta.file_id;
486
487 let found_in_version = version
488 .ssts
489 .levels()
490 .iter()
491 .flat_map(|level| level.files.iter())
492 .any(|(id, handle)| {
493 *id == self.file_meta.file_id && !handle.is_deleted() && !handle.compacting()
494 });
495 if !found_in_version {
496 warn!(
497 "File id {} not found in region version for index build, region: {}",
498 file_id, region_id
499 );
500 false
501 } else {
502 true
506 }
507 }
508
509 async fn index_build(
510 &mut self,
511 version_data: &VersionControlData,
512 ) -> Result<IndexBuildOutcome> {
513 let version = &version_data.version;
514 let mut indexer = self.indexer_builder.build(self.file_meta.file_id).await;
515 let mut parquet_reader = self
516 .access_layer
517 .read_sst(FileHandle::new(
518 self.file_meta.clone(),
519 self.file_purger.clone(),
520 ))
521 .build()
522 .await?;
523
524 loop {
526 match parquet_reader.next_batch().await {
527 Ok(Some(batch)) => {
528 indexer.update(&mut batch.clone()).await;
529 }
530 Ok(None) => break,
531 Err(e) => {
532 indexer.abort().await;
533 return Err(e);
534 }
535 }
536 }
537 let index_output = indexer.finish().await;
538
539 if index_output.file_size > 0 {
540 if !self.check_sst_file_exists(version).await {
542 indexer.abort().await;
544 return Ok(IndexBuildOutcome::Aborted(format!(
545 "SST file not found during index build, region: {}, file_id: {}",
546 self.file_meta.region_id, self.file_meta.file_id
547 )));
548 }
549
550 self.maybe_upload_index_file(index_output.clone()).await?;
552
553 let worker_request = match self.update_manifest(index_output).await {
554 Ok(edit) => {
555 let index_build_finished = IndexBuildFinished {
556 region_id: self.file_meta.region_id,
557 edit,
558 };
559 WorkerRequest::Background {
560 region_id: self.file_meta.region_id,
561 notify: BackgroundNotify::IndexBuildFinished(index_build_finished),
562 }
563 }
564 Err(e) => {
565 let err = Arc::new(e);
566 WorkerRequest::Background {
567 region_id: self.file_meta.region_id,
568 notify: BackgroundNotify::IndexBuildFailed(IndexBuildFailed { err }),
569 }
570 }
571 };
572
573 let _ = self
574 .request_sender
575 .send(WorkerRequestWithTime::new(worker_request))
576 .await;
577 }
578 Ok(IndexBuildOutcome::Finished)
579 }
580
581 async fn maybe_upload_index_file(&self, output: IndexOutput) -> Result<()> {
582 if let Some(write_cache) = &self.write_cache {
583 let file_id = self.file_meta.file_id;
584 let region_id = self.file_meta.region_id;
585 let remote_store = self.access_layer.object_store();
586 let mut upload_tracker = UploadTracker::new(region_id);
587 let mut err = None;
588 let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin);
589 let puffin_path = RegionFilePathFactory::new(
590 self.access_layer.table_dir().to_string(),
591 self.access_layer.path_type(),
592 )
593 .build_index_file_path(RegionFileId::new(region_id, file_id));
594 if let Err(e) = write_cache
595 .upload(puffin_key, &puffin_path, remote_store)
596 .await
597 {
598 err = Some(e);
599 }
600 upload_tracker.push_uploaded_file(puffin_path);
601 if let Some(err) = err {
602 upload_tracker
604 .clean(
605 &smallvec![SstInfo {
606 file_id,
607 index_metadata: output,
608 ..Default::default()
609 }],
610 &write_cache.file_cache(),
611 remote_store,
612 )
613 .await;
614 return Err(err);
615 }
616 } else {
617 debug!("write cache is not available, skip uploading index file");
618 }
619 Ok(())
620 }
621
622 async fn update_manifest(&mut self, output: IndexOutput) -> Result<RegionEdit> {
623 self.file_meta.available_indexes = output.build_available_indexes();
624 self.file_meta.index_file_size = output.file_size;
625 let edit = RegionEdit {
626 files_to_add: vec![self.file_meta.clone()],
627 files_to_remove: vec![],
628 timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
629 flushed_sequence: None,
630 flushed_entry_id: None,
631 committed_sequence: None,
632 compaction_time_window: None,
633 };
634 let version = self
635 .manifest_ctx
636 .update_manifest(
637 RegionLeaderState::Writable,
638 RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())),
639 )
640 .await?;
641 info!(
642 "Successfully update manifest version to {version}, region: {}, reason: {}",
643 self.file_meta.region_id,
644 self.reason.as_str()
645 );
646 Ok(edit)
647 }
648}
649
650#[derive(Clone)]
651pub struct IndexBuildScheduler {
652 scheduler: SchedulerRef,
653}
654
655impl IndexBuildScheduler {
656 pub fn new(scheduler: SchedulerRef) -> Self {
657 IndexBuildScheduler { scheduler }
658 }
659
660 pub(crate) fn schedule_build(
661 &mut self,
662 version_control: &VersionControlRef,
663 task: IndexBuildTask,
664 ) -> Result<()> {
665 let job = task.into_index_build_job(version_control);
666 self.scheduler.schedule(job)?;
667 Ok(())
668 }
669}
670
671#[cfg(test)]
672mod tests {
673 use std::sync::Arc;
674
675 use api::v1::SemanticType;
676 use common_base::readable_size::ReadableSize;
677 use datafusion_common::HashMap;
678 use datatypes::data_type::ConcreteDataType;
679 use datatypes::schema::{
680 ColumnSchema, FulltextOptions, SkippingIndexOptions, SkippingIndexType,
681 };
682 use object_store::ObjectStore;
683 use object_store::services::Memory;
684 use puffin_manager::PuffinManagerFactory;
685 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
686 use tokio::sync::{mpsc, oneshot};
687
688 use super::*;
689 use crate::access_layer::{FilePathProvider, SstWriteRequest, WriteType};
690 use crate::cache::write_cache::WriteCache;
691 use crate::config::{FulltextIndexConfig, IndexBuildMode, MitoConfig, Mode};
692 use crate::memtable::time_partition::TimePartitions;
693 use crate::region::version::{VersionBuilder, VersionControl};
694 use crate::sst::file::RegionFileId;
695 use crate::sst::file_purger::NoopFilePurger;
696 use crate::sst::location;
697 use crate::sst::parquet::WriteOptions;
698 use crate::test_util::memtable_util::EmptyMemtableBuilder;
699 use crate::test_util::scheduler_util::SchedulerEnv;
700 use crate::test_util::sst_util::{new_batch_by_range, new_source, sst_region_metadata};
701
/// Flags selecting the optional indexed columns added by `mock_region_metadata`.
struct MetaConfig {
    /// Enable the inverted index on column `a`.
    with_inverted: bool,
    /// Add the full-text indexed column `text`.
    with_fulltext: bool,
    /// Add the bloom-filter skipping-indexed column `bloom`.
    with_skipping_bloom: bool,
}
707
708 fn mock_region_metadata(
709 MetaConfig {
710 with_inverted,
711 with_fulltext,
712 with_skipping_bloom,
713 }: MetaConfig,
714 ) -> RegionMetadataRef {
715 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
716 let mut column_schema = ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false);
717 if with_inverted {
718 column_schema = column_schema.with_inverted_index(true);
719 }
720 builder
721 .push_column_metadata(ColumnMetadata {
722 column_schema,
723 semantic_type: SemanticType::Field,
724 column_id: 1,
725 })
726 .push_column_metadata(ColumnMetadata {
727 column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
728 semantic_type: SemanticType::Field,
729 column_id: 2,
730 })
731 .push_column_metadata(ColumnMetadata {
732 column_schema: ColumnSchema::new(
733 "c",
734 ConcreteDataType::timestamp_millisecond_datatype(),
735 false,
736 ),
737 semantic_type: SemanticType::Timestamp,
738 column_id: 3,
739 });
740
741 if with_fulltext {
742 let column_schema =
743 ColumnSchema::new("text", ConcreteDataType::string_datatype(), true)
744 .with_fulltext_options(FulltextOptions {
745 enable: true,
746 ..Default::default()
747 })
748 .unwrap();
749
750 let column = ColumnMetadata {
751 column_schema,
752 semantic_type: SemanticType::Field,
753 column_id: 4,
754 };
755
756 builder.push_column_metadata(column);
757 }
758
759 if with_skipping_bloom {
760 let column_schema =
761 ColumnSchema::new("bloom", ConcreteDataType::string_datatype(), false)
762 .with_skipping_options(SkippingIndexOptions::new_unchecked(
763 42,
764 0.01,
765 SkippingIndexType::BloomFilter,
766 ))
767 .unwrap();
768
769 let column = ColumnMetadata {
770 column_schema,
771 semantic_type: SemanticType::Field,
772 column_id: 5,
773 };
774
775 builder.push_column_metadata(column);
776 }
777
778 Arc::new(builder.build().unwrap())
779 }
780
781 fn mock_object_store() -> ObjectStore {
782 ObjectStore::new(Memory::default()).unwrap().finish()
783 }
784
785 async fn mock_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
786 IntermediateManager::init_fs(path).await.unwrap()
787 }
788 struct NoopPathProvider;
789
790 impl FilePathProvider for NoopPathProvider {
791 fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
792 unreachable!()
793 }
794
795 fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
796 unreachable!()
797 }
798 }
799
800 async fn mock_sst_file(
801 metadata: RegionMetadataRef,
802 env: &SchedulerEnv,
803 build_mode: IndexBuildMode,
804 ) -> SstInfo {
805 let source = new_source(&[
806 new_batch_by_range(&["a", "d"], 0, 60),
807 new_batch_by_range(&["b", "f"], 0, 40),
808 new_batch_by_range(&["b", "h"], 100, 200),
809 ]);
810 let mut index_config = MitoConfig::default().index;
811 index_config.build_mode = build_mode;
812 let write_request = SstWriteRequest {
813 op_type: OperationType::Flush,
814 metadata: metadata.clone(),
815 source: either::Left(source),
816 storage: None,
817 max_sequence: None,
818 cache_manager: Default::default(),
819 index_options: IndexOptions::default(),
820 index_config,
821 inverted_index_config: Default::default(),
822 fulltext_index_config: Default::default(),
823 bloom_filter_index_config: Default::default(),
824 };
825 env.access_layer
826 .write_sst(write_request, &WriteOptions::default(), WriteType::Flush)
827 .await
828 .unwrap()
829 .0
830 .remove(0)
831 }
832
833 async fn mock_version_control(
834 metadata: RegionMetadataRef,
835 file_purger: FilePurgerRef,
836 files: HashMap<FileId, FileMeta>,
837 ) -> VersionControlRef {
838 let mutable = Arc::new(TimePartitions::new(
839 metadata.clone(),
840 Arc::new(EmptyMemtableBuilder::default()),
841 0,
842 None,
843 ));
844 let version_builder = VersionBuilder::new(metadata, mutable)
845 .add_files(file_purger, files.values().cloned())
846 .build();
847 Arc::new(VersionControl::new(version_builder))
848 }
849
850 async fn mock_indexer_builder(
851 metadata: RegionMetadataRef,
852 env: &SchedulerEnv,
853 ) -> Arc<dyn IndexerBuilder + Send + Sync> {
854 let (dir, factory) = PuffinManagerFactory::new_for_test_async("mock_indexer_builder").await;
855 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
856 let puffin_manager = factory.build(
857 env.access_layer.object_store().clone(),
858 RegionFilePathFactory::new(
859 env.access_layer.table_dir().to_string(),
860 env.access_layer.path_type(),
861 ),
862 );
863 Arc::new(IndexerBuilderImpl {
864 build_type: IndexBuildType::Flush,
865 metadata,
866 row_group_size: 1024,
867 puffin_manager,
868 intermediate_manager: intm_manager,
869 index_options: IndexOptions::default(),
870 inverted_index_config: InvertedIndexConfig::default(),
871 fulltext_index_config: FulltextIndexConfig::default(),
872 bloom_filter_index_config: BloomFilterConfig::default(),
873 })
874 }
875
876 #[tokio::test]
877 async fn test_build_indexer_basic() {
878 let (dir, factory) =
879 PuffinManagerFactory::new_for_test_async("test_build_indexer_basic_").await;
880 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
881
882 let metadata = mock_region_metadata(MetaConfig {
883 with_inverted: true,
884 with_fulltext: true,
885 with_skipping_bloom: true,
886 });
887 let indexer = IndexerBuilderImpl {
888 build_type: IndexBuildType::Flush,
889 metadata,
890 row_group_size: 1024,
891 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
892 intermediate_manager: intm_manager,
893 index_options: IndexOptions::default(),
894 inverted_index_config: InvertedIndexConfig::default(),
895 fulltext_index_config: FulltextIndexConfig::default(),
896 bloom_filter_index_config: BloomFilterConfig::default(),
897 }
898 .build(FileId::random())
899 .await;
900
901 assert!(indexer.inverted_indexer.is_some());
902 assert!(indexer.fulltext_indexer.is_some());
903 assert!(indexer.bloom_filter_indexer.is_some());
904 }
905
906 #[tokio::test]
907 async fn test_build_indexer_disable_create() {
908 let (dir, factory) =
909 PuffinManagerFactory::new_for_test_async("test_build_indexer_disable_create_").await;
910 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
911
912 let metadata = mock_region_metadata(MetaConfig {
913 with_inverted: true,
914 with_fulltext: true,
915 with_skipping_bloom: true,
916 });
917 let indexer = IndexerBuilderImpl {
918 build_type: IndexBuildType::Flush,
919 metadata: metadata.clone(),
920 row_group_size: 1024,
921 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
922 intermediate_manager: intm_manager.clone(),
923 index_options: IndexOptions::default(),
924 inverted_index_config: InvertedIndexConfig {
925 create_on_flush: Mode::Disable,
926 ..Default::default()
927 },
928 fulltext_index_config: FulltextIndexConfig::default(),
929 bloom_filter_index_config: BloomFilterConfig::default(),
930 }
931 .build(FileId::random())
932 .await;
933
934 assert!(indexer.inverted_indexer.is_none());
935 assert!(indexer.fulltext_indexer.is_some());
936 assert!(indexer.bloom_filter_indexer.is_some());
937
938 let indexer = IndexerBuilderImpl {
939 build_type: IndexBuildType::Compact,
940 metadata: metadata.clone(),
941 row_group_size: 1024,
942 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
943 intermediate_manager: intm_manager.clone(),
944 index_options: IndexOptions::default(),
945 inverted_index_config: InvertedIndexConfig::default(),
946 fulltext_index_config: FulltextIndexConfig {
947 create_on_compaction: Mode::Disable,
948 ..Default::default()
949 },
950 bloom_filter_index_config: BloomFilterConfig::default(),
951 }
952 .build(FileId::random())
953 .await;
954
955 assert!(indexer.inverted_indexer.is_some());
956 assert!(indexer.fulltext_indexer.is_none());
957 assert!(indexer.bloom_filter_indexer.is_some());
958
959 let indexer = IndexerBuilderImpl {
960 build_type: IndexBuildType::Compact,
961 metadata,
962 row_group_size: 1024,
963 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
964 intermediate_manager: intm_manager,
965 index_options: IndexOptions::default(),
966 inverted_index_config: InvertedIndexConfig::default(),
967 fulltext_index_config: FulltextIndexConfig::default(),
968 bloom_filter_index_config: BloomFilterConfig {
969 create_on_compaction: Mode::Disable,
970 ..Default::default()
971 },
972 }
973 .build(FileId::random())
974 .await;
975
976 assert!(indexer.inverted_indexer.is_some());
977 assert!(indexer.fulltext_indexer.is_some());
978 assert!(indexer.bloom_filter_indexer.is_none());
979 }
980
981 #[tokio::test]
982 async fn test_build_indexer_no_required() {
983 let (dir, factory) =
984 PuffinManagerFactory::new_for_test_async("test_build_indexer_no_required_").await;
985 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
986
987 let metadata = mock_region_metadata(MetaConfig {
988 with_inverted: false,
989 with_fulltext: true,
990 with_skipping_bloom: true,
991 });
992 let indexer = IndexerBuilderImpl {
993 build_type: IndexBuildType::Flush,
994 metadata: metadata.clone(),
995 row_group_size: 1024,
996 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
997 intermediate_manager: intm_manager.clone(),
998 index_options: IndexOptions::default(),
999 inverted_index_config: InvertedIndexConfig::default(),
1000 fulltext_index_config: FulltextIndexConfig::default(),
1001 bloom_filter_index_config: BloomFilterConfig::default(),
1002 }
1003 .build(FileId::random())
1004 .await;
1005
1006 assert!(indexer.inverted_indexer.is_none());
1007 assert!(indexer.fulltext_indexer.is_some());
1008 assert!(indexer.bloom_filter_indexer.is_some());
1009
1010 let metadata = mock_region_metadata(MetaConfig {
1011 with_inverted: true,
1012 with_fulltext: false,
1013 with_skipping_bloom: true,
1014 });
1015 let indexer = IndexerBuilderImpl {
1016 build_type: IndexBuildType::Flush,
1017 metadata: metadata.clone(),
1018 row_group_size: 1024,
1019 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1020 intermediate_manager: intm_manager.clone(),
1021 index_options: IndexOptions::default(),
1022 inverted_index_config: InvertedIndexConfig::default(),
1023 fulltext_index_config: FulltextIndexConfig::default(),
1024 bloom_filter_index_config: BloomFilterConfig::default(),
1025 }
1026 .build(FileId::random())
1027 .await;
1028
1029 assert!(indexer.inverted_indexer.is_some());
1030 assert!(indexer.fulltext_indexer.is_none());
1031 assert!(indexer.bloom_filter_indexer.is_some());
1032
1033 let metadata = mock_region_metadata(MetaConfig {
1034 with_inverted: true,
1035 with_fulltext: true,
1036 with_skipping_bloom: false,
1037 });
1038 let indexer = IndexerBuilderImpl {
1039 build_type: IndexBuildType::Flush,
1040 metadata: metadata.clone(),
1041 row_group_size: 1024,
1042 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1043 intermediate_manager: intm_manager,
1044 index_options: IndexOptions::default(),
1045 inverted_index_config: InvertedIndexConfig::default(),
1046 fulltext_index_config: FulltextIndexConfig::default(),
1047 bloom_filter_index_config: BloomFilterConfig::default(),
1048 }
1049 .build(FileId::random())
1050 .await;
1051
1052 assert!(indexer.inverted_indexer.is_some());
1053 assert!(indexer.fulltext_indexer.is_some());
1054 assert!(indexer.bloom_filter_indexer.is_none());
1055 }
1056
1057 #[tokio::test]
1058 async fn test_build_indexer_zero_row_group() {
1059 let (dir, factory) =
1060 PuffinManagerFactory::new_for_test_async("test_build_indexer_zero_row_group_").await;
1061 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1062
1063 let metadata = mock_region_metadata(MetaConfig {
1064 with_inverted: true,
1065 with_fulltext: true,
1066 with_skipping_bloom: true,
1067 });
1068 let indexer = IndexerBuilderImpl {
1069 build_type: IndexBuildType::Flush,
1070 metadata,
1071 row_group_size: 0,
1072 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1073 intermediate_manager: intm_manager,
1074 index_options: IndexOptions::default(),
1075 inverted_index_config: InvertedIndexConfig::default(),
1076 fulltext_index_config: FulltextIndexConfig::default(),
1077 bloom_filter_index_config: BloomFilterConfig::default(),
1078 }
1079 .build(FileId::random())
1080 .await;
1081
1082 assert!(indexer.inverted_indexer.is_none());
1083 }
1084
1085 #[tokio::test]
1086 async fn test_index_build_task_sst_not_exist() {
1087 let env = SchedulerEnv::new().await;
1088 let (tx, _rx) = mpsc::channel(4);
1089 let (result_tx, result_rx) = oneshot::channel::<IndexBuildOutcome>();
1090 let mut scheduler = env.mock_index_build_scheduler();
1091 let metadata = Arc::new(sst_region_metadata());
1092 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1093 let file_purger = Arc::new(NoopFilePurger {});
1094 let files = HashMap::new();
1095 let version_control =
1096 mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1097 let region_id = metadata.region_id;
1098 let indexer_builder = mock_indexer_builder(metadata, &env).await;
1099
1100 let task = IndexBuildTask {
1102 file_meta: FileMeta {
1103 region_id,
1104 file_id: FileId::random(),
1105 file_size: 100,
1106 ..Default::default()
1107 },
1108 reason: IndexBuildType::Flush,
1109 access_layer: env.access_layer.clone(),
1110 manifest_ctx,
1111 write_cache: None,
1112 file_purger,
1113 indexer_builder,
1114 request_sender: tx,
1115 result_sender: Some(result_tx),
1116 };
1117
1118 scheduler.schedule_build(&version_control, task).unwrap();
1120 match result_rx.await.unwrap() {
1121 IndexBuildOutcome::Aborted(_) => {}
1122 _ => panic!("Expect aborted result due to missing SST file"),
1123 }
1124 }
1125
1126 #[tokio::test]
1127 async fn test_index_build_task_sst_exist() {
1128 let env = SchedulerEnv::new().await;
1129 let mut scheduler = env.mock_index_build_scheduler();
1130 let metadata = Arc::new(sst_region_metadata());
1131 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1132 let region_id = metadata.region_id;
1133 let file_purger = Arc::new(NoopFilePurger {});
1134 let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
1135 let file_meta = FileMeta {
1136 region_id,
1137 file_id: sst_info.file_id,
1138 file_size: sst_info.file_size,
1139 index_file_size: sst_info.index_metadata.file_size,
1140 num_rows: sst_info.num_rows as u64,
1141 num_row_groups: sst_info.num_row_groups,
1142 ..Default::default()
1143 };
1144 let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1145 let version_control =
1146 mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1147 let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
1148
1149 let (tx, mut rx) = mpsc::channel(4);
1151 let (result_tx, result_rx) = oneshot::channel::<IndexBuildOutcome>();
1152 let task = IndexBuildTask {
1153 file_meta: file_meta.clone(),
1154 reason: IndexBuildType::Flush,
1155 access_layer: env.access_layer.clone(),
1156 manifest_ctx,
1157 write_cache: None,
1158 file_purger,
1159 indexer_builder,
1160 request_sender: tx,
1161 result_sender: Some(result_tx),
1162 };
1163
1164 scheduler.schedule_build(&version_control, task).unwrap();
1165
1166 assert_eq!(result_rx.await.unwrap(), IndexBuildOutcome::Finished);
1168
1169 let worker_req = rx.recv().await.unwrap().request;
1171 match worker_req {
1172 WorkerRequest::Background {
1173 region_id: req_region_id,
1174 notify: BackgroundNotify::IndexBuildFinished(finished),
1175 } => {
1176 assert_eq!(req_region_id, region_id);
1177 assert_eq!(finished.edit.files_to_add.len(), 1);
1178 let updated_meta = &finished.edit.files_to_add[0];
1179
1180 assert!(!updated_meta.available_indexes.is_empty());
1182 assert!(updated_meta.index_file_size > 0);
1183 assert_eq!(updated_meta.file_id, file_meta.file_id);
1184 }
1185 _ => panic!("Unexpected worker request: {:?}", worker_req),
1186 }
1187 }
1188
1189 async fn schedule_index_build_task_with_mode(build_mode: IndexBuildMode) {
1190 let env = SchedulerEnv::new().await;
1191 let mut scheduler = env.mock_index_build_scheduler();
1192 let metadata = Arc::new(sst_region_metadata());
1193 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1194 let file_purger = Arc::new(NoopFilePurger {});
1195 let region_id = metadata.region_id;
1196 let sst_info = mock_sst_file(metadata.clone(), &env, build_mode.clone()).await;
1197 let file_meta = FileMeta {
1198 region_id,
1199 file_id: sst_info.file_id,
1200 file_size: sst_info.file_size,
1201 index_file_size: sst_info.index_metadata.file_size,
1202 num_rows: sst_info.num_rows as u64,
1203 num_row_groups: sst_info.num_row_groups,
1204 ..Default::default()
1205 };
1206 let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1207 let version_control =
1208 mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1209 let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
1210
1211 let (tx, _rx) = mpsc::channel(4);
1213 let (result_tx, result_rx) = oneshot::channel::<IndexBuildOutcome>();
1214 let task = IndexBuildTask {
1215 file_meta: file_meta.clone(),
1216 reason: IndexBuildType::Flush,
1217 access_layer: env.access_layer.clone(),
1218 manifest_ctx,
1219 write_cache: None,
1220 file_purger,
1221 indexer_builder,
1222 request_sender: tx,
1223 result_sender: Some(result_tx),
1224 };
1225
1226 scheduler.schedule_build(&version_control, task).unwrap();
1227
1228 let puffin_path = location::index_file_path(
1229 env.access_layer.table_dir(),
1230 RegionFileId::new(region_id, file_meta.file_id),
1231 env.access_layer.path_type(),
1232 );
1233
1234 if build_mode == IndexBuildMode::Async {
1235 assert!(
1237 !env.access_layer
1238 .object_store()
1239 .exists(&puffin_path)
1240 .await
1241 .unwrap()
1242 );
1243 } else {
1244 assert!(
1246 env.access_layer
1247 .object_store()
1248 .exists(&puffin_path)
1249 .await
1250 .unwrap()
1251 );
1252 }
1253
1254 assert_eq!(result_rx.await.unwrap(), IndexBuildOutcome::Finished);
1256
1257 assert!(
1259 env.access_layer
1260 .object_store()
1261 .exists(&puffin_path)
1262 .await
1263 .unwrap()
1264 );
1265 }
1266
1267 #[tokio::test]
1268 async fn test_index_build_task_build_mode() {
1269 schedule_index_build_task_with_mode(IndexBuildMode::Async).await;
1270 schedule_index_build_task_with_mode(IndexBuildMode::Sync).await;
1271 }
1272
1273 #[tokio::test]
1274 async fn test_index_build_task_no_index() {
1275 let env = SchedulerEnv::new().await;
1276 let mut scheduler = env.mock_index_build_scheduler();
1277 let mut metadata = sst_region_metadata();
1278 metadata.column_metadatas.iter_mut().for_each(|col| {
1280 col.column_schema.set_inverted_index(false);
1281 let _ = col.column_schema.unset_skipping_options();
1282 });
1283 let region_id = metadata.region_id;
1284 let metadata = Arc::new(metadata);
1285 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1286 let file_purger = Arc::new(NoopFilePurger {});
1287 let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
1288 let file_meta = FileMeta {
1289 region_id,
1290 file_id: sst_info.file_id,
1291 file_size: sst_info.file_size,
1292 index_file_size: sst_info.index_metadata.file_size,
1293 num_rows: sst_info.num_rows as u64,
1294 num_row_groups: sst_info.num_row_groups,
1295 ..Default::default()
1296 };
1297 let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1298 let version_control =
1299 mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1300 let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
1301
1302 let (tx, mut rx) = mpsc::channel(4);
1304 let (result_tx, result_rx) = oneshot::channel::<IndexBuildOutcome>();
1305 let task = IndexBuildTask {
1306 file_meta: file_meta.clone(),
1307 reason: IndexBuildType::Flush,
1308 access_layer: env.access_layer.clone(),
1309 manifest_ctx,
1310 write_cache: None,
1311 file_purger,
1312 indexer_builder,
1313 request_sender: tx,
1314 result_sender: Some(result_tx),
1315 };
1316
1317 scheduler.schedule_build(&version_control, task).unwrap();
1318
1319 assert_eq!(result_rx.await.unwrap(), IndexBuildOutcome::Finished);
1321
1322 let _ = rx.recv().await.is_none();
1324 }
1325
1326 #[tokio::test]
1327 async fn test_index_build_task_with_write_cache() {
1328 let env = SchedulerEnv::new().await;
1329 let mut scheduler = env.mock_index_build_scheduler();
1330 let metadata = Arc::new(sst_region_metadata());
1331 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1332 let file_purger = Arc::new(NoopFilePurger {});
1333 let region_id = metadata.region_id;
1334
1335 let (dir, factory) = PuffinManagerFactory::new_for_test_async("test_write_cache").await;
1336 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1337
1338 let write_cache = Arc::new(
1340 WriteCache::new_fs(
1341 dir.path().to_str().unwrap(),
1342 ReadableSize::mb(10),
1343 None,
1344 factory,
1345 intm_manager,
1346 )
1347 .await
1348 .unwrap(),
1349 );
1350 let indexer_builder = Arc::new(IndexerBuilderImpl {
1352 build_type: IndexBuildType::Flush,
1353 metadata: metadata.clone(),
1354 row_group_size: 1024,
1355 puffin_manager: write_cache.build_puffin_manager().clone(),
1356 intermediate_manager: write_cache.intermediate_manager().clone(),
1357 index_options: IndexOptions::default(),
1358 inverted_index_config: InvertedIndexConfig::default(),
1359 fulltext_index_config: FulltextIndexConfig::default(),
1360 bloom_filter_index_config: BloomFilterConfig::default(),
1361 });
1362
1363 let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
1364 let file_meta = FileMeta {
1365 region_id,
1366 file_id: sst_info.file_id,
1367 file_size: sst_info.file_size,
1368 index_file_size: sst_info.index_metadata.file_size,
1369 num_rows: sst_info.num_rows as u64,
1370 num_row_groups: sst_info.num_row_groups,
1371 ..Default::default()
1372 };
1373 let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1374 let version_control =
1375 mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1376
1377 let (tx, mut _rx) = mpsc::channel(4);
1379 let (result_tx, result_rx) = oneshot::channel::<IndexBuildOutcome>();
1380 let task = IndexBuildTask {
1381 file_meta: file_meta.clone(),
1382 reason: IndexBuildType::Flush,
1383 access_layer: env.access_layer.clone(),
1384 manifest_ctx,
1385 write_cache: Some(write_cache.clone()),
1386 file_purger,
1387 indexer_builder,
1388 request_sender: tx,
1389 result_sender: Some(result_tx),
1390 };
1391
1392 scheduler.schedule_build(&version_control, task).unwrap();
1393
1394 assert_eq!(result_rx.await.unwrap(), IndexBuildOutcome::Finished);
1396
1397 let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Puffin);
1399 assert!(write_cache.file_cache().contains_key(&index_key));
1400 }
1401}