1pub(crate) mod bloom_filter;
16pub(crate) mod fulltext_index;
17mod indexer;
18pub mod intermediate;
19pub(crate) mod inverted_index;
20pub mod puffin_manager;
21mod statistics;
22pub(crate) mod store;
23
24use std::cmp::Ordering;
25use std::collections::{BinaryHeap, HashMap, HashSet};
26use std::num::NonZeroUsize;
27use std::sync::Arc;
28
29use bloom_filter::creator::BloomFilterIndexer;
30use common_telemetry::{debug, error, info, warn};
31use datatypes::arrow::array::BinaryArray;
32use datatypes::arrow::record_batch::RecordBatch;
33use mito_codec::index::IndexValuesCodec;
34use mito_codec::row_converter::CompositeValues;
35use object_store::ObjectStore;
36use puffin_manager::SstPuffinManager;
37use smallvec::{SmallVec, smallvec};
38use snafu::{OptionExt, ResultExt};
39use statistics::{ByteCount, RowCount};
40use store_api::metadata::RegionMetadataRef;
41use store_api::storage::{ColumnId, FileId, RegionId};
42use strum::IntoStaticStr;
43use tokio::sync::mpsc::Sender;
44
45use crate::access_layer::{AccessLayerRef, FilePathProvider, OperationType, RegionFilePathFactory};
46use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
47use crate::cache::write_cache::{UploadTracker, WriteCacheRef};
48use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
49use crate::error::{
50 BuildIndexAsyncSnafu, DecodeSnafu, Error, InvalidRecordBatchSnafu, RegionClosedSnafu,
51 RegionDroppedSnafu, RegionTruncatedSnafu, Result,
52};
53use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
54use crate::metrics::INDEX_CREATE_MEMORY_USAGE;
55use crate::read::{Batch, BatchReader};
56use crate::region::options::IndexOptions;
57use crate::region::version::VersionControlRef;
58use crate::region::{ManifestContextRef, RegionLeaderState};
59use crate::request::{
60 BackgroundNotify, IndexBuildFailed, IndexBuildFinished, IndexBuildStopped, WorkerRequest,
61 WorkerRequestWithTime,
62};
63use crate::schedule::scheduler::{Job, SchedulerRef};
64use crate::sst::file::{
65 ColumnIndexMetadata, FileHandle, FileMeta, IndexType, IndexTypes, RegionFileId, RegionIndexId,
66};
67use crate::sst::file_purger::FilePurgerRef;
68use crate::sst::index::fulltext_index::creator::FulltextIndexer;
69use crate::sst::index::intermediate::IntermediateManager;
70use crate::sst::index::inverted_index::creator::InvertedIndexer;
71use crate::sst::parquet::SstInfo;
72use crate::sst::parquet::flat_format::primary_key_column_index;
73use crate::sst::parquet::format::PrimaryKeyArray;
74use crate::worker::WorkerListener;
75
76pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index";
77pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index";
78pub(crate) const TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
79
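/// Triggers a background download of the SST's index (puffin) file into the
/// file cache, when a file cache is configured and a file size hint is known.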
80pub(crate) fn trigger_index_background_download(
82 file_cache: Option<&FileCacheRef>,
83 file_id: &RegionIndexId,
84 file_size_hint: Option<u64>,
85 path_factory: &RegionFilePathFactory,
86 object_store: &ObjectStore,
87) {
88 if let (Some(file_cache), Some(file_size)) = (file_cache, file_size_hint) {
89 let index_key = IndexKey::new(
90 file_id.region_id(),
91 file_id.file_id(),
92 FileType::Puffin(file_id.version),
93 );
94 let remote_path = path_factory.build_index_file_path(file_id.file_id);
95 file_cache.maybe_download_background(
96 index_key,
97 remote_path,
98 object_store.clone(),
99 file_size,
100 );
101 }
102}
103
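/// Output of index creation for a single SST file.
///
/// `file_size` is the size of the created index (puffin) file in bytes,
/// `version` is the version of that index file, and the remaining fields hold
/// the per-index outputs.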
104#[derive(Debug, Clone, Default)]
106pub struct IndexOutput {
107 pub file_size: u64,
109 pub version: u64,
111 pub inverted_index: InvertedIndexOutput,
113 pub fulltext_index: FulltextIndexOutput,
115 pub bloom_filter: BloomFilterOutput,
117}
118
119impl IndexOutput {
120 pub fn build_available_indexes(&self) -> SmallVec<[IndexType; 4]> {
121 let mut indexes = SmallVec::new();
122 if self.inverted_index.is_available() {
123 indexes.push(IndexType::InvertedIndex);
124 }
125 if self.fulltext_index.is_available() {
126 indexes.push(IndexType::FulltextIndex);
127 }
128 if self.bloom_filter.is_available() {
129 indexes.push(IndexType::BloomFilterIndex);
130 }
131 indexes
132 }
133
134 pub fn build_indexes(&self) -> Vec<ColumnIndexMetadata> {
135 let mut map: HashMap<ColumnId, IndexTypes> = HashMap::new();
136
137 if self.inverted_index.is_available() {
138 for &col in &self.inverted_index.columns {
139 map.entry(col).or_default().push(IndexType::InvertedIndex);
140 }
141 }
142 if self.fulltext_index.is_available() {
143 for &col in &self.fulltext_index.columns {
144 map.entry(col).or_default().push(IndexType::FulltextIndex);
145 }
146 }
147 if self.bloom_filter.is_available() {
148 for &col in &self.bloom_filter.columns {
149 map.entry(col)
150 .or_default()
151 .push(IndexType::BloomFilterIndex);
152 }
153 }
154
155 map.into_iter()
156 .map(|(column_id, created_indexes)| ColumnIndexMetadata {
157 column_id,
158 created_indexes,
159 })
160 .collect::<Vec<_>>()
161 }
162}
163
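/// Common output of a single index creator: the size of the created index in
/// bytes, the number of indexed rows and the ids of the indexed columns.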
164#[derive(Debug, Clone, Default)]
166pub struct IndexBaseOutput {
167 pub index_size: ByteCount,
169 pub row_count: RowCount,
171 pub columns: Vec<ColumnId>,
173}
174
175impl IndexBaseOutput {
176 pub fn is_available(&self) -> bool {
177 self.index_size > 0
178 }
179}
180
181pub type InvertedIndexOutput = IndexBaseOutput;
183pub type FulltextIndexOutput = IndexBaseOutput;
185pub type BloomFilterOutput = IndexBaseOutput;
187
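/// Creates the inverted, full-text and bloom filter indexes for one SST file,
/// reporting the creators' memory usage to the `INDEX_CREATE_MEMORY_USAGE` metric.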
188#[derive(Default)]
190pub struct Indexer {
191 file_id: FileId,
192 region_id: RegionId,
193 index_version: u64,
194 puffin_manager: Option<SstPuffinManager>,
195 write_cache_enabled: bool,
196 inverted_indexer: Option<InvertedIndexer>,
197 last_mem_inverted_index: usize,
198 fulltext_indexer: Option<FulltextIndexer>,
199 last_mem_fulltext_index: usize,
200 bloom_filter_indexer: Option<BloomFilterIndexer>,
201 last_mem_bloom_filter: usize,
202 intermediate_manager: Option<IntermediateManager>,
203}
204
205impl Indexer {
206 pub async fn update(&mut self, batch: &mut Batch) {
208 self.do_update(batch).await;
209
210 self.flush_mem_metrics();
211 }
212
213 pub async fn update_flat(&mut self, batch: &RecordBatch) {
215 self.do_update_flat(batch).await;
216
217 self.flush_mem_metrics();
218 }
219
220 pub async fn finish(&mut self) -> IndexOutput {
222 let output = self.do_finish().await;
223
224 self.flush_mem_metrics();
225 output
226 }
227
228 pub async fn abort(&mut self) {
230 self.do_abort().await;
231
232 self.flush_mem_metrics();
233 }
234
235 fn flush_mem_metrics(&mut self) {
236 let inverted_mem = self
237 .inverted_indexer
238 .as_ref()
239 .map_or(0, |creator| creator.memory_usage());
240 INDEX_CREATE_MEMORY_USAGE
241 .with_label_values(&[TYPE_INVERTED_INDEX])
242 .add(inverted_mem as i64 - self.last_mem_inverted_index as i64);
243 self.last_mem_inverted_index = inverted_mem;
244
245 let fulltext_mem = self
246 .fulltext_indexer
247 .as_ref()
248 .map_or(0, |creator| creator.memory_usage());
249 INDEX_CREATE_MEMORY_USAGE
250 .with_label_values(&[TYPE_FULLTEXT_INDEX])
251 .add(fulltext_mem as i64 - self.last_mem_fulltext_index as i64);
252 self.last_mem_fulltext_index = fulltext_mem;
253
254 let bloom_filter_mem = self
255 .bloom_filter_indexer
256 .as_ref()
257 .map_or(0, |creator| creator.memory_usage());
258 INDEX_CREATE_MEMORY_USAGE
259 .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
260 .add(bloom_filter_mem as i64 - self.last_mem_bloom_filter as i64);
261 self.last_mem_bloom_filter = bloom_filter_mem;
262 }
263}
264
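/// Builds an [Indexer] for an SST file, given the file id and the index
/// version to create.
///
/// Illustrative usage sketch (not a doctest; `builder` and `reader` stand in
/// for an [IndexerBuilder] and a [BatchReader]):
///
/// ```ignore
/// let mut indexer = builder.build(file_id, 0).await;
/// while let Some(mut batch) = reader.next_batch().await? {
///     indexer.update(&mut batch).await;
/// }
/// let output = indexer.finish().await;
/// ```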
265#[async_trait::async_trait]
266pub trait IndexerBuilder {
267 async fn build(&self, file_id: FileId, index_version: u64) -> Indexer;
269}
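
/// [IndexerBuilder] implementation that assembles indexers from the region
/// metadata, index configs and index options.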
270#[derive(Clone)]
271pub(crate) struct IndexerBuilderImpl {
272 pub(crate) build_type: IndexBuildType,
273 pub(crate) metadata: RegionMetadataRef,
274 pub(crate) row_group_size: usize,
275 pub(crate) puffin_manager: SstPuffinManager,
276 pub(crate) write_cache_enabled: bool,
277 pub(crate) intermediate_manager: IntermediateManager,
278 pub(crate) index_options: IndexOptions,
279 pub(crate) inverted_index_config: InvertedIndexConfig,
280 pub(crate) fulltext_index_config: FulltextIndexConfig,
281 pub(crate) bloom_filter_index_config: BloomFilterConfig,
282}
283
284#[async_trait::async_trait]
285impl IndexerBuilder for IndexerBuilderImpl {
286 async fn build(&self, file_id: FileId, index_version: u64) -> Indexer {
288 let mut indexer = Indexer {
289 file_id,
290 region_id: self.metadata.region_id,
291 index_version,
292 write_cache_enabled: self.write_cache_enabled,
293 ..Default::default()
294 };
295
296 indexer.inverted_indexer = self.build_inverted_indexer(file_id);
297 indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await;
298 indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id);
299 indexer.intermediate_manager = Some(self.intermediate_manager.clone());
300 if indexer.inverted_indexer.is_none()
301 && indexer.fulltext_indexer.is_none()
302 && indexer.bloom_filter_indexer.is_none()
303 {
304 indexer.abort().await;
305 return Indexer::default();
306 }
307
308 indexer.puffin_manager = Some(self.puffin_manager.clone());
309 indexer
310 }
311}
312
313impl IndexerBuilderImpl {
314 fn build_inverted_indexer(&self, file_id: FileId) -> Option<InvertedIndexer> {
315 let create = match self.build_type {
316 IndexBuildType::Flush => self.inverted_index_config.create_on_flush.auto(),
317 IndexBuildType::Compact => self.inverted_index_config.create_on_compaction.auto(),
318 _ => true,
319 };
320
321 if !create {
322 debug!(
323 "Skip creating inverted index due to config, region_id: {}, file_id: {}",
324 self.metadata.region_id, file_id,
325 );
326 return None;
327 }
328
329 let indexed_column_ids = self.metadata.inverted_indexed_column_ids(
330 self.index_options.inverted_index.ignore_column_ids.iter(),
331 );
332 if indexed_column_ids.is_empty() {
333 debug!(
334 "No columns to be indexed, skip creating inverted index, region_id: {}, file_id: {}",
335 self.metadata.region_id, file_id,
336 );
337 return None;
338 }
339
340 let Some(mut segment_row_count) =
341 NonZeroUsize::new(self.index_options.inverted_index.segment_row_count)
342 else {
343 warn!(
344 "Segment row count is 0, skip creating index, region_id: {}, file_id: {}",
345 self.metadata.region_id, file_id,
346 );
347 return None;
348 };
349
350 let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else {
351 warn!(
352 "Row group size is 0, skip creating index, region_id: {}, file_id: {}",
353 self.metadata.region_id, file_id,
354 );
355 return None;
356 };
357
358 if row_group_size.get() % segment_row_count.get() != 0 {
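            // The segment row count is not aligned with the row group size, so
            // fall back to using the row group size as the segment row count.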
360 segment_row_count = row_group_size;
361 }
362
363 let indexer = InvertedIndexer::new(
364 file_id,
365 &self.metadata,
366 self.intermediate_manager.clone(),
367 self.inverted_index_config.mem_threshold_on_create(),
368 segment_row_count,
369 indexed_column_ids,
370 );
371
372 Some(indexer)
373 }
374
375 async fn build_fulltext_indexer(&self, file_id: FileId) -> Option<FulltextIndexer> {
376 let create = match self.build_type {
377 IndexBuildType::Flush => self.fulltext_index_config.create_on_flush.auto(),
378 IndexBuildType::Compact => self.fulltext_index_config.create_on_compaction.auto(),
379 _ => true,
380 };
381
382 if !create {
383 debug!(
384 "Skip creating full-text index due to config, region_id: {}, file_id: {}",
385 self.metadata.region_id, file_id,
386 );
387 return None;
388 }
389
390 let mem_limit = self.fulltext_index_config.mem_threshold_on_create();
391 let creator = FulltextIndexer::new(
392 &self.metadata.region_id,
393 &file_id,
394 &self.intermediate_manager,
395 &self.metadata,
396 self.fulltext_index_config.compress,
397 mem_limit,
398 )
399 .await;
400
401 let err = match creator {
402 Ok(creator) => {
403 if creator.is_none() {
404 debug!(
405 "Skip creating full-text index due to no columns require indexing, region_id: {}, file_id: {}",
406 self.metadata.region_id, file_id,
407 );
408 }
409 return creator;
410 }
411 Err(err) => err,
412 };
413
414 if cfg!(any(test, feature = "test")) {
415 panic!(
416 "Failed to create full-text indexer, region_id: {}, file_id: {}, err: {:?}",
417 self.metadata.region_id, file_id, err
418 );
419 } else {
420 warn!(
421 err; "Failed to create full-text indexer, region_id: {}, file_id: {}",
422 self.metadata.region_id, file_id,
423 );
424 }
425
426 None
427 }
428
429 fn build_bloom_filter_indexer(&self, file_id: FileId) -> Option<BloomFilterIndexer> {
430 let create = match self.build_type {
431 IndexBuildType::Flush => self.bloom_filter_index_config.create_on_flush.auto(),
432 IndexBuildType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(),
433 _ => true,
434 };
435
436 if !create {
437 debug!(
438 "Skip creating bloom filter due to config, region_id: {}, file_id: {}",
439 self.metadata.region_id, file_id,
440 );
441 return None;
442 }
443
444 let mem_limit = self.bloom_filter_index_config.mem_threshold_on_create();
445 let indexer = BloomFilterIndexer::new(
446 file_id,
447 &self.metadata,
448 self.intermediate_manager.clone(),
449 mem_limit,
450 );
451
452 let err = match indexer {
453 Ok(indexer) => {
454 if indexer.is_none() {
455 debug!(
456 "Skip creating bloom filter due to no columns require indexing, region_id: {}, file_id: {}",
457 self.metadata.region_id, file_id,
458 );
459 }
460 return indexer;
461 }
462 Err(err) => err,
463 };
464
465 if cfg!(any(test, feature = "test")) {
466 panic!(
467 "Failed to create bloom filter, region_id: {}, file_id: {}, err: {:?}",
468 self.metadata.region_id, file_id, err
469 );
470 } else {
471 warn!(
472 err; "Failed to create bloom filter, region_id: {}, file_id: {}",
473 self.metadata.region_id, file_id,
474 );
475 }
476
477 None
478 }
479}
480
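/// Reason that triggered an index build; it also determines the scheduling
/// priority of the resulting task.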
481#[derive(Debug, Clone, IntoStaticStr, PartialEq)]
483pub enum IndexBuildType {
484 SchemaChange,
486 Flush,
488 Compact,
490 Manual,
492}
493
494impl IndexBuildType {
495 fn as_str(&self) -> &'static str {
496 self.into()
497 }
498
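    /// Returns the scheduling priority of the build reason; tasks with larger
    /// values are scheduled first (Manual > SchemaChange > Flush > Compact).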
499 fn priority(&self) -> u8 {
501 match self {
502 IndexBuildType::Manual => 3,
503 IndexBuildType::SchemaChange => 2,
504 IndexBuildType::Flush => 1,
505 IndexBuildType::Compact => 0,
506 }
507 }
508}
509
510impl From<OperationType> for IndexBuildType {
511 fn from(op_type: OperationType) -> Self {
512 match op_type {
513 OperationType::Flush => IndexBuildType::Flush,
514 OperationType::Compact => IndexBuildType::Compact,
515 }
516 }
517}
518
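/// Final outcome of an index build task: finished successfully, or aborted
/// with a reason message.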
519#[derive(Debug, Clone, PartialEq, Eq, Hash)]
521pub enum IndexBuildOutcome {
522 Finished,
523 Aborted(String),
524}
525
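/// Sender used to report the result of an index build task back to its requester.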
526pub type ResultMpscSender = Sender<Result<IndexBuildOutcome>>;
528
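/// Background task that builds (or rebuilds) the index file for a single SST
/// file and, on success, records the new index metadata in the region manifest.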
529#[derive(Clone)]
530pub struct IndexBuildTask {
531 pub file: FileHandle,
533 pub file_meta: FileMeta,
535 pub reason: IndexBuildType,
536 pub access_layer: AccessLayerRef,
537 pub(crate) listener: WorkerListener,
538 pub(crate) manifest_ctx: ManifestContextRef,
539 pub write_cache: Option<WriteCacheRef>,
540 pub file_purger: FilePurgerRef,
541 pub indexer_builder: Arc<dyn IndexerBuilder + Send + Sync>,
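    /// Sender used to notify the region worker when the build finishes, fails or stops.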
544 pub(crate) request_sender: Sender<WorkerRequestWithTime>,
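    /// Sender used to report the final [IndexBuildOutcome] (or an error) to the caller.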
546 pub(crate) result_sender: ResultMpscSender,
548}
549
550impl std::fmt::Debug for IndexBuildTask {
551 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
552 f.debug_struct("IndexBuildTask")
553 .field("region_id", &self.file_meta.region_id)
554 .field("file_id", &self.file_meta.file_id)
555 .field("reason", &self.reason)
556 .finish()
557 }
558}
559
560impl IndexBuildTask {
561 pub async fn on_success(&self, outcome: IndexBuildOutcome) {
563 let _ = self.result_sender.send(Ok(outcome)).await;
564 }
565
566 pub async fn on_failure(&self, err: Arc<Error>) {
568 let _ = self
569 .result_sender
570 .send(Err(err.clone()).context(BuildIndexAsyncSnafu {
571 region_id: self.file_meta.region_id,
572 }))
573 .await;
574 }
575
576 fn into_index_build_job(mut self, version_control: VersionControlRef) -> Job {
577 Box::pin(async move {
578 self.do_index_build(version_control).await;
579 })
580 }
581
582 async fn do_index_build(&mut self, version_control: VersionControlRef) {
583 self.listener
584 .on_index_build_begin(RegionFileId::new(
585 self.file_meta.region_id,
586 self.file_meta.file_id,
587 ))
588 .await;
589 match self.index_build(version_control).await {
590 Ok(outcome) => self.on_success(outcome).await,
591 Err(e) => {
592 warn!(
593 e; "Index build task failed, region: {}, file_id: {}",
594 self.file_meta.region_id, self.file_meta.file_id,
595 );
596 self.on_failure(e.into()).await
597 }
598 }
599 let worker_request = WorkerRequest::Background {
600 region_id: self.file_meta.region_id,
601 notify: BackgroundNotify::IndexBuildStopped(IndexBuildStopped {
602 region_id: self.file_meta.region_id,
603 file_id: self.file_meta.file_id,
604 }),
605 };
606 let _ = self
607 .request_sender
608 .send(WorkerRequestWithTime::new(worker_request))
609 .await;
610 }
611
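    /// Returns true if the SST file is still present in the current region
    /// version and is neither deleted nor being compacted.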
612 async fn check_sst_file_exists(&self, version_control: &VersionControlRef) -> bool {
614 let file_id = self.file_meta.file_id;
615 let level = self.file_meta.level;
616 let version = version_control.current().version;
618
619 let Some(level_files) = version.ssts.levels().get(level as usize) else {
620 warn!(
621 "File id {} not found in level {} for index build, region: {}",
622 file_id, level, self.file_meta.region_id
623 );
624 return false;
625 };
626
627 match level_files.files.get(&file_id) {
628 Some(handle) if !handle.is_deleted() && !handle.compacting() => {
629 true
633 }
634 _ => {
                warn!(
                    "File id {} is unavailable for index build (missing, deleted or compacting), region: {}",
                    file_id, self.file_meta.region_id
                );
639 false
640 }
641 }
642 }
643
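    /// Builds the index for the SST file: reads all batches from the SST and
    /// feeds them to the indexers, then, if an index file was produced, uploads
    /// it through the write cache (when configured) and updates the region manifest.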
644 async fn index_build(
645 &mut self,
646 version_control: VersionControlRef,
647 ) -> Result<IndexBuildOutcome> {
        let new_index_version = if self.file_meta.index_file_size > 0 {
            self.file_meta.index_version + 1
        } else {
            0
        };
655
656 let index_file_id = self.file_meta.file_id;
658 let mut indexer = self
659 .indexer_builder
660 .build(index_file_id, new_index_version)
661 .await;
662
663 if !self.check_sst_file_exists(&version_control).await {
665 indexer.abort().await;
667 self.listener
668 .on_index_build_abort(RegionFileId::new(
669 self.file_meta.region_id,
670 self.file_meta.file_id,
671 ))
672 .await;
673 return Ok(IndexBuildOutcome::Aborted(format!(
674 "SST file not found during index build, region: {}, file_id: {}",
675 self.file_meta.region_id, self.file_meta.file_id
676 )));
677 }
678
679 let mut parquet_reader = self
680 .access_layer
            .read_sst(self.file.clone())
            .build()
683 .await?;
684
685 loop {
687 match parquet_reader.next_batch().await {
688 Ok(Some(mut batch)) => {
689 indexer.update(&mut batch).await;
690 }
691 Ok(None) => break,
692 Err(e) => {
693 indexer.abort().await;
694 return Err(e);
695 }
696 }
697 }
698 let index_output = indexer.finish().await;
699
700 if index_output.file_size > 0 {
701 if !self.check_sst_file_exists(&version_control).await {
703 indexer.abort().await;
705 self.listener
706 .on_index_build_abort(RegionFileId::new(
707 self.file_meta.region_id,
708 self.file_meta.file_id,
709 ))
710 .await;
711 return Ok(IndexBuildOutcome::Aborted(format!(
712 "SST file not found during index build, region: {}, file_id: {}",
713 self.file_meta.region_id, self.file_meta.file_id
714 )));
715 }
716
717 self.maybe_upload_index_file(index_output.clone(), index_file_id, new_index_version)
719 .await?;
720
721 let worker_request = match self.update_manifest(index_output, new_index_version).await {
722 Ok(edit) => {
723 let index_build_finished = IndexBuildFinished {
724 region_id: self.file_meta.region_id,
725 edit,
726 };
727 WorkerRequest::Background {
728 region_id: self.file_meta.region_id,
729 notify: BackgroundNotify::IndexBuildFinished(index_build_finished),
730 }
731 }
732 Err(e) => {
733 let err = Arc::new(e);
734 WorkerRequest::Background {
735 region_id: self.file_meta.region_id,
736 notify: BackgroundNotify::IndexBuildFailed(IndexBuildFailed { err }),
737 }
738 }
739 };
740
741 let _ = self
742 .request_sender
743 .send(WorkerRequestWithTime::new(worker_request))
744 .await;
745 }
746 Ok(IndexBuildOutcome::Finished)
747 }
748
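    /// Uploads the freshly built index file to the remote store through the
    /// write cache, if one is configured; on upload failure, cleans up the
    /// uploaded files and returns the error.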
749 async fn maybe_upload_index_file(
750 &self,
751 output: IndexOutput,
752 index_file_id: FileId,
753 index_version: u64,
754 ) -> Result<()> {
755 if let Some(write_cache) = &self.write_cache {
756 let file_id = self.file_meta.file_id;
757 let region_id = self.file_meta.region_id;
758 let remote_store = self.access_layer.object_store();
759 let mut upload_tracker = UploadTracker::new(region_id);
760 let mut err = None;
761 let puffin_key =
762 IndexKey::new(region_id, index_file_id, FileType::Puffin(output.version));
763 let index_id = RegionIndexId::new(RegionFileId::new(region_id, file_id), index_version);
764 let puffin_path = RegionFilePathFactory::new(
765 self.access_layer.table_dir().to_string(),
766 self.access_layer.path_type(),
767 )
768 .build_index_file_path_with_version(index_id);
769 if let Err(e) = write_cache
770 .upload(puffin_key, &puffin_path, remote_store)
771 .await
772 {
773 err = Some(e);
774 }
775 upload_tracker.push_uploaded_file(puffin_path);
776 if let Some(err) = err {
777 upload_tracker
779 .clean(
780 &smallvec![SstInfo {
781 file_id,
782 index_metadata: output,
783 ..Default::default()
784 }],
785 &write_cache.file_cache(),
786 remote_store,
787 )
788 .await;
789 return Err(err);
790 }
791 } else {
792 debug!("write cache is not available, skip uploading index file");
793 }
794 Ok(())
795 }
796
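    /// Applies the new index metadata to the file meta and commits it to the
    /// region manifest, returning the resulting [RegionEdit].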
797 async fn update_manifest(
798 &mut self,
799 output: IndexOutput,
800 new_index_version: u64,
801 ) -> Result<RegionEdit> {
802 self.file_meta.available_indexes = output.build_available_indexes();
803 self.file_meta.indexes = output.build_indexes();
804 self.file_meta.index_file_size = output.file_size;
805 self.file_meta.index_version = new_index_version;
806 let edit = RegionEdit {
807 files_to_add: vec![self.file_meta.clone()],
808 files_to_remove: vec![],
809 timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
810 flushed_sequence: None,
811 flushed_entry_id: None,
812 committed_sequence: None,
813 compaction_time_window: None,
814 };
815 let version = self
816 .manifest_ctx
817 .update_manifest(
818 RegionLeaderState::Writable,
819 RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())),
820 false,
821 )
822 .await?;
823 info!(
824 "Successfully update manifest version to {version}, region: {}, reason: {}",
825 self.file_meta.region_id,
826 self.reason.as_str()
827 );
828 Ok(edit)
829 }
830}
831
832impl PartialEq for IndexBuildTask {
833 fn eq(&self, other: &Self) -> bool {
834 self.reason.priority() == other.reason.priority()
835 }
836}
837
838impl Eq for IndexBuildTask {}
839
840impl PartialOrd for IndexBuildTask {
841 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
842 Some(self.cmp(other))
843 }
844}
845
846impl Ord for IndexBuildTask {
847 fn cmp(&self, other: &Self) -> Ordering {
848 self.reason.priority().cmp(&other.reason.priority())
849 }
850}
851
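/// Per-region bookkeeping for index builds: the files whose indexes are
/// currently being built and the pending tasks, kept in a max-heap ordered by
/// task priority.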
852pub struct IndexBuildStatus {
854 pub region_id: RegionId,
855 pub building_files: HashSet<FileId>,
856 pub pending_tasks: BinaryHeap<IndexBuildTask>,
857}
858
859impl IndexBuildStatus {
860 pub fn new(region_id: RegionId) -> Self {
861 IndexBuildStatus {
862 region_id,
863 building_files: HashSet::new(),
864 pending_tasks: BinaryHeap::new(),
865 }
866 }
867
868 async fn on_failure(self, err: Arc<Error>) {
869 for task in self.pending_tasks {
870 task.on_failure(err.clone()).await;
871 }
872 }
873}
874
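/// Scheduler for [IndexBuildTask]s that limits the total number of files whose
/// indexes are built concurrently to `files_limit`.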
875pub struct IndexBuildScheduler {
876 scheduler: SchedulerRef,
878 region_status: HashMap<RegionId, IndexBuildStatus>,
880 files_limit: usize,
882}
883
884impl IndexBuildScheduler {
886 pub fn new(scheduler: SchedulerRef, files_limit: usize) -> Self {
887 IndexBuildScheduler {
888 scheduler,
889 region_status: HashMap::new(),
890 files_limit,
891 }
892 }
893
894 pub(crate) async fn schedule_build(
895 &mut self,
896 version_control: &VersionControlRef,
897 task: IndexBuildTask,
898 ) -> Result<()> {
899 let status = self
900 .region_status
901 .entry(task.file_meta.region_id)
902 .or_insert_with(|| IndexBuildStatus::new(task.file_meta.region_id));
903
904 if status.building_files.contains(&task.file_meta.file_id) {
905 let region_file_id =
906 RegionFileId::new(task.file_meta.region_id, task.file_meta.file_id);
907 debug!(
908 "Aborting index build task since index is already being built for region file {:?}",
909 region_file_id
910 );
911 task.on_success(IndexBuildOutcome::Aborted(format!(
912 "Index is already being built for region file {:?}",
913 region_file_id
914 )))
915 .await;
916 task.listener.on_index_build_abort(region_file_id).await;
917 return Ok(());
918 }
919
920 status.pending_tasks.push(task);
921
922 self.schedule_next_build_batch(version_control);
923 Ok(())
924 }
925
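    /// Schedules the highest-priority pending tasks until the number of
    /// concurrently building files reaches `files_limit` or no pending task is left.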
926 fn schedule_next_build_batch(&mut self, version_control: &VersionControlRef) {
928 let mut building_count = 0;
929 for status in self.region_status.values() {
930 building_count += status.building_files.len();
931 }
932
933 while building_count < self.files_limit {
934 if let Some(task) = self.find_next_task() {
935 let region_id = task.file_meta.region_id;
936 let file_id = task.file_meta.file_id;
937 let job = task.into_index_build_job(version_control.clone());
938 if self.scheduler.schedule(job).is_ok() {
                    if let Some(status) = self.region_status.get_mut(&region_id) {
940 status.building_files.insert(file_id);
941 building_count += 1;
942 status
943 .pending_tasks
944 .retain(|t| t.file_meta.file_id != file_id);
945 } else {
946 error!(
947 "Region status not found when scheduling index build task, region: {}",
948 region_id
949 );
950 }
951 } else {
952 error!(
953 "Failed to schedule index build job, region: {}, file_id: {}",
954 region_id, file_id
955 );
956 }
957 } else {
958 break;
960 }
961 }
962 }
963
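    /// Returns the highest-priority pending task across all regions, if any.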
964 fn find_next_task(&self) -> Option<IndexBuildTask> {
966 self.region_status
967 .iter()
968 .filter_map(|(_, status)| status.pending_tasks.peek())
969 .max()
970 .cloned()
971 }
972
973 pub(crate) fn on_task_stopped(
974 &mut self,
975 region_id: RegionId,
976 file_id: FileId,
977 version_control: &VersionControlRef,
978 ) {
        if let Some(status) = self.region_status.get_mut(&region_id) {
            status.building_files.remove(&file_id);
            if status.building_files.is_empty() && status.pending_tasks.is_empty() {
                self.region_status.remove(&region_id);
984 }
985 }
986
987 self.schedule_next_build_batch(version_control);
988 }
989
990 pub(crate) async fn on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
991 error!(
992 err; "Index build scheduler encountered failure for region {}, removing all pending tasks.",
993 region_id
994 );
        let Some(status) = self.region_status.remove(&region_id) else {
996 return;
997 };
998 status.on_failure(err).await;
999 }
1000
1001 pub(crate) async fn on_region_dropped(&mut self, region_id: RegionId) {
1003 self.remove_region_on_failure(
1004 region_id,
1005 Arc::new(RegionDroppedSnafu { region_id }.build()),
1006 )
1007 .await;
1008 }
1009
1010 pub(crate) async fn on_region_closed(&mut self, region_id: RegionId) {
1012 self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()))
1013 .await;
1014 }
1015
1016 pub(crate) async fn on_region_truncated(&mut self, region_id: RegionId) {
1018 self.remove_region_on_failure(
1019 region_id,
1020 Arc::new(RegionTruncatedSnafu { region_id }.build()),
1021 )
1022 .await;
1023 }
1024
1025 async fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        let Some(status) = self.region_status.remove(&region_id) else {
1027 return;
1028 };
1029 status.on_failure(err).await;
1030 }
1031}
1032
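/// Decodes the primary key column of a flat-format [RecordBatch] into
/// `(decoded primary key, run length)` pairs, collapsing consecutive rows that
/// share the same dictionary key into a single entry. For example, dictionary
/// keys `[k0, k0, k1, k0]` decode to `[(pk0, 2), (pk1, 1), (pk0, 1)]`
/// (hypothetical keys, shown for illustration).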
1033pub(crate) fn decode_primary_keys_with_counts(
1036 batch: &RecordBatch,
1037 codec: &IndexValuesCodec,
1038) -> Result<Vec<(CompositeValues, usize)>> {
1039 let primary_key_index = primary_key_column_index(batch.num_columns());
1040 let pk_dict_array = batch
1041 .column(primary_key_index)
1042 .as_any()
1043 .downcast_ref::<PrimaryKeyArray>()
1044 .context(InvalidRecordBatchSnafu {
1045 reason: "Primary key column is not a dictionary array",
1046 })?;
1047 let pk_values_array = pk_dict_array
1048 .values()
1049 .as_any()
1050 .downcast_ref::<BinaryArray>()
1051 .context(InvalidRecordBatchSnafu {
1052 reason: "Primary key values are not binary array",
1053 })?;
1054 let keys = pk_dict_array.keys();
1055
1056 let mut result: Vec<(CompositeValues, usize)> = Vec::new();
1058 let mut prev_key: Option<u32> = None;
1059
1060 for i in 0..keys.len() {
1061 let current_key = keys.value(i);
1062
1063 if let Some(prev) = prev_key
1065 && prev == current_key
1066 {
1067 result.last_mut().unwrap().1 += 1;
1069 continue;
1070 }
1071
1072 let pk_bytes = pk_values_array.value(current_key as usize);
1074 let decoded_value = codec.decoder().decode(pk_bytes).context(DecodeSnafu)?;
1075
1076 result.push((decoded_value, 1));
1077 prev_key = Some(current_key);
1078 }
1079
1080 Ok(result)
1081}
1082
1083#[cfg(test)]
1084mod tests {
1085 use std::sync::Arc;
1086
1087 use api::v1::SemanticType;
1088 use common_base::readable_size::ReadableSize;
1089 use datafusion_common::HashMap;
1090 use datatypes::data_type::ConcreteDataType;
1091 use datatypes::schema::{
1092 ColumnSchema, FulltextOptions, SkippingIndexOptions, SkippingIndexType,
1093 };
1094 use object_store::ObjectStore;
1095 use object_store::services::Memory;
1096 use puffin_manager::PuffinManagerFactory;
1097 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1098 use tokio::sync::mpsc;
1099
1100 use super::*;
1101 use crate::access_layer::{FilePathProvider, Metrics, SstWriteRequest, WriteType};
1102 use crate::cache::write_cache::WriteCache;
1103 use crate::config::{FulltextIndexConfig, IndexBuildMode, MitoConfig, Mode};
1104 use crate::memtable::time_partition::TimePartitions;
1105 use crate::region::version::{VersionBuilder, VersionControl};
1106 use crate::sst::file::RegionFileId;
1107 use crate::sst::file_purger::NoopFilePurger;
1108 use crate::sst::location;
1109 use crate::sst::parquet::WriteOptions;
1110 use crate::test_util::memtable_util::EmptyMemtableBuilder;
1111 use crate::test_util::scheduler_util::SchedulerEnv;
1112 use crate::test_util::sst_util::{new_batch_by_range, new_source, sst_region_metadata};
1113
1114 struct MetaConfig {
1115 with_inverted: bool,
1116 with_fulltext: bool,
1117 with_skipping_bloom: bool,
1118 }
1119
1120 fn mock_region_metadata(
1121 MetaConfig {
1122 with_inverted,
1123 with_fulltext,
1124 with_skipping_bloom,
1125 }: MetaConfig,
1126 ) -> RegionMetadataRef {
1127 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
1128 let mut column_schema = ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false);
1129 if with_inverted {
1130 column_schema = column_schema.with_inverted_index(true);
1131 }
1132 builder
1133 .push_column_metadata(ColumnMetadata {
1134 column_schema,
1135 semantic_type: SemanticType::Field,
1136 column_id: 1,
1137 })
1138 .push_column_metadata(ColumnMetadata {
1139 column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
1140 semantic_type: SemanticType::Field,
1141 column_id: 2,
1142 })
1143 .push_column_metadata(ColumnMetadata {
1144 column_schema: ColumnSchema::new(
1145 "c",
1146 ConcreteDataType::timestamp_millisecond_datatype(),
1147 false,
1148 ),
1149 semantic_type: SemanticType::Timestamp,
1150 column_id: 3,
1151 });
1152
1153 if with_fulltext {
1154 let column_schema =
1155 ColumnSchema::new("text", ConcreteDataType::string_datatype(), true)
1156 .with_fulltext_options(FulltextOptions {
1157 enable: true,
1158 ..Default::default()
1159 })
1160 .unwrap();
1161
1162 let column = ColumnMetadata {
1163 column_schema,
1164 semantic_type: SemanticType::Field,
1165 column_id: 4,
1166 };
1167
1168 builder.push_column_metadata(column);
1169 }
1170
1171 if with_skipping_bloom {
1172 let column_schema =
1173 ColumnSchema::new("bloom", ConcreteDataType::string_datatype(), false)
1174 .with_skipping_options(SkippingIndexOptions::new_unchecked(
1175 42,
1176 0.01,
1177 SkippingIndexType::BloomFilter,
1178 ))
1179 .unwrap();
1180
1181 let column = ColumnMetadata {
1182 column_schema,
1183 semantic_type: SemanticType::Field,
1184 column_id: 5,
1185 };
1186
1187 builder.push_column_metadata(column);
1188 }
1189
1190 Arc::new(builder.build().unwrap())
1191 }
1192
1193 fn mock_object_store() -> ObjectStore {
1194 ObjectStore::new(Memory::default()).unwrap().finish()
1195 }
1196
1197 async fn mock_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
1198 IntermediateManager::init_fs(path).await.unwrap()
1199 }
1200 struct NoopPathProvider;
1201
1202 impl FilePathProvider for NoopPathProvider {
1203 fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
1204 unreachable!()
1205 }
1206
1207 fn build_index_file_path_with_version(&self, _index_id: RegionIndexId) -> String {
1208 unreachable!()
1209 }
1210
1211 fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
1212 unreachable!()
1213 }
1214 }
1215
1216 async fn mock_sst_file(
1217 metadata: RegionMetadataRef,
1218 env: &SchedulerEnv,
1219 build_mode: IndexBuildMode,
1220 ) -> SstInfo {
1221 let source = new_source(&[
1222 new_batch_by_range(&["a", "d"], 0, 60),
1223 new_batch_by_range(&["b", "f"], 0, 40),
1224 new_batch_by_range(&["b", "h"], 100, 200),
1225 ]);
1226 let mut index_config = MitoConfig::default().index;
1227 index_config.build_mode = build_mode;
1228 let write_request = SstWriteRequest {
1229 op_type: OperationType::Flush,
1230 metadata: metadata.clone(),
1231 source: either::Left(source),
1232 storage: None,
1233 max_sequence: None,
1234 cache_manager: Default::default(),
1235 index_options: IndexOptions::default(),
1236 index_config,
1237 inverted_index_config: Default::default(),
1238 fulltext_index_config: Default::default(),
1239 bloom_filter_index_config: Default::default(),
1240 };
1241 let mut metrics = Metrics::new(WriteType::Flush);
1242 env.access_layer
1243 .write_sst(write_request, &WriteOptions::default(), &mut metrics)
1244 .await
1245 .unwrap()
1246 .remove(0)
1247 }
1248
1249 async fn mock_version_control(
1250 metadata: RegionMetadataRef,
1251 file_purger: FilePurgerRef,
1252 files: HashMap<FileId, FileMeta>,
1253 ) -> VersionControlRef {
1254 let mutable = Arc::new(TimePartitions::new(
1255 metadata.clone(),
1256 Arc::new(EmptyMemtableBuilder::default()),
1257 0,
1258 None,
1259 ));
1260 let version_builder = VersionBuilder::new(metadata, mutable)
1261 .add_files(file_purger, files.values().cloned())
1262 .build();
1263 Arc::new(VersionControl::new(version_builder))
1264 }
1265
1266 async fn mock_indexer_builder(
1267 metadata: RegionMetadataRef,
1268 env: &SchedulerEnv,
1269 ) -> Arc<dyn IndexerBuilder + Send + Sync> {
1270 let (dir, factory) = PuffinManagerFactory::new_for_test_async("mock_indexer_builder").await;
1271 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1272 let puffin_manager = factory.build(
1273 env.access_layer.object_store().clone(),
1274 RegionFilePathFactory::new(
1275 env.access_layer.table_dir().to_string(),
1276 env.access_layer.path_type(),
1277 ),
1278 );
1279 Arc::new(IndexerBuilderImpl {
1280 build_type: IndexBuildType::Flush,
1281 metadata,
1282 row_group_size: 1024,
1283 puffin_manager,
1284 write_cache_enabled: false,
1285 intermediate_manager: intm_manager,
1286 index_options: IndexOptions::default(),
1287 inverted_index_config: InvertedIndexConfig::default(),
1288 fulltext_index_config: FulltextIndexConfig::default(),
1289 bloom_filter_index_config: BloomFilterConfig::default(),
1290 })
1291 }
1292
1293 #[tokio::test]
1294 async fn test_build_indexer_basic() {
1295 let (dir, factory) =
1296 PuffinManagerFactory::new_for_test_async("test_build_indexer_basic_").await;
1297 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1298
1299 let metadata = mock_region_metadata(MetaConfig {
1300 with_inverted: true,
1301 with_fulltext: true,
1302 with_skipping_bloom: true,
1303 });
1304 let indexer = IndexerBuilderImpl {
1305 build_type: IndexBuildType::Flush,
1306 metadata,
1307 row_group_size: 1024,
1308 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1309 write_cache_enabled: false,
1310 intermediate_manager: intm_manager,
1311 index_options: IndexOptions::default(),
1312 inverted_index_config: InvertedIndexConfig::default(),
1313 fulltext_index_config: FulltextIndexConfig::default(),
1314 bloom_filter_index_config: BloomFilterConfig::default(),
1315 }
1316 .build(FileId::random(), 0)
1317 .await;
1318
1319 assert!(indexer.inverted_indexer.is_some());
1320 assert!(indexer.fulltext_indexer.is_some());
1321 assert!(indexer.bloom_filter_indexer.is_some());
1322 }
1323
1324 #[tokio::test]
1325 async fn test_build_indexer_disable_create() {
1326 let (dir, factory) =
1327 PuffinManagerFactory::new_for_test_async("test_build_indexer_disable_create_").await;
1328 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1329
1330 let metadata = mock_region_metadata(MetaConfig {
1331 with_inverted: true,
1332 with_fulltext: true,
1333 with_skipping_bloom: true,
1334 });
1335 let indexer = IndexerBuilderImpl {
1336 build_type: IndexBuildType::Flush,
1337 metadata: metadata.clone(),
1338 row_group_size: 1024,
1339 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1340 write_cache_enabled: false,
1341 intermediate_manager: intm_manager.clone(),
1342 index_options: IndexOptions::default(),
1343 inverted_index_config: InvertedIndexConfig {
1344 create_on_flush: Mode::Disable,
1345 ..Default::default()
1346 },
1347 fulltext_index_config: FulltextIndexConfig::default(),
1348 bloom_filter_index_config: BloomFilterConfig::default(),
1349 }
1350 .build(FileId::random(), 0)
1351 .await;
1352
1353 assert!(indexer.inverted_indexer.is_none());
1354 assert!(indexer.fulltext_indexer.is_some());
1355 assert!(indexer.bloom_filter_indexer.is_some());
1356
1357 let indexer = IndexerBuilderImpl {
1358 build_type: IndexBuildType::Compact,
1359 metadata: metadata.clone(),
1360 row_group_size: 1024,
1361 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1362 write_cache_enabled: false,
1363 intermediate_manager: intm_manager.clone(),
1364 index_options: IndexOptions::default(),
1365 inverted_index_config: InvertedIndexConfig::default(),
1366 fulltext_index_config: FulltextIndexConfig {
1367 create_on_compaction: Mode::Disable,
1368 ..Default::default()
1369 },
1370 bloom_filter_index_config: BloomFilterConfig::default(),
1371 }
1372 .build(FileId::random(), 0)
1373 .await;
1374
1375 assert!(indexer.inverted_indexer.is_some());
1376 assert!(indexer.fulltext_indexer.is_none());
1377 assert!(indexer.bloom_filter_indexer.is_some());
1378
1379 let indexer = IndexerBuilderImpl {
1380 build_type: IndexBuildType::Compact,
1381 metadata,
1382 row_group_size: 1024,
1383 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1384 write_cache_enabled: false,
1385 intermediate_manager: intm_manager,
1386 index_options: IndexOptions::default(),
1387 inverted_index_config: InvertedIndexConfig::default(),
1388 fulltext_index_config: FulltextIndexConfig::default(),
1389 bloom_filter_index_config: BloomFilterConfig {
1390 create_on_compaction: Mode::Disable,
1391 ..Default::default()
1392 },
1393 }
1394 .build(FileId::random(), 0)
1395 .await;
1396
1397 assert!(indexer.inverted_indexer.is_some());
1398 assert!(indexer.fulltext_indexer.is_some());
1399 assert!(indexer.bloom_filter_indexer.is_none());
1400 }
1401
1402 #[tokio::test]
1403 async fn test_build_indexer_no_required() {
1404 let (dir, factory) =
1405 PuffinManagerFactory::new_for_test_async("test_build_indexer_no_required_").await;
1406 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1407
1408 let metadata = mock_region_metadata(MetaConfig {
1409 with_inverted: false,
1410 with_fulltext: true,
1411 with_skipping_bloom: true,
1412 });
1413 let indexer = IndexerBuilderImpl {
1414 build_type: IndexBuildType::Flush,
1415 metadata: metadata.clone(),
1416 row_group_size: 1024,
1417 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1418 write_cache_enabled: false,
1419 intermediate_manager: intm_manager.clone(),
1420 index_options: IndexOptions::default(),
1421 inverted_index_config: InvertedIndexConfig::default(),
1422 fulltext_index_config: FulltextIndexConfig::default(),
1423 bloom_filter_index_config: BloomFilterConfig::default(),
1424 }
1425 .build(FileId::random(), 0)
1426 .await;
1427
1428 assert!(indexer.inverted_indexer.is_none());
1429 assert!(indexer.fulltext_indexer.is_some());
1430 assert!(indexer.bloom_filter_indexer.is_some());
1431
1432 let metadata = mock_region_metadata(MetaConfig {
1433 with_inverted: true,
1434 with_fulltext: false,
1435 with_skipping_bloom: true,
1436 });
1437 let indexer = IndexerBuilderImpl {
1438 build_type: IndexBuildType::Flush,
1439 metadata: metadata.clone(),
1440 row_group_size: 1024,
1441 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1442 write_cache_enabled: false,
1443 intermediate_manager: intm_manager.clone(),
1444 index_options: IndexOptions::default(),
1445 inverted_index_config: InvertedIndexConfig::default(),
1446 fulltext_index_config: FulltextIndexConfig::default(),
1447 bloom_filter_index_config: BloomFilterConfig::default(),
1448 }
1449 .build(FileId::random(), 0)
1450 .await;
1451
1452 assert!(indexer.inverted_indexer.is_some());
1453 assert!(indexer.fulltext_indexer.is_none());
1454 assert!(indexer.bloom_filter_indexer.is_some());
1455
1456 let metadata = mock_region_metadata(MetaConfig {
1457 with_inverted: true,
1458 with_fulltext: true,
1459 with_skipping_bloom: false,
1460 });
1461 let indexer = IndexerBuilderImpl {
1462 build_type: IndexBuildType::Flush,
1463 metadata: metadata.clone(),
1464 row_group_size: 1024,
1465 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1466 write_cache_enabled: false,
1467 intermediate_manager: intm_manager,
1468 index_options: IndexOptions::default(),
1469 inverted_index_config: InvertedIndexConfig::default(),
1470 fulltext_index_config: FulltextIndexConfig::default(),
1471 bloom_filter_index_config: BloomFilterConfig::default(),
1472 }
1473 .build(FileId::random(), 0)
1474 .await;
1475
1476 assert!(indexer.inverted_indexer.is_some());
1477 assert!(indexer.fulltext_indexer.is_some());
1478 assert!(indexer.bloom_filter_indexer.is_none());
1479 }
1480
1481 #[tokio::test]
1482 async fn test_build_indexer_zero_row_group() {
1483 let (dir, factory) =
1484 PuffinManagerFactory::new_for_test_async("test_build_indexer_zero_row_group_").await;
1485 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1486
1487 let metadata = mock_region_metadata(MetaConfig {
1488 with_inverted: true,
1489 with_fulltext: true,
1490 with_skipping_bloom: true,
1491 });
1492 let indexer = IndexerBuilderImpl {
1493 build_type: IndexBuildType::Flush,
1494 metadata,
1495 row_group_size: 0,
1496 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1497 write_cache_enabled: false,
1498 intermediate_manager: intm_manager,
1499 index_options: IndexOptions::default(),
1500 inverted_index_config: InvertedIndexConfig::default(),
1501 fulltext_index_config: FulltextIndexConfig::default(),
1502 bloom_filter_index_config: BloomFilterConfig::default(),
1503 }
1504 .build(FileId::random(), 0)
1505 .await;
1506
1507 assert!(indexer.inverted_indexer.is_none());
1508 }
1509
1510 #[tokio::test]
1511 async fn test_index_build_task_sst_not_exist() {
1512 let env = SchedulerEnv::new().await;
1513 let (tx, _rx) = mpsc::channel(4);
1514 let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1515 let mut scheduler = env.mock_index_build_scheduler(4);
1516 let metadata = Arc::new(sst_region_metadata());
1517 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1518 let file_purger = Arc::new(NoopFilePurger {});
1519 let files = HashMap::new();
1520 let version_control =
1521 mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1522 let region_id = metadata.region_id;
1523 let indexer_builder = mock_indexer_builder(metadata, &env).await;
1524
1525 let file_meta = FileMeta {
1526 region_id,
1527 file_id: FileId::random(),
1528 file_size: 100,
1529 ..Default::default()
1530 };
1531
1532 let file = FileHandle::new(file_meta.clone(), file_purger.clone());
1533
1534 let task = IndexBuildTask {
1536 file,
1537 file_meta,
1538 reason: IndexBuildType::Flush,
1539 access_layer: env.access_layer.clone(),
1540 listener: WorkerListener::default(),
1541 manifest_ctx,
1542 write_cache: None,
1543 file_purger,
1544 indexer_builder,
1545 request_sender: tx,
1546 result_sender: result_tx,
1547 };
1548
1549 scheduler
1551 .schedule_build(&version_control, task)
1552 .await
1553 .unwrap();
1554 match result_rx.recv().await.unwrap() {
1555 Ok(outcome) => {
1556 if outcome == IndexBuildOutcome::Finished {
1557 panic!("Expect aborted result due to missing SST file")
1558 }
1559 }
1560 _ => panic!("Expect aborted result due to missing SST file"),
1561 }
1562 }
1563
1564 #[tokio::test]
1565 async fn test_index_build_task_sst_exist() {
1566 let env = SchedulerEnv::new().await;
1567 let mut scheduler = env.mock_index_build_scheduler(4);
1568 let metadata = Arc::new(sst_region_metadata());
1569 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1570 let region_id = metadata.region_id;
1571 let file_purger = Arc::new(NoopFilePurger {});
1572 let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
1573 let file_meta = FileMeta {
1574 region_id,
1575 file_id: sst_info.file_id,
1576 file_size: sst_info.file_size,
1577 max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
1578 index_file_size: sst_info.index_metadata.file_size,
1579 num_rows: sst_info.num_rows as u64,
1580 num_row_groups: sst_info.num_row_groups,
1581 ..Default::default()
1582 };
1583 let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1584 let version_control =
1585 mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1586 let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
1587
1588 let file = FileHandle::new(file_meta.clone(), file_purger.clone());
1589
1590 let (tx, mut rx) = mpsc::channel(4);
1592 let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1593 let task = IndexBuildTask {
1594 file,
1595 file_meta: file_meta.clone(),
1596 reason: IndexBuildType::Flush,
1597 access_layer: env.access_layer.clone(),
1598 listener: WorkerListener::default(),
1599 manifest_ctx,
1600 write_cache: None,
1601 file_purger,
1602 indexer_builder,
1603 request_sender: tx,
1604 result_sender: result_tx,
1605 };
1606
1607 scheduler
1608 .schedule_build(&version_control, task)
1609 .await
1610 .unwrap();
1611
1612 match result_rx.recv().await.unwrap() {
1614 Ok(outcome) => {
1615 assert_eq!(outcome, IndexBuildOutcome::Finished);
1616 }
1617 _ => panic!("Expect finished result"),
1618 }
1619
1620 let worker_req = rx.recv().await.unwrap().request;
1622 match worker_req {
1623 WorkerRequest::Background {
1624 region_id: req_region_id,
1625 notify: BackgroundNotify::IndexBuildFinished(finished),
1626 } => {
1627 assert_eq!(req_region_id, region_id);
1628 assert_eq!(finished.edit.files_to_add.len(), 1);
1629 let updated_meta = &finished.edit.files_to_add[0];
1630
1631 assert!(!updated_meta.available_indexes.is_empty());
1633 assert!(updated_meta.index_file_size > 0);
1634 assert_eq!(updated_meta.file_id, file_meta.file_id);
1635 }
1636 _ => panic!("Unexpected worker request: {:?}", worker_req),
1637 }
1638 }
1639
1640 async fn schedule_index_build_task_with_mode(build_mode: IndexBuildMode) {
1641 let env = SchedulerEnv::new().await;
1642 let mut scheduler = env.mock_index_build_scheduler(4);
1643 let metadata = Arc::new(sst_region_metadata());
1644 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1645 let file_purger = Arc::new(NoopFilePurger {});
1646 let region_id = metadata.region_id;
1647 let sst_info = mock_sst_file(metadata.clone(), &env, build_mode.clone()).await;
1648 let file_meta = FileMeta {
1649 region_id,
1650 file_id: sst_info.file_id,
1651 file_size: sst_info.file_size,
1652 max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
1653 index_file_size: sst_info.index_metadata.file_size,
1654 num_rows: sst_info.num_rows as u64,
1655 num_row_groups: sst_info.num_row_groups,
1656 ..Default::default()
1657 };
1658 let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1659 let version_control =
1660 mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1661 let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
1662
1663 let file = FileHandle::new(file_meta.clone(), file_purger.clone());
1664
1665 let (tx, _rx) = mpsc::channel(4);
1667 let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1668 let task = IndexBuildTask {
1669 file,
1670 file_meta: file_meta.clone(),
1671 reason: IndexBuildType::Flush,
1672 access_layer: env.access_layer.clone(),
1673 listener: WorkerListener::default(),
1674 manifest_ctx,
1675 write_cache: None,
1676 file_purger,
1677 indexer_builder,
1678 request_sender: tx,
1679 result_sender: result_tx,
1680 };
1681
1682 scheduler
1683 .schedule_build(&version_control, task)
1684 .await
1685 .unwrap();
1686
1687 let puffin_path = location::index_file_path(
1688 env.access_layer.table_dir(),
1689 RegionIndexId::new(RegionFileId::new(region_id, file_meta.file_id), 0),
1690 env.access_layer.path_type(),
1691 );
1692
1693 if build_mode == IndexBuildMode::Async {
1694 assert!(
1696 !env.access_layer
1697 .object_store()
1698 .exists(&puffin_path)
1699 .await
1700 .unwrap()
1701 );
1702 } else {
1703 assert!(
1705 env.access_layer
1706 .object_store()
1707 .exists(&puffin_path)
1708 .await
1709 .unwrap()
1710 );
1711 }
1712
1713 match result_rx.recv().await.unwrap() {
1715 Ok(outcome) => {
1716 assert_eq!(outcome, IndexBuildOutcome::Finished);
1717 }
1718 _ => panic!("Expect finished result"),
1719 }
1720
1721 assert!(
1723 env.access_layer
1724 .object_store()
1725 .exists(&puffin_path)
1726 .await
1727 .unwrap()
1728 );
1729 }
1730
1731 #[tokio::test]
1732 async fn test_index_build_task_build_mode() {
1733 schedule_index_build_task_with_mode(IndexBuildMode::Async).await;
1734 schedule_index_build_task_with_mode(IndexBuildMode::Sync).await;
1735 }
1736
1737 #[tokio::test]
1738 async fn test_index_build_task_no_index() {
1739 let env = SchedulerEnv::new().await;
1740 let mut scheduler = env.mock_index_build_scheduler(4);
1741 let mut metadata = sst_region_metadata();
1742 metadata.column_metadatas.iter_mut().for_each(|col| {
1744 col.column_schema.set_inverted_index(false);
1745 let _ = col.column_schema.unset_skipping_options();
1746 });
1747 let region_id = metadata.region_id;
1748 let metadata = Arc::new(metadata);
1749 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1750 let file_purger = Arc::new(NoopFilePurger {});
1751 let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
1752 let file_meta = FileMeta {
1753 region_id,
1754 file_id: sst_info.file_id,
1755 file_size: sst_info.file_size,
1756 max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
1757 index_file_size: sst_info.index_metadata.file_size,
1758 num_rows: sst_info.num_rows as u64,
1759 num_row_groups: sst_info.num_row_groups,
1760 ..Default::default()
1761 };
1762 let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1763 let version_control =
1764 mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1765 let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
1766
1767 let file = FileHandle::new(file_meta.clone(), file_purger.clone());
1768
1769 let (tx, mut rx) = mpsc::channel(4);
1771 let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1772 let task = IndexBuildTask {
1773 file,
1774 file_meta: file_meta.clone(),
1775 reason: IndexBuildType::Flush,
1776 access_layer: env.access_layer.clone(),
1777 listener: WorkerListener::default(),
1778 manifest_ctx,
1779 write_cache: None,
1780 file_purger,
1781 indexer_builder,
1782 request_sender: tx,
1783 result_sender: result_tx,
1784 };
1785
1786 scheduler
1787 .schedule_build(&version_control, task)
1788 .await
1789 .unwrap();
1790
1791 match result_rx.recv().await.unwrap() {
1793 Ok(outcome) => {
1794 assert_eq!(outcome, IndexBuildOutcome::Finished);
1795 }
1796 _ => panic!("Expect finished result"),
1797 }
1798
1799 let _ = rx.recv().await.is_none();
1801 }
1802
1803 #[tokio::test]
1804 async fn test_index_build_task_with_write_cache() {
1805 let env = SchedulerEnv::new().await;
1806 let mut scheduler = env.mock_index_build_scheduler(4);
1807 let metadata = Arc::new(sst_region_metadata());
1808 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1809 let file_purger = Arc::new(NoopFilePurger {});
1810 let region_id = metadata.region_id;
1811
1812 let (dir, factory) = PuffinManagerFactory::new_for_test_async("test_write_cache").await;
1813 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1814
1815 let write_cache = Arc::new(
1817 WriteCache::new_fs(
1818 dir.path().to_str().unwrap(),
1819 ReadableSize::mb(10),
1820 None,
1821 None,
                true,
                factory,
1824 intm_manager,
1825 ReadableSize::mb(10),
1826 )
1827 .await
1828 .unwrap(),
1829 );
1830 let indexer_builder = Arc::new(IndexerBuilderImpl {
1832 build_type: IndexBuildType::Flush,
1833 metadata: metadata.clone(),
1834 row_group_size: 1024,
1835 puffin_manager: write_cache.build_puffin_manager().clone(),
1836 write_cache_enabled: true,
1837 intermediate_manager: write_cache.intermediate_manager().clone(),
1838 index_options: IndexOptions::default(),
1839 inverted_index_config: InvertedIndexConfig::default(),
1840 fulltext_index_config: FulltextIndexConfig::default(),
1841 bloom_filter_index_config: BloomFilterConfig::default(),
1842 });
1843
1844 let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
1845 let file_meta = FileMeta {
1846 region_id,
1847 file_id: sst_info.file_id,
1848 file_size: sst_info.file_size,
1849 index_file_size: sst_info.index_metadata.file_size,
1850 num_rows: sst_info.num_rows as u64,
1851 num_row_groups: sst_info.num_row_groups,
1852 ..Default::default()
1853 };
1854 let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1855 let version_control =
1856 mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1857
1858 let file = FileHandle::new(file_meta.clone(), file_purger.clone());
1859
1860 let (tx, mut _rx) = mpsc::channel(4);
1862 let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1863 let task = IndexBuildTask {
1864 file,
1865 file_meta: file_meta.clone(),
1866 reason: IndexBuildType::Flush,
1867 access_layer: env.access_layer.clone(),
1868 listener: WorkerListener::default(),
1869 manifest_ctx,
1870 write_cache: Some(write_cache.clone()),
1871 file_purger,
1872 indexer_builder,
1873 request_sender: tx,
1874 result_sender: result_tx,
1875 };
1876
1877 scheduler
1878 .schedule_build(&version_control, task)
1879 .await
1880 .unwrap();
1881
1882 match result_rx.recv().await.unwrap() {
1884 Ok(outcome) => {
1885 assert_eq!(outcome, IndexBuildOutcome::Finished);
1886 }
1887 _ => panic!("Expect finished result"),
1888 }
1889
1890 let index_key = IndexKey::new(
1892 region_id,
1893 file_meta.file_id,
1894 FileType::Puffin(sst_info.index_metadata.version),
1895 );
1896 assert!(write_cache.file_cache().contains_key(&index_key));
1897 }
1898
1899 async fn create_mock_task_for_schedule(
1900 env: &SchedulerEnv,
1901 file_id: FileId,
1902 region_id: RegionId,
1903 reason: IndexBuildType,
1904 ) -> IndexBuildTask {
1905 let metadata = Arc::new(sst_region_metadata());
1906 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1907 let file_purger = Arc::new(NoopFilePurger {});
1908 let indexer_builder = mock_indexer_builder(metadata, env).await;
1909 let (tx, _rx) = mpsc::channel(4);
1910 let (result_tx, _result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1911
1912 let file_meta = FileMeta {
1913 region_id,
1914 file_id,
1915 file_size: 100,
1916 ..Default::default()
1917 };
1918
1919 let file = FileHandle::new(file_meta.clone(), file_purger.clone());
1920
1921 IndexBuildTask {
1922 file,
1923 file_meta,
1924 reason,
1925 access_layer: env.access_layer.clone(),
1926 listener: WorkerListener::default(),
1927 manifest_ctx,
1928 write_cache: None,
1929 file_purger,
1930 indexer_builder,
1931 request_sender: tx,
1932 result_sender: result_tx,
1933 }
1934 }
1935
1936 #[tokio::test]
1937 async fn test_scheduler_comprehensive() {
1938 let env = SchedulerEnv::new().await;
1939 let mut scheduler = env.mock_index_build_scheduler(2);
1940 let metadata = Arc::new(sst_region_metadata());
1941 let region_id = metadata.region_id;
1942 let file_purger = Arc::new(NoopFilePurger {});
1943
1944 let file_id1 = FileId::random();
1946 let file_id2 = FileId::random();
1947 let file_id3 = FileId::random();
1948 let file_id4 = FileId::random();
1949 let file_id5 = FileId::random();
1950
1951 let mut files = HashMap::new();
1952 for file_id in [file_id1, file_id2, file_id3, file_id4, file_id5] {
1953 files.insert(
1954 file_id,
1955 FileMeta {
1956 region_id,
1957 file_id,
1958 file_size: 100,
1959 ..Default::default()
1960 },
1961 );
1962 }
1963
1964 let version_control = mock_version_control(metadata, file_purger, files).await;
1965
1966 let task1 =
1968 create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
1969 assert!(
1970 scheduler
1971 .schedule_build(&version_control, task1)
1972 .await
1973 .is_ok()
1974 );
        assert!(scheduler.region_status.contains_key(&region_id));
        let status = scheduler.region_status.get(&region_id).unwrap();
1977 assert_eq!(status.building_files.len(), 1);
1978 assert!(status.building_files.contains(&file_id1));
1979
1980 let task1_dup =
1982 create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
1983 scheduler
1984 .schedule_build(&version_control, task1_dup)
1985 .await
1986 .unwrap();
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 1);

        let task2 =
1992 create_mock_task_for_schedule(&env, file_id2, region_id, IndexBuildType::Flush).await;
1993 scheduler
1994 .schedule_build(&version_control, task2)
1995 .await
1996 .unwrap();
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 2);
        assert_eq!(status.pending_tasks.len(), 0);
2000
2001 let task3 =
2004 create_mock_task_for_schedule(&env, file_id3, region_id, IndexBuildType::Compact).await;
2005 let task4 =
2006 create_mock_task_for_schedule(&env, file_id4, region_id, IndexBuildType::SchemaChange)
2007 .await;
2008 let task5 =
2009 create_mock_task_for_schedule(&env, file_id5, region_id, IndexBuildType::Manual).await;
2010
2011 scheduler
2012 .schedule_build(&version_control, task3)
2013 .await
2014 .unwrap();
2015 scheduler
2016 .schedule_build(&version_control, task4)
2017 .await
2018 .unwrap();
2019 scheduler
2020 .schedule_build(&version_control, task5)
2021 .await
2022 .unwrap();
2023
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 2);
        assert_eq!(status.pending_tasks.len(), 3);

        scheduler.on_task_stopped(region_id, file_id1, &version_control);
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert!(!status.building_files.contains(&file_id1));
        assert_eq!(status.building_files.len(), 2);
        assert_eq!(status.pending_tasks.len(), 2);
        assert!(status.building_files.contains(&file_id5));
2036
2037 scheduler.on_task_stopped(region_id, file_id2, &version_control);
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 2);
        assert_eq!(status.pending_tasks.len(), 1);
        assert!(status.building_files.contains(&file_id4));

        scheduler.on_task_stopped(region_id, file_id5, &version_control);
2046 scheduler.on_task_stopped(region_id, file_id4, &version_control);
2047
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 1);
        assert_eq!(status.pending_tasks.len(), 0);
        assert!(status.building_files.contains(&file_id3));
2052
2053 scheduler.on_task_stopped(region_id, file_id3, &version_control);
2054
        assert!(!scheduler.region_status.contains_key(&region_id));
2057
2058 let task6 =
2060 create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
2061 let task7 =
2062 create_mock_task_for_schedule(&env, file_id2, region_id, IndexBuildType::Flush).await;
2063 let task8 =
2064 create_mock_task_for_schedule(&env, file_id3, region_id, IndexBuildType::Manual).await;
2065
2066 scheduler
2067 .schedule_build(&version_control, task6)
2068 .await
2069 .unwrap();
2070 scheduler
2071 .schedule_build(&version_control, task7)
2072 .await
2073 .unwrap();
2074 scheduler
2075 .schedule_build(&version_control, task8)
2076 .await
2077 .unwrap();
2078
        assert!(scheduler.region_status.contains_key(&region_id));
        let status = scheduler.region_status.get(&region_id).unwrap();
2081 assert_eq!(status.building_files.len(), 2);
2082 assert_eq!(status.pending_tasks.len(), 1);
2083
2084 scheduler.on_region_dropped(region_id).await;
        assert!(!scheduler.region_status.contains_key(&region_id));
2086 }
2087}