1pub(crate) mod bloom_filter;
16pub(crate) mod fulltext_index;
17mod indexer;
18pub mod intermediate;
19pub(crate) mod inverted_index;
20pub mod puffin_manager;
21mod statistics;
22pub(crate) mod store;
23
24use std::cmp::Ordering;
25use std::collections::{BinaryHeap, HashMap, HashSet};
26use std::num::NonZeroUsize;
27use std::sync::Arc;
28
29use bloom_filter::creator::BloomFilterIndexer;
30use common_telemetry::{debug, error, info, warn};
31use datatypes::arrow::array::BinaryArray;
32use datatypes::arrow::record_batch::RecordBatch;
33use mito_codec::index::IndexValuesCodec;
34use mito_codec::row_converter::CompositeValues;
35use puffin_manager::SstPuffinManager;
36use smallvec::{SmallVec, smallvec};
37use snafu::{OptionExt, ResultExt};
38use statistics::{ByteCount, RowCount};
39use store_api::metadata::RegionMetadataRef;
40use store_api::storage::{ColumnId, FileId, RegionId};
41use strum::IntoStaticStr;
42use tokio::sync::mpsc::Sender;
43
44use crate::access_layer::{AccessLayerRef, FilePathProvider, OperationType, RegionFilePathFactory};
45use crate::cache::file_cache::{FileType, IndexKey};
46use crate::cache::write_cache::{UploadTracker, WriteCacheRef};
47use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
48use crate::error::{
49 BuildIndexAsyncSnafu, DecodeSnafu, Error, InvalidRecordBatchSnafu, RegionClosedSnafu,
50 RegionDroppedSnafu, RegionTruncatedSnafu, Result,
51};
52use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
53use crate::metrics::INDEX_CREATE_MEMORY_USAGE;
54use crate::read::{Batch, BatchReader};
55use crate::region::options::IndexOptions;
56use crate::region::version::VersionControlRef;
57use crate::region::{ManifestContextRef, RegionLeaderState};
58use crate::request::{
59 BackgroundNotify, IndexBuildFailed, IndexBuildFinished, IndexBuildStopped, WorkerRequest,
60 WorkerRequestWithTime,
61};
62use crate::schedule::scheduler::{Job, SchedulerRef};
63use crate::sst::file::{
64 ColumnIndexMetadata, FileHandle, FileMeta, IndexType, IndexTypes, RegionFileId, RegionIndexId,
65};
66use crate::sst::file_purger::FilePurgerRef;
67use crate::sst::index::fulltext_index::creator::FulltextIndexer;
68use crate::sst::index::intermediate::IntermediateManager;
69use crate::sst::index::inverted_index::creator::InvertedIndexer;
70use crate::sst::parquet::SstInfo;
71use crate::sst::parquet::flat_format::primary_key_column_index;
72use crate::sst::parquet::format::PrimaryKeyArray;
73use crate::worker::WorkerListener;
74
75pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index";
76pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index";
77pub(crate) const TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
78
79#[derive(Debug, Clone, Default)]
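/// Output of building indexes for a single SST file: the puffin (index) file size
/// and version, plus per-index statistics.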
81pub struct IndexOutput {
82 pub file_size: u64,
84 pub version: u64,
86 pub inverted_index: InvertedIndexOutput,
88 pub fulltext_index: FulltextIndexOutput,
90 pub bloom_filter: BloomFilterOutput,
92}
93
94impl IndexOutput {
95 pub fn build_available_indexes(&self) -> SmallVec<[IndexType; 4]> {
96 let mut indexes = SmallVec::new();
97 if self.inverted_index.is_available() {
98 indexes.push(IndexType::InvertedIndex);
99 }
100 if self.fulltext_index.is_available() {
101 indexes.push(IndexType::FulltextIndex);
102 }
103 if self.bloom_filter.is_available() {
104 indexes.push(IndexType::BloomFilterIndex);
105 }
106 indexes
107 }
108
109 pub fn build_indexes(&self) -> Vec<ColumnIndexMetadata> {
110 let mut map: HashMap<ColumnId, IndexTypes> = HashMap::new();
111
112 if self.inverted_index.is_available() {
113 for &col in &self.inverted_index.columns {
114 map.entry(col).or_default().push(IndexType::InvertedIndex);
115 }
116 }
117 if self.fulltext_index.is_available() {
118 for &col in &self.fulltext_index.columns {
119 map.entry(col).or_default().push(IndexType::FulltextIndex);
120 }
121 }
122 if self.bloom_filter.is_available() {
123 for &col in &self.bloom_filter.columns {
124 map.entry(col)
125 .or_default()
126 .push(IndexType::BloomFilterIndex);
127 }
128 }
129
130 map.into_iter()
131 .map(|(column_id, created_indexes)| ColumnIndexMetadata {
132 column_id,
133 created_indexes,
134 })
135 .collect::<Vec<_>>()
136 }
137}
138
139#[derive(Debug, Clone, Default)]
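/// Statistics shared by every index type: written bytes, indexed row count, and the
/// ids of the indexed columns.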
141pub struct IndexBaseOutput {
142 pub index_size: ByteCount,
144 pub row_count: RowCount,
146 pub columns: Vec<ColumnId>,
148}
149
150impl IndexBaseOutput {
151 pub fn is_available(&self) -> bool {
152 self.index_size > 0
153 }
154}
155
156pub type InvertedIndexOutput = IndexBaseOutput;
158pub type FulltextIndexOutput = IndexBaseOutput;
160pub type BloomFilterOutput = IndexBaseOutput;
162
163#[derive(Default)]
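/// Builds the inverted, full-text and bloom filter indexes for one SST file and
/// tracks the memory usage of each index creator.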
165pub struct Indexer {
166 file_id: FileId,
167 region_id: RegionId,
168 index_version: u64,
169 puffin_manager: Option<SstPuffinManager>,
170 write_cache_enabled: bool,
171 inverted_indexer: Option<InvertedIndexer>,
172 last_mem_inverted_index: usize,
173 fulltext_indexer: Option<FulltextIndexer>,
174 last_mem_fulltext_index: usize,
175 bloom_filter_indexer: Option<BloomFilterIndexer>,
176 last_mem_bloom_filter: usize,
177 intermediate_manager: Option<IntermediateManager>,
178}
179
180impl Indexer {
181 pub async fn update(&mut self, batch: &mut Batch) {
183 self.do_update(batch).await;
184
185 self.flush_mem_metrics();
186 }
187
188 pub async fn update_flat(&mut self, batch: &RecordBatch) {
190 self.do_update_flat(batch).await;
191
192 self.flush_mem_metrics();
193 }
194
195 pub async fn finish(&mut self) -> IndexOutput {
197 let output = self.do_finish().await;
198
199 self.flush_mem_metrics();
200 output
201 }
202
203 pub async fn abort(&mut self) {
205 self.do_abort().await;
206
207 self.flush_mem_metrics();
208 }
209
210 fn flush_mem_metrics(&mut self) {
211 let inverted_mem = self
212 .inverted_indexer
213 .as_ref()
214 .map_or(0, |creator| creator.memory_usage());
215 INDEX_CREATE_MEMORY_USAGE
216 .with_label_values(&[TYPE_INVERTED_INDEX])
217 .add(inverted_mem as i64 - self.last_mem_inverted_index as i64);
218 self.last_mem_inverted_index = inverted_mem;
219
220 let fulltext_mem = self
221 .fulltext_indexer
222 .as_ref()
223 .map_or(0, |creator| creator.memory_usage());
224 INDEX_CREATE_MEMORY_USAGE
225 .with_label_values(&[TYPE_FULLTEXT_INDEX])
226 .add(fulltext_mem as i64 - self.last_mem_fulltext_index as i64);
227 self.last_mem_fulltext_index = fulltext_mem;
228
229 let bloom_filter_mem = self
230 .bloom_filter_indexer
231 .as_ref()
232 .map_or(0, |creator| creator.memory_usage());
233 INDEX_CREATE_MEMORY_USAGE
234 .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
235 .add(bloom_filter_mem as i64 - self.last_mem_bloom_filter as i64);
236 self.last_mem_bloom_filter = bloom_filter_mem;
237 }
238}
239
240#[async_trait::async_trait]
241pub trait IndexerBuilder {
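    /// Builds an [`Indexer`] for the SST file `file_id` at the given index version.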
242 async fn build(&self, file_id: FileId, index_version: u64) -> Indexer;
244}
245#[derive(Clone)]
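/// [`IndexerBuilder`] implementation that decides which indexes to create from the
/// region metadata, index options and index configs.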
246pub(crate) struct IndexerBuilderImpl {
247 pub(crate) build_type: IndexBuildType,
248 pub(crate) metadata: RegionMetadataRef,
249 pub(crate) row_group_size: usize,
250 pub(crate) puffin_manager: SstPuffinManager,
251 pub(crate) write_cache_enabled: bool,
252 pub(crate) intermediate_manager: IntermediateManager,
253 pub(crate) index_options: IndexOptions,
254 pub(crate) inverted_index_config: InvertedIndexConfig,
255 pub(crate) fulltext_index_config: FulltextIndexConfig,
256 pub(crate) bloom_filter_index_config: BloomFilterConfig,
257}
258
259#[async_trait::async_trait]
260impl IndexerBuilder for IndexerBuilderImpl {
261 async fn build(&self, file_id: FileId, index_version: u64) -> Indexer {
263 let mut indexer = Indexer {
264 file_id,
265 region_id: self.metadata.region_id,
266 index_version,
267 write_cache_enabled: self.write_cache_enabled,
268 ..Default::default()
269 };
270
271 indexer.inverted_indexer = self.build_inverted_indexer(file_id);
272 indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await;
273 indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id);
274 indexer.intermediate_manager = Some(self.intermediate_manager.clone());
275 if indexer.inverted_indexer.is_none()
276 && indexer.fulltext_indexer.is_none()
277 && indexer.bloom_filter_indexer.is_none()
278 {
279 indexer.abort().await;
280 return Indexer::default();
281 }
282
283 indexer.puffin_manager = Some(self.puffin_manager.clone());
284 indexer
285 }
286}
287
288impl IndexerBuilderImpl {
289 fn build_inverted_indexer(&self, file_id: FileId) -> Option<InvertedIndexer> {
290 let create = match self.build_type {
291 IndexBuildType::Flush => self.inverted_index_config.create_on_flush.auto(),
292 IndexBuildType::Compact => self.inverted_index_config.create_on_compaction.auto(),
293 _ => true,
294 };
295
296 if !create {
297 debug!(
298 "Skip creating inverted index due to config, region_id: {}, file_id: {}",
299 self.metadata.region_id, file_id,
300 );
301 return None;
302 }
303
304 let indexed_column_ids = self.metadata.inverted_indexed_column_ids(
305 self.index_options.inverted_index.ignore_column_ids.iter(),
306 );
307 if indexed_column_ids.is_empty() {
308 debug!(
309 "No columns to be indexed, skip creating inverted index, region_id: {}, file_id: {}",
310 self.metadata.region_id, file_id,
311 );
312 return None;
313 }
314
315 let Some(mut segment_row_count) =
316 NonZeroUsize::new(self.index_options.inverted_index.segment_row_count)
317 else {
318 warn!(
319 "Segment row count is 0, skip creating index, region_id: {}, file_id: {}",
320 self.metadata.region_id, file_id,
321 );
322 return None;
323 };
324
325 let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else {
326 warn!(
327 "Row group size is 0, skip creating index, region_id: {}, file_id: {}",
328 self.metadata.region_id, file_id,
329 );
330 return None;
331 };
332
333 if row_group_size.get() % segment_row_count.get() != 0 {
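            // The segment row count must divide the row group size; otherwise fall
            // back to one segment per row group.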
335 segment_row_count = row_group_size;
336 }
337
338 let indexer = InvertedIndexer::new(
339 file_id,
340 &self.metadata,
341 self.intermediate_manager.clone(),
342 self.inverted_index_config.mem_threshold_on_create(),
343 segment_row_count,
344 indexed_column_ids,
345 );
346
347 Some(indexer)
348 }
349
350 async fn build_fulltext_indexer(&self, file_id: FileId) -> Option<FulltextIndexer> {
351 let create = match self.build_type {
352 IndexBuildType::Flush => self.fulltext_index_config.create_on_flush.auto(),
353 IndexBuildType::Compact => self.fulltext_index_config.create_on_compaction.auto(),
354 _ => true,
355 };
356
357 if !create {
358 debug!(
359 "Skip creating full-text index due to config, region_id: {}, file_id: {}",
360 self.metadata.region_id, file_id,
361 );
362 return None;
363 }
364
365 let mem_limit = self.fulltext_index_config.mem_threshold_on_create();
366 let creator = FulltextIndexer::new(
367 &self.metadata.region_id,
368 &file_id,
369 &self.intermediate_manager,
370 &self.metadata,
371 self.fulltext_index_config.compress,
372 mem_limit,
373 )
374 .await;
375
376 let err = match creator {
377 Ok(creator) => {
378 if creator.is_none() {
379 debug!(
380 "Skip creating full-text index due to no columns require indexing, region_id: {}, file_id: {}",
381 self.metadata.region_id, file_id,
382 );
383 }
384 return creator;
385 }
386 Err(err) => err,
387 };
388
389 if cfg!(any(test, feature = "test")) {
390 panic!(
391 "Failed to create full-text indexer, region_id: {}, file_id: {}, err: {:?}",
392 self.metadata.region_id, file_id, err
393 );
394 } else {
395 warn!(
396 err; "Failed to create full-text indexer, region_id: {}, file_id: {}",
397 self.metadata.region_id, file_id,
398 );
399 }
400
401 None
402 }
403
404 fn build_bloom_filter_indexer(&self, file_id: FileId) -> Option<BloomFilterIndexer> {
405 let create = match self.build_type {
406 IndexBuildType::Flush => self.bloom_filter_index_config.create_on_flush.auto(),
407 IndexBuildType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(),
408 _ => true,
409 };
410
411 if !create {
412 debug!(
413 "Skip creating bloom filter due to config, region_id: {}, file_id: {}",
414 self.metadata.region_id, file_id,
415 );
416 return None;
417 }
418
419 let mem_limit = self.bloom_filter_index_config.mem_threshold_on_create();
420 let indexer = BloomFilterIndexer::new(
421 file_id,
422 &self.metadata,
423 self.intermediate_manager.clone(),
424 mem_limit,
425 );
426
427 let err = match indexer {
428 Ok(indexer) => {
429 if indexer.is_none() {
430 debug!(
431 "Skip creating bloom filter due to no columns require indexing, region_id: {}, file_id: {}",
432 self.metadata.region_id, file_id,
433 );
434 }
435 return indexer;
436 }
437 Err(err) => err,
438 };
439
440 if cfg!(any(test, feature = "test")) {
441 panic!(
442 "Failed to create bloom filter, region_id: {}, file_id: {}, err: {:?}",
443 self.metadata.region_id, file_id, err
444 );
445 } else {
446 warn!(
447 err; "Failed to create bloom filter, region_id: {}, file_id: {}",
448 self.metadata.region_id, file_id,
449 );
450 }
451
452 None
453 }
454}
455
456#[derive(Debug, Clone, IntoStaticStr, PartialEq)]
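/// The reason an index build was triggered (schema change, flush, compaction, or a
/// manual request); it also determines the scheduling priority, see [`IndexBuildType::priority`].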
458pub enum IndexBuildType {
459 SchemaChange,
461 Flush,
463 Compact,
465 Manual,
467}
468
469impl IndexBuildType {
470 fn as_str(&self) -> &'static str {
471 self.into()
472 }
473
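    /// Returns the scheduling priority of this build reason; larger values are scheduled first.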
474 fn priority(&self) -> u8 {
476 match self {
477 IndexBuildType::Manual => 3,
478 IndexBuildType::SchemaChange => 2,
479 IndexBuildType::Flush => 1,
480 IndexBuildType::Compact => 0,
481 }
482 }
483}
484
485impl From<OperationType> for IndexBuildType {
486 fn from(op_type: OperationType) -> Self {
487 match op_type {
488 OperationType::Flush => IndexBuildType::Flush,
489 OperationType::Compact => IndexBuildType::Compact,
490 }
491 }
492}
493
494#[derive(Debug, Clone, PartialEq, Eq, Hash)]
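/// Final outcome of an [`IndexBuildTask`].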
496pub enum IndexBuildOutcome {
497 Finished,
498 Aborted(String),
499}
500
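/// Channel sender used to report the outcome of an index build task back to the caller.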
501pub type ResultMpscSender = Sender<Result<IndexBuildOutcome>>;
503
504#[derive(Clone)]
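/// A background task that (re)builds the index file for one SST file and records the
/// result in the region manifest.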
505pub struct IndexBuildTask {
506 pub file_meta: FileMeta,
508 pub reason: IndexBuildType,
509 pub access_layer: AccessLayerRef,
510 pub(crate) listener: WorkerListener,
511 pub(crate) manifest_ctx: ManifestContextRef,
512 pub write_cache: Option<WriteCacheRef>,
513 pub file_purger: FilePurgerRef,
514 pub indexer_builder: Arc<dyn IndexerBuilder + Send + Sync>,
517 pub(crate) request_sender: Sender<WorkerRequestWithTime>,
519 pub(crate) result_sender: ResultMpscSender,
521}
522
523impl std::fmt::Debug for IndexBuildTask {
524 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
525 f.debug_struct("IndexBuildTask")
526 .field("region_id", &self.file_meta.region_id)
527 .field("file_id", &self.file_meta.file_id)
528 .field("reason", &self.reason)
529 .finish()
530 }
531}
532
533impl IndexBuildTask {
534 pub async fn on_success(&self, outcome: IndexBuildOutcome) {
536 let _ = self.result_sender.send(Ok(outcome)).await;
537 }
538
539 pub async fn on_failure(&self, err: Arc<Error>) {
541 let _ = self
542 .result_sender
543 .send(Err(err.clone()).context(BuildIndexAsyncSnafu {
544 region_id: self.file_meta.region_id,
545 }))
546 .await;
547 }
548
549 fn into_index_build_job(mut self, version_control: VersionControlRef) -> Job {
550 Box::pin(async move {
551 self.do_index_build(version_control).await;
552 })
553 }
554
555 async fn do_index_build(&mut self, version_control: VersionControlRef) {
556 self.listener
557 .on_index_build_begin(RegionFileId::new(
558 self.file_meta.region_id,
559 self.file_meta.file_id,
560 ))
561 .await;
562 match self.index_build(version_control).await {
563 Ok(outcome) => self.on_success(outcome).await,
564 Err(e) => {
565 warn!(
566 e; "Index build task failed, region: {}, file_id: {}",
567 self.file_meta.region_id, self.file_meta.file_id,
568 );
569 self.on_failure(e.into()).await
570 }
571 }
572 let worker_request = WorkerRequest::Background {
573 region_id: self.file_meta.region_id,
574 notify: BackgroundNotify::IndexBuildStopped(IndexBuildStopped {
575 region_id: self.file_meta.region_id,
576 file_id: self.file_meta.file_id,
577 }),
578 };
579 let _ = self
580 .request_sender
581 .send(WorkerRequestWithTime::new(worker_request))
582 .await;
583 }
584
585 async fn check_sst_file_exists(&self, version_control: &VersionControlRef) -> bool {
587 let file_id = self.file_meta.file_id;
588 let level = self.file_meta.level;
589 let version = version_control.current().version;
591
592 let Some(level_files) = version.ssts.levels().get(level as usize) else {
593 warn!(
594 "File id {} not found in level {} for index build, region: {}",
595 file_id, level, self.file_meta.region_id
596 );
597 return false;
598 };
599
        match level_files.files.get(&file_id) {
            // The file must still be present, not deleted, and not being compacted.
            Some(handle) if !handle.is_deleted() && !handle.compacting() => true,
            _ => {
                warn!(
                    "File id {} not found in region version for index build, region: {}",
                    file_id, self.file_meta.region_id
                );
                false
            }
        }
615 }
616
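    /// Builds the index file for the SST, uploads it through the write cache when
    /// enabled, and updates the region manifest if a non-empty index was produced.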
617 async fn index_build(
618 &mut self,
619 version_control: VersionControlRef,
620 ) -> Result<IndexBuildOutcome> {
        // An index file already exists for this SST, so bump the version for the
        // rebuilt index; otherwise this is the first index build.
        let new_index_version = if self.file_meta.index_file_size > 0 {
            self.file_meta.index_version + 1
        } else {
            0
        };
628
629 let index_file_id = self.file_meta.file_id;
631 let mut indexer = self
632 .indexer_builder
633 .build(index_file_id, new_index_version)
634 .await;
635
636 if !self.check_sst_file_exists(&version_control).await {
638 indexer.abort().await;
640 self.listener
641 .on_index_build_abort(RegionFileId::new(
642 self.file_meta.region_id,
643 self.file_meta.file_id,
644 ))
645 .await;
646 return Ok(IndexBuildOutcome::Aborted(format!(
647 "SST file not found during index build, region: {}, file_id: {}",
648 self.file_meta.region_id, self.file_meta.file_id
649 )));
650 }
651
652 let mut parquet_reader = self
653 .access_layer
654 .read_sst(FileHandle::new(
655 self.file_meta.clone(),
656 self.file_purger.clone(),
657 ))
658 .build()
659 .await?;
660
661 loop {
663 match parquet_reader.next_batch().await {
664 Ok(Some(batch)) => {
665 indexer.update(&mut batch.clone()).await;
666 }
667 Ok(None) => break,
668 Err(e) => {
669 indexer.abort().await;
670 return Err(e);
671 }
672 }
673 }
674 let index_output = indexer.finish().await;
675
676 if index_output.file_size > 0 {
677 if !self.check_sst_file_exists(&version_control).await {
679 indexer.abort().await;
681 self.listener
682 .on_index_build_abort(RegionFileId::new(
683 self.file_meta.region_id,
684 self.file_meta.file_id,
685 ))
686 .await;
687 return Ok(IndexBuildOutcome::Aborted(format!(
688 "SST file not found during index build, region: {}, file_id: {}",
689 self.file_meta.region_id, self.file_meta.file_id
690 )));
691 }
692
693 self.maybe_upload_index_file(index_output.clone(), index_file_id, new_index_version)
695 .await?;
696
697 let worker_request = match self.update_manifest(index_output, new_index_version).await {
698 Ok(edit) => {
699 let index_build_finished = IndexBuildFinished {
700 region_id: self.file_meta.region_id,
701 edit,
702 };
703 WorkerRequest::Background {
704 region_id: self.file_meta.region_id,
705 notify: BackgroundNotify::IndexBuildFinished(index_build_finished),
706 }
707 }
708 Err(e) => {
709 let err = Arc::new(e);
710 WorkerRequest::Background {
711 region_id: self.file_meta.region_id,
712 notify: BackgroundNotify::IndexBuildFailed(IndexBuildFailed { err }),
713 }
714 }
715 };
716
717 let _ = self
718 .request_sender
719 .send(WorkerRequestWithTime::new(worker_request))
720 .await;
721 }
722 Ok(IndexBuildOutcome::Finished)
723 }
724
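    /// Uploads the freshly built puffin file from the write cache to the remote store,
    /// cleaning up the upload on failure. Does nothing when the write cache is disabled.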
725 async fn maybe_upload_index_file(
726 &self,
727 output: IndexOutput,
728 index_file_id: FileId,
729 index_version: u64,
730 ) -> Result<()> {
731 if let Some(write_cache) = &self.write_cache {
732 let file_id = self.file_meta.file_id;
733 let region_id = self.file_meta.region_id;
734 let remote_store = self.access_layer.object_store();
735 let mut upload_tracker = UploadTracker::new(region_id);
736 let mut err = None;
737 let puffin_key =
738 IndexKey::new(region_id, index_file_id, FileType::Puffin(output.version));
739 let index_id = RegionIndexId::new(RegionFileId::new(region_id, file_id), index_version);
740 let puffin_path = RegionFilePathFactory::new(
741 self.access_layer.table_dir().to_string(),
742 self.access_layer.path_type(),
743 )
744 .build_index_file_path_with_version(index_id);
745 if let Err(e) = write_cache
746 .upload(puffin_key, &puffin_path, remote_store)
747 .await
748 {
749 err = Some(e);
750 }
751 upload_tracker.push_uploaded_file(puffin_path);
752 if let Some(err) = err {
753 upload_tracker
755 .clean(
756 &smallvec![SstInfo {
757 file_id,
758 index_metadata: output,
759 ..Default::default()
760 }],
761 &write_cache.file_cache(),
762 remote_store,
763 )
764 .await;
765 return Err(err);
766 }
767 } else {
768 debug!("write cache is not available, skip uploading index file");
769 }
770 Ok(())
771 }
772
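    /// Applies the index build output to the file metadata and commits the change to
    /// the region manifest as a [`RegionEdit`].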
773 async fn update_manifest(
774 &mut self,
775 output: IndexOutput,
776 new_index_version: u64,
777 ) -> Result<RegionEdit> {
778 self.file_meta.available_indexes = output.build_available_indexes();
779 self.file_meta.indexes = output.build_indexes();
780 self.file_meta.index_file_size = output.file_size;
781 self.file_meta.index_version = new_index_version;
782 let edit = RegionEdit {
783 files_to_add: vec![self.file_meta.clone()],
784 files_to_remove: vec![],
785 timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
786 flushed_sequence: None,
787 flushed_entry_id: None,
788 committed_sequence: None,
789 compaction_time_window: None,
790 };
791 let version = self
792 .manifest_ctx
793 .update_manifest(
794 RegionLeaderState::Writable,
795 RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())),
796 false,
797 )
798 .await?;
799 info!(
800 "Successfully update manifest version to {version}, region: {}, reason: {}",
801 self.file_meta.region_id,
802 self.reason.as_str()
803 );
804 Ok(edit)
805 }
806}
807
808impl PartialEq for IndexBuildTask {
809 fn eq(&self, other: &Self) -> bool {
810 self.reason.priority() == other.reason.priority()
811 }
812}
813
814impl Eq for IndexBuildTask {}
815
816impl PartialOrd for IndexBuildTask {
817 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
818 Some(self.cmp(other))
819 }
820}
821
822impl Ord for IndexBuildTask {
823 fn cmp(&self, other: &Self) -> Ordering {
824 self.reason.priority().cmp(&other.reason.priority())
825 }
826}
827
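/// Per-region index build state: files currently being built and tasks waiting to be scheduled.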
828pub struct IndexBuildStatus {
830 pub region_id: RegionId,
831 pub building_files: HashSet<FileId>,
832 pub pending_tasks: BinaryHeap<IndexBuildTask>,
833}
834
835impl IndexBuildStatus {
836 pub fn new(region_id: RegionId) -> Self {
837 IndexBuildStatus {
838 region_id,
839 building_files: HashSet::new(),
840 pending_tasks: BinaryHeap::new(),
841 }
842 }
843
844 async fn on_failure(self, err: Arc<Error>) {
845 for task in self.pending_tasks {
846 task.on_failure(err.clone()).await;
847 }
848 }
849}
850
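/// Schedules [`IndexBuildTask`]s, limiting how many files may build indexes concurrently
/// across all regions.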
851pub struct IndexBuildScheduler {
852 scheduler: SchedulerRef,
854 region_status: HashMap<RegionId, IndexBuildStatus>,
856 files_limit: usize,
858}
859
860impl IndexBuildScheduler {
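    /// Creates a scheduler that runs at most `files_limit` index build jobs concurrently.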
862 pub fn new(scheduler: SchedulerRef, files_limit: usize) -> Self {
863 IndexBuildScheduler {
864 scheduler,
865 region_status: HashMap::new(),
866 files_limit,
867 }
868 }
869
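    /// Queues an index build task. A task targeting a file whose index is already being
    /// built is aborted immediately.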
870 pub(crate) async fn schedule_build(
871 &mut self,
872 version_control: &VersionControlRef,
873 task: IndexBuildTask,
874 ) -> Result<()> {
875 let status = self
876 .region_status
877 .entry(task.file_meta.region_id)
878 .or_insert_with(|| IndexBuildStatus::new(task.file_meta.region_id));
879
880 if status.building_files.contains(&task.file_meta.file_id) {
881 let region_file_id =
882 RegionFileId::new(task.file_meta.region_id, task.file_meta.file_id);
883 debug!(
884 "Aborting index build task since index is already being built for region file {:?}",
885 region_file_id
886 );
887 task.on_success(IndexBuildOutcome::Aborted(format!(
888 "Index is already being built for region file {:?}",
889 region_file_id
890 )))
891 .await;
892 task.listener.on_index_build_abort(region_file_id).await;
893 return Ok(());
894 }
895
896 status.pending_tasks.push(task);
897
898 self.schedule_next_build_batch(version_control);
899 Ok(())
900 }
901
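    /// Starts pending tasks, highest priority first, until the concurrent file limit is reached.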
    fn schedule_next_build_batch(&mut self, version_control: &VersionControlRef) {
        let mut building_count = 0;
        for status in self.region_status.values() {
            building_count += status.building_files.len();
        }

        while building_count < self.files_limit {
            if let Some(task) = self.find_next_task() {
                let region_id = task.file_meta.region_id;
                let file_id = task.file_meta.file_id;
                let job = task.into_index_build_job(version_control.clone());
                if self.scheduler.schedule(job).is_ok() {
                    if let Some(status) = self.region_status.get_mut(&region_id) {
                        status.building_files.insert(file_id);
                        building_count += 1;
                        status
                            .pending_tasks
                            .retain(|t| t.file_meta.file_id != file_id);
                    } else {
                        error!(
                            "Region status not found when scheduling index build task, region: {}",
                            region_id
                        );
                    }
                } else {
                    error!(
                        "Failed to schedule index build job, region: {}, file_id: {}",
                        region_id, file_id
                    );
                }
            } else {
                // No pending tasks left.
                break;
            }
        }
    }
939
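    /// Returns the highest-priority pending task across all regions, if any.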
940 fn find_next_task(&self) -> Option<IndexBuildTask> {
942 self.region_status
943 .iter()
944 .filter_map(|(_, status)| status.pending_tasks.peek())
945 .max()
946 .cloned()
947 }
948
    /// Marks `file_id` as no longer building and tries to schedule more pending tasks.
    pub(crate) fn on_task_stopped(
        &mut self,
        region_id: RegionId,
        file_id: FileId,
        version_control: &VersionControlRef,
    ) {
        if let Some(status) = self.region_status.get_mut(&region_id) {
            status.building_files.remove(&file_id);
            if status.building_files.is_empty() && status.pending_tasks.is_empty() {
                // Nothing left for this region; drop its status entry.
                self.region_status.remove(&region_id);
            }
        }

        self.schedule_next_build_batch(version_control);
    }
965
    pub(crate) async fn on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(
            err; "Index build scheduler encountered failure for region {}, removing all pending tasks.",
            region_id
        );
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };
        status.on_failure(err).await;
    }
976
977 pub(crate) async fn on_region_dropped(&mut self, region_id: RegionId) {
979 self.remove_region_on_failure(
980 region_id,
981 Arc::new(RegionDroppedSnafu { region_id }.build()),
982 )
983 .await;
984 }
985
986 pub(crate) async fn on_region_closed(&mut self, region_id: RegionId) {
988 self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()))
989 .await;
990 }
991
992 pub(crate) async fn on_region_truncated(&mut self, region_id: RegionId) {
994 self.remove_region_on_failure(
995 region_id,
996 Arc::new(RegionTruncatedSnafu { region_id }.build()),
997 )
998 .await;
999 }
1000
    async fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
        let Some(status) = self.region_status.remove(&region_id) else {
            return;
        };
        status.on_failure(err).await;
    }
1007}
1008
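/// Decodes the primary key column of a flat-format [`RecordBatch`], returning each run of
/// consecutive identical keys as a decoded [`CompositeValues`] together with its row count.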
1009pub(crate) fn decode_primary_keys_with_counts(
1012 batch: &RecordBatch,
1013 codec: &IndexValuesCodec,
1014) -> Result<Vec<(CompositeValues, usize)>> {
1015 let primary_key_index = primary_key_column_index(batch.num_columns());
1016 let pk_dict_array = batch
1017 .column(primary_key_index)
1018 .as_any()
1019 .downcast_ref::<PrimaryKeyArray>()
1020 .context(InvalidRecordBatchSnafu {
1021 reason: "Primary key column is not a dictionary array",
1022 })?;
1023 let pk_values_array = pk_dict_array
1024 .values()
1025 .as_any()
1026 .downcast_ref::<BinaryArray>()
1027 .context(InvalidRecordBatchSnafu {
1028 reason: "Primary key values are not binary array",
1029 })?;
1030 let keys = pk_dict_array.keys();
1031
1032 let mut result: Vec<(CompositeValues, usize)> = Vec::new();
1034 let mut prev_key: Option<u32> = None;
1035
1036 for i in 0..keys.len() {
1037 let current_key = keys.value(i);
1038
1039 if let Some(prev) = prev_key
1041 && prev == current_key
1042 {
1043 result.last_mut().unwrap().1 += 1;
1045 continue;
1046 }
1047
1048 let pk_bytes = pk_values_array.value(current_key as usize);
1050 let decoded_value = codec.decoder().decode(pk_bytes).context(DecodeSnafu)?;
1051
1052 result.push((decoded_value, 1));
1053 prev_key = Some(current_key);
1054 }
1055
1056 Ok(result)
1057}
1058
1059#[cfg(test)]
1060mod tests {
1061 use std::sync::Arc;
1062
1063 use api::v1::SemanticType;
1064 use common_base::readable_size::ReadableSize;
1065 use datafusion_common::HashMap;
1066 use datatypes::data_type::ConcreteDataType;
1067 use datatypes::schema::{
1068 ColumnSchema, FulltextOptions, SkippingIndexOptions, SkippingIndexType,
1069 };
1070 use object_store::ObjectStore;
1071 use object_store::services::Memory;
1072 use puffin_manager::PuffinManagerFactory;
1073 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1074 use tokio::sync::mpsc;
1075
1076 use super::*;
1077 use crate::access_layer::{FilePathProvider, Metrics, SstWriteRequest, WriteType};
1078 use crate::cache::write_cache::WriteCache;
1079 use crate::config::{FulltextIndexConfig, IndexBuildMode, MitoConfig, Mode};
1080 use crate::memtable::time_partition::TimePartitions;
1081 use crate::region::version::{VersionBuilder, VersionControl};
1082 use crate::sst::file::RegionFileId;
1083 use crate::sst::file_purger::NoopFilePurger;
1084 use crate::sst::location;
1085 use crate::sst::parquet::WriteOptions;
1086 use crate::test_util::memtable_util::EmptyMemtableBuilder;
1087 use crate::test_util::scheduler_util::SchedulerEnv;
1088 use crate::test_util::sst_util::{new_batch_by_range, new_source, sst_region_metadata};
1089
1090 struct MetaConfig {
1091 with_inverted: bool,
1092 with_fulltext: bool,
1093 with_skipping_bloom: bool,
1094 }
1095
1096 fn mock_region_metadata(
1097 MetaConfig {
1098 with_inverted,
1099 with_fulltext,
1100 with_skipping_bloom,
1101 }: MetaConfig,
1102 ) -> RegionMetadataRef {
1103 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
1104 let mut column_schema = ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false);
1105 if with_inverted {
1106 column_schema = column_schema.with_inverted_index(true);
1107 }
1108 builder
1109 .push_column_metadata(ColumnMetadata {
1110 column_schema,
1111 semantic_type: SemanticType::Field,
1112 column_id: 1,
1113 })
1114 .push_column_metadata(ColumnMetadata {
1115 column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
1116 semantic_type: SemanticType::Field,
1117 column_id: 2,
1118 })
1119 .push_column_metadata(ColumnMetadata {
1120 column_schema: ColumnSchema::new(
1121 "c",
1122 ConcreteDataType::timestamp_millisecond_datatype(),
1123 false,
1124 ),
1125 semantic_type: SemanticType::Timestamp,
1126 column_id: 3,
1127 });
1128
1129 if with_fulltext {
1130 let column_schema =
1131 ColumnSchema::new("text", ConcreteDataType::string_datatype(), true)
1132 .with_fulltext_options(FulltextOptions {
1133 enable: true,
1134 ..Default::default()
1135 })
1136 .unwrap();
1137
1138 let column = ColumnMetadata {
1139 column_schema,
1140 semantic_type: SemanticType::Field,
1141 column_id: 4,
1142 };
1143
1144 builder.push_column_metadata(column);
1145 }
1146
1147 if with_skipping_bloom {
1148 let column_schema =
1149 ColumnSchema::new("bloom", ConcreteDataType::string_datatype(), false)
1150 .with_skipping_options(SkippingIndexOptions::new_unchecked(
1151 42,
1152 0.01,
1153 SkippingIndexType::BloomFilter,
1154 ))
1155 .unwrap();
1156
1157 let column = ColumnMetadata {
1158 column_schema,
1159 semantic_type: SemanticType::Field,
1160 column_id: 5,
1161 };
1162
1163 builder.push_column_metadata(column);
1164 }
1165
1166 Arc::new(builder.build().unwrap())
1167 }
1168
1169 fn mock_object_store() -> ObjectStore {
1170 ObjectStore::new(Memory::default()).unwrap().finish()
1171 }
1172
1173 async fn mock_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
1174 IntermediateManager::init_fs(path).await.unwrap()
1175 }
1176 struct NoopPathProvider;
1177
1178 impl FilePathProvider for NoopPathProvider {
1179 fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
1180 unreachable!()
1181 }
1182
1183 fn build_index_file_path_with_version(&self, _index_id: RegionIndexId) -> String {
1184 unreachable!()
1185 }
1186
1187 fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
1188 unreachable!()
1189 }
1190 }
1191
1192 async fn mock_sst_file(
1193 metadata: RegionMetadataRef,
1194 env: &SchedulerEnv,
1195 build_mode: IndexBuildMode,
1196 ) -> SstInfo {
1197 let source = new_source(&[
1198 new_batch_by_range(&["a", "d"], 0, 60),
1199 new_batch_by_range(&["b", "f"], 0, 40),
1200 new_batch_by_range(&["b", "h"], 100, 200),
1201 ]);
1202 let mut index_config = MitoConfig::default().index;
1203 index_config.build_mode = build_mode;
1204 let write_request = SstWriteRequest {
1205 op_type: OperationType::Flush,
1206 metadata: metadata.clone(),
1207 source: either::Left(source),
1208 storage: None,
1209 max_sequence: None,
1210 cache_manager: Default::default(),
1211 index_options: IndexOptions::default(),
1212 index_config,
1213 inverted_index_config: Default::default(),
1214 fulltext_index_config: Default::default(),
1215 bloom_filter_index_config: Default::default(),
1216 };
1217 let mut metrics = Metrics::new(WriteType::Flush);
1218 env.access_layer
1219 .write_sst(write_request, &WriteOptions::default(), &mut metrics)
1220 .await
1221 .unwrap()
1222 .remove(0)
1223 }
1224
1225 async fn mock_version_control(
1226 metadata: RegionMetadataRef,
1227 file_purger: FilePurgerRef,
1228 files: HashMap<FileId, FileMeta>,
1229 ) -> VersionControlRef {
1230 let mutable = Arc::new(TimePartitions::new(
1231 metadata.clone(),
1232 Arc::new(EmptyMemtableBuilder::default()),
1233 0,
1234 None,
1235 ));
1236 let version_builder = VersionBuilder::new(metadata, mutable)
1237 .add_files(file_purger, files.values().cloned())
1238 .build();
1239 Arc::new(VersionControl::new(version_builder))
1240 }
1241
1242 async fn mock_indexer_builder(
1243 metadata: RegionMetadataRef,
1244 env: &SchedulerEnv,
1245 ) -> Arc<dyn IndexerBuilder + Send + Sync> {
1246 let (dir, factory) = PuffinManagerFactory::new_for_test_async("mock_indexer_builder").await;
1247 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1248 let puffin_manager = factory.build(
1249 env.access_layer.object_store().clone(),
1250 RegionFilePathFactory::new(
1251 env.access_layer.table_dir().to_string(),
1252 env.access_layer.path_type(),
1253 ),
1254 );
1255 Arc::new(IndexerBuilderImpl {
1256 build_type: IndexBuildType::Flush,
1257 metadata,
1258 row_group_size: 1024,
1259 puffin_manager,
1260 write_cache_enabled: false,
1261 intermediate_manager: intm_manager,
1262 index_options: IndexOptions::default(),
1263 inverted_index_config: InvertedIndexConfig::default(),
1264 fulltext_index_config: FulltextIndexConfig::default(),
1265 bloom_filter_index_config: BloomFilterConfig::default(),
1266 })
1267 }
1268
1269 #[tokio::test]
1270 async fn test_build_indexer_basic() {
1271 let (dir, factory) =
1272 PuffinManagerFactory::new_for_test_async("test_build_indexer_basic_").await;
1273 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1274
1275 let metadata = mock_region_metadata(MetaConfig {
1276 with_inverted: true,
1277 with_fulltext: true,
1278 with_skipping_bloom: true,
1279 });
1280 let indexer = IndexerBuilderImpl {
1281 build_type: IndexBuildType::Flush,
1282 metadata,
1283 row_group_size: 1024,
1284 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1285 write_cache_enabled: false,
1286 intermediate_manager: intm_manager,
1287 index_options: IndexOptions::default(),
1288 inverted_index_config: InvertedIndexConfig::default(),
1289 fulltext_index_config: FulltextIndexConfig::default(),
1290 bloom_filter_index_config: BloomFilterConfig::default(),
1291 }
1292 .build(FileId::random(), 0)
1293 .await;
1294
1295 assert!(indexer.inverted_indexer.is_some());
1296 assert!(indexer.fulltext_indexer.is_some());
1297 assert!(indexer.bloom_filter_indexer.is_some());
1298 }
1299
1300 #[tokio::test]
1301 async fn test_build_indexer_disable_create() {
1302 let (dir, factory) =
1303 PuffinManagerFactory::new_for_test_async("test_build_indexer_disable_create_").await;
1304 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1305
1306 let metadata = mock_region_metadata(MetaConfig {
1307 with_inverted: true,
1308 with_fulltext: true,
1309 with_skipping_bloom: true,
1310 });
1311 let indexer = IndexerBuilderImpl {
1312 build_type: IndexBuildType::Flush,
1313 metadata: metadata.clone(),
1314 row_group_size: 1024,
1315 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1316 write_cache_enabled: false,
1317 intermediate_manager: intm_manager.clone(),
1318 index_options: IndexOptions::default(),
1319 inverted_index_config: InvertedIndexConfig {
1320 create_on_flush: Mode::Disable,
1321 ..Default::default()
1322 },
1323 fulltext_index_config: FulltextIndexConfig::default(),
1324 bloom_filter_index_config: BloomFilterConfig::default(),
1325 }
1326 .build(FileId::random(), 0)
1327 .await;
1328
1329 assert!(indexer.inverted_indexer.is_none());
1330 assert!(indexer.fulltext_indexer.is_some());
1331 assert!(indexer.bloom_filter_indexer.is_some());
1332
1333 let indexer = IndexerBuilderImpl {
1334 build_type: IndexBuildType::Compact,
1335 metadata: metadata.clone(),
1336 row_group_size: 1024,
1337 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1338 write_cache_enabled: false,
1339 intermediate_manager: intm_manager.clone(),
1340 index_options: IndexOptions::default(),
1341 inverted_index_config: InvertedIndexConfig::default(),
1342 fulltext_index_config: FulltextIndexConfig {
1343 create_on_compaction: Mode::Disable,
1344 ..Default::default()
1345 },
1346 bloom_filter_index_config: BloomFilterConfig::default(),
1347 }
1348 .build(FileId::random(), 0)
1349 .await;
1350
1351 assert!(indexer.inverted_indexer.is_some());
1352 assert!(indexer.fulltext_indexer.is_none());
1353 assert!(indexer.bloom_filter_indexer.is_some());
1354
1355 let indexer = IndexerBuilderImpl {
1356 build_type: IndexBuildType::Compact,
1357 metadata,
1358 row_group_size: 1024,
1359 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1360 write_cache_enabled: false,
1361 intermediate_manager: intm_manager,
1362 index_options: IndexOptions::default(),
1363 inverted_index_config: InvertedIndexConfig::default(),
1364 fulltext_index_config: FulltextIndexConfig::default(),
1365 bloom_filter_index_config: BloomFilterConfig {
1366 create_on_compaction: Mode::Disable,
1367 ..Default::default()
1368 },
1369 }
1370 .build(FileId::random(), 0)
1371 .await;
1372
1373 assert!(indexer.inverted_indexer.is_some());
1374 assert!(indexer.fulltext_indexer.is_some());
1375 assert!(indexer.bloom_filter_indexer.is_none());
1376 }
1377
1378 #[tokio::test]
1379 async fn test_build_indexer_no_required() {
1380 let (dir, factory) =
1381 PuffinManagerFactory::new_for_test_async("test_build_indexer_no_required_").await;
1382 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1383
1384 let metadata = mock_region_metadata(MetaConfig {
1385 with_inverted: false,
1386 with_fulltext: true,
1387 with_skipping_bloom: true,
1388 });
1389 let indexer = IndexerBuilderImpl {
1390 build_type: IndexBuildType::Flush,
1391 metadata: metadata.clone(),
1392 row_group_size: 1024,
1393 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1394 write_cache_enabled: false,
1395 intermediate_manager: intm_manager.clone(),
1396 index_options: IndexOptions::default(),
1397 inverted_index_config: InvertedIndexConfig::default(),
1398 fulltext_index_config: FulltextIndexConfig::default(),
1399 bloom_filter_index_config: BloomFilterConfig::default(),
1400 }
1401 .build(FileId::random(), 0)
1402 .await;
1403
1404 assert!(indexer.inverted_indexer.is_none());
1405 assert!(indexer.fulltext_indexer.is_some());
1406 assert!(indexer.bloom_filter_indexer.is_some());
1407
1408 let metadata = mock_region_metadata(MetaConfig {
1409 with_inverted: true,
1410 with_fulltext: false,
1411 with_skipping_bloom: true,
1412 });
1413 let indexer = IndexerBuilderImpl {
1414 build_type: IndexBuildType::Flush,
1415 metadata: metadata.clone(),
1416 row_group_size: 1024,
1417 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1418 write_cache_enabled: false,
1419 intermediate_manager: intm_manager.clone(),
1420 index_options: IndexOptions::default(),
1421 inverted_index_config: InvertedIndexConfig::default(),
1422 fulltext_index_config: FulltextIndexConfig::default(),
1423 bloom_filter_index_config: BloomFilterConfig::default(),
1424 }
1425 .build(FileId::random(), 0)
1426 .await;
1427
1428 assert!(indexer.inverted_indexer.is_some());
1429 assert!(indexer.fulltext_indexer.is_none());
1430 assert!(indexer.bloom_filter_indexer.is_some());
1431
1432 let metadata = mock_region_metadata(MetaConfig {
1433 with_inverted: true,
1434 with_fulltext: true,
1435 with_skipping_bloom: false,
1436 });
1437 let indexer = IndexerBuilderImpl {
1438 build_type: IndexBuildType::Flush,
1439 metadata: metadata.clone(),
1440 row_group_size: 1024,
1441 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1442 write_cache_enabled: false,
1443 intermediate_manager: intm_manager,
1444 index_options: IndexOptions::default(),
1445 inverted_index_config: InvertedIndexConfig::default(),
1446 fulltext_index_config: FulltextIndexConfig::default(),
1447 bloom_filter_index_config: BloomFilterConfig::default(),
1448 }
1449 .build(FileId::random(), 0)
1450 .await;
1451
1452 assert!(indexer.inverted_indexer.is_some());
1453 assert!(indexer.fulltext_indexer.is_some());
1454 assert!(indexer.bloom_filter_indexer.is_none());
1455 }
1456
1457 #[tokio::test]
1458 async fn test_build_indexer_zero_row_group() {
1459 let (dir, factory) =
1460 PuffinManagerFactory::new_for_test_async("test_build_indexer_zero_row_group_").await;
1461 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1462
1463 let metadata = mock_region_metadata(MetaConfig {
1464 with_inverted: true,
1465 with_fulltext: true,
1466 with_skipping_bloom: true,
1467 });
1468 let indexer = IndexerBuilderImpl {
1469 build_type: IndexBuildType::Flush,
1470 metadata,
1471 row_group_size: 0,
1472 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1473 write_cache_enabled: false,
1474 intermediate_manager: intm_manager,
1475 index_options: IndexOptions::default(),
1476 inverted_index_config: InvertedIndexConfig::default(),
1477 fulltext_index_config: FulltextIndexConfig::default(),
1478 bloom_filter_index_config: BloomFilterConfig::default(),
1479 }
1480 .build(FileId::random(), 0)
1481 .await;
1482
1483 assert!(indexer.inverted_indexer.is_none());
1484 }
1485
1486 #[tokio::test]
1487 async fn test_index_build_task_sst_not_exist() {
1488 let env = SchedulerEnv::new().await;
1489 let (tx, _rx) = mpsc::channel(4);
1490 let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1491 let mut scheduler = env.mock_index_build_scheduler(4);
1492 let metadata = Arc::new(sst_region_metadata());
1493 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1494 let file_purger = Arc::new(NoopFilePurger {});
1495 let files = HashMap::new();
1496 let version_control =
1497 mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1498 let region_id = metadata.region_id;
1499 let indexer_builder = mock_indexer_builder(metadata, &env).await;
1500
1501 let task = IndexBuildTask {
1503 file_meta: FileMeta {
1504 region_id,
1505 file_id: FileId::random(),
1506 file_size: 100,
1507 ..Default::default()
1508 },
1509 reason: IndexBuildType::Flush,
1510 access_layer: env.access_layer.clone(),
1511 listener: WorkerListener::default(),
1512 manifest_ctx,
1513 write_cache: None,
1514 file_purger,
1515 indexer_builder,
1516 request_sender: tx,
1517 result_sender: result_tx,
1518 };
1519
1520 scheduler
1522 .schedule_build(&version_control, task)
1523 .await
1524 .unwrap();
1525 match result_rx.recv().await.unwrap() {
1526 Ok(outcome) => {
1527 if outcome == IndexBuildOutcome::Finished {
1528 panic!("Expect aborted result due to missing SST file")
1529 }
1530 }
1531 _ => panic!("Expect aborted result due to missing SST file"),
1532 }
1533 }
1534
1535 #[tokio::test]
1536 async fn test_index_build_task_sst_exist() {
1537 let env = SchedulerEnv::new().await;
1538 let mut scheduler = env.mock_index_build_scheduler(4);
1539 let metadata = Arc::new(sst_region_metadata());
1540 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1541 let region_id = metadata.region_id;
1542 let file_purger = Arc::new(NoopFilePurger {});
1543 let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
1544 let file_meta = FileMeta {
1545 region_id,
1546 file_id: sst_info.file_id,
1547 file_size: sst_info.file_size,
1548 index_file_size: sst_info.index_metadata.file_size,
1549 num_rows: sst_info.num_rows as u64,
1550 num_row_groups: sst_info.num_row_groups,
1551 ..Default::default()
1552 };
1553 let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1554 let version_control =
1555 mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1556 let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
1557
1558 let (tx, mut rx) = mpsc::channel(4);
1560 let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1561 let task = IndexBuildTask {
1562 file_meta: file_meta.clone(),
1563 reason: IndexBuildType::Flush,
1564 access_layer: env.access_layer.clone(),
1565 listener: WorkerListener::default(),
1566 manifest_ctx,
1567 write_cache: None,
1568 file_purger,
1569 indexer_builder,
1570 request_sender: tx,
1571 result_sender: result_tx,
1572 };
1573
1574 scheduler
1575 .schedule_build(&version_control, task)
1576 .await
1577 .unwrap();
1578
1579 match result_rx.recv().await.unwrap() {
1581 Ok(outcome) => {
1582 assert_eq!(outcome, IndexBuildOutcome::Finished);
1583 }
1584 _ => panic!("Expect finished result"),
1585 }
1586
1587 let worker_req = rx.recv().await.unwrap().request;
1589 match worker_req {
1590 WorkerRequest::Background {
1591 region_id: req_region_id,
1592 notify: BackgroundNotify::IndexBuildFinished(finished),
1593 } => {
1594 assert_eq!(req_region_id, region_id);
1595 assert_eq!(finished.edit.files_to_add.len(), 1);
1596 let updated_meta = &finished.edit.files_to_add[0];
1597
1598 assert!(!updated_meta.available_indexes.is_empty());
1600 assert!(updated_meta.index_file_size > 0);
1601 assert_eq!(updated_meta.file_id, file_meta.file_id);
1602 }
1603 _ => panic!("Unexpected worker request: {:?}", worker_req),
1604 }
1605 }
1606
1607 async fn schedule_index_build_task_with_mode(build_mode: IndexBuildMode) {
1608 let env = SchedulerEnv::new().await;
1609 let mut scheduler = env.mock_index_build_scheduler(4);
1610 let metadata = Arc::new(sst_region_metadata());
1611 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1612 let file_purger = Arc::new(NoopFilePurger {});
1613 let region_id = metadata.region_id;
1614 let sst_info = mock_sst_file(metadata.clone(), &env, build_mode.clone()).await;
1615 let file_meta = FileMeta {
1616 region_id,
1617 file_id: sst_info.file_id,
1618 file_size: sst_info.file_size,
1619 index_file_size: sst_info.index_metadata.file_size,
1620 num_rows: sst_info.num_rows as u64,
1621 num_row_groups: sst_info.num_row_groups,
1622 ..Default::default()
1623 };
1624 let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1625 let version_control =
1626 mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1627 let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
1628
1629 let (tx, _rx) = mpsc::channel(4);
1631 let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1632 let task = IndexBuildTask {
1633 file_meta: file_meta.clone(),
1634 reason: IndexBuildType::Flush,
1635 access_layer: env.access_layer.clone(),
1636 listener: WorkerListener::default(),
1637 manifest_ctx,
1638 write_cache: None,
1639 file_purger,
1640 indexer_builder,
1641 request_sender: tx,
1642 result_sender: result_tx,
1643 };
1644
1645 scheduler
1646 .schedule_build(&version_control, task)
1647 .await
1648 .unwrap();
1649
1650 let puffin_path = location::index_file_path(
1651 env.access_layer.table_dir(),
1652 RegionIndexId::new(RegionFileId::new(region_id, file_meta.file_id), 0),
1653 env.access_layer.path_type(),
1654 );
1655
1656 if build_mode == IndexBuildMode::Async {
1657 assert!(
1659 !env.access_layer
1660 .object_store()
1661 .exists(&puffin_path)
1662 .await
1663 .unwrap()
1664 );
1665 } else {
1666 assert!(
1668 env.access_layer
1669 .object_store()
1670 .exists(&puffin_path)
1671 .await
1672 .unwrap()
1673 );
1674 }
1675
1676 match result_rx.recv().await.unwrap() {
1678 Ok(outcome) => {
1679 assert_eq!(outcome, IndexBuildOutcome::Finished);
1680 }
1681 _ => panic!("Expect finished result"),
1682 }
1683
1684 assert!(
1686 env.access_layer
1687 .object_store()
1688 .exists(&puffin_path)
1689 .await
1690 .unwrap()
1691 );
1692 }
1693
1694 #[tokio::test]
1695 async fn test_index_build_task_build_mode() {
1696 schedule_index_build_task_with_mode(IndexBuildMode::Async).await;
1697 schedule_index_build_task_with_mode(IndexBuildMode::Sync).await;
1698 }
1699
1700 #[tokio::test]
1701 async fn test_index_build_task_no_index() {
1702 let env = SchedulerEnv::new().await;
1703 let mut scheduler = env.mock_index_build_scheduler(4);
1704 let mut metadata = sst_region_metadata();
1705 metadata.column_metadatas.iter_mut().for_each(|col| {
1707 col.column_schema.set_inverted_index(false);
1708 let _ = col.column_schema.unset_skipping_options();
1709 });
1710 let region_id = metadata.region_id;
1711 let metadata = Arc::new(metadata);
1712 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1713 let file_purger = Arc::new(NoopFilePurger {});
1714 let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
1715 let file_meta = FileMeta {
1716 region_id,
1717 file_id: sst_info.file_id,
1718 file_size: sst_info.file_size,
1719 index_file_size: sst_info.index_metadata.file_size,
1720 num_rows: sst_info.num_rows as u64,
1721 num_row_groups: sst_info.num_row_groups,
1722 ..Default::default()
1723 };
1724 let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1725 let version_control =
1726 mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1727 let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
1728
1729 let (tx, mut rx) = mpsc::channel(4);
1731 let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1732 let task = IndexBuildTask {
1733 file_meta: file_meta.clone(),
1734 reason: IndexBuildType::Flush,
1735 access_layer: env.access_layer.clone(),
1736 listener: WorkerListener::default(),
1737 manifest_ctx,
1738 write_cache: None,
1739 file_purger,
1740 indexer_builder,
1741 request_sender: tx,
1742 result_sender: result_tx,
1743 };
1744
1745 scheduler
1746 .schedule_build(&version_control, task)
1747 .await
1748 .unwrap();
1749
1750 match result_rx.recv().await.unwrap() {
1752 Ok(outcome) => {
1753 assert_eq!(outcome, IndexBuildOutcome::Finished);
1754 }
1755 _ => panic!("Expect finished result"),
1756 }
1757
1758 let _ = rx.recv().await.is_none();
1760 }
1761
1762 #[tokio::test]
1763 async fn test_index_build_task_with_write_cache() {
1764 let env = SchedulerEnv::new().await;
1765 let mut scheduler = env.mock_index_build_scheduler(4);
1766 let metadata = Arc::new(sst_region_metadata());
1767 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1768 let file_purger = Arc::new(NoopFilePurger {});
1769 let region_id = metadata.region_id;
1770
1771 let (dir, factory) = PuffinManagerFactory::new_for_test_async("test_write_cache").await;
1772 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1773
1774 let write_cache = Arc::new(
1776 WriteCache::new_fs(
1777 dir.path().to_str().unwrap(),
1778 ReadableSize::mb(10),
1779 None,
1780 None,
1781 factory,
1782 intm_manager,
1783 ReadableSize::mb(10),
1784 )
1785 .await
1786 .unwrap(),
1787 );
1788 let indexer_builder = Arc::new(IndexerBuilderImpl {
1790 build_type: IndexBuildType::Flush,
1791 metadata: metadata.clone(),
1792 row_group_size: 1024,
1793 puffin_manager: write_cache.build_puffin_manager().clone(),
1794 write_cache_enabled: true,
1795 intermediate_manager: write_cache.intermediate_manager().clone(),
1796 index_options: IndexOptions::default(),
1797 inverted_index_config: InvertedIndexConfig::default(),
1798 fulltext_index_config: FulltextIndexConfig::default(),
1799 bloom_filter_index_config: BloomFilterConfig::default(),
1800 });
1801
1802 let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
1803 let file_meta = FileMeta {
1804 region_id,
1805 file_id: sst_info.file_id,
1806 file_size: sst_info.file_size,
1807 index_file_size: sst_info.index_metadata.file_size,
1808 num_rows: sst_info.num_rows as u64,
1809 num_row_groups: sst_info.num_row_groups,
1810 ..Default::default()
1811 };
1812 let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1813 let version_control =
1814 mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1815
1816 let (tx, mut _rx) = mpsc::channel(4);
1818 let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1819 let task = IndexBuildTask {
1820 file_meta: file_meta.clone(),
1821 reason: IndexBuildType::Flush,
1822 access_layer: env.access_layer.clone(),
1823 listener: WorkerListener::default(),
1824 manifest_ctx,
1825 write_cache: Some(write_cache.clone()),
1826 file_purger,
1827 indexer_builder,
1828 request_sender: tx,
1829 result_sender: result_tx,
1830 };
1831
1832 scheduler
1833 .schedule_build(&version_control, task)
1834 .await
1835 .unwrap();
1836
1837 match result_rx.recv().await.unwrap() {
1839 Ok(outcome) => {
1840 assert_eq!(outcome, IndexBuildOutcome::Finished);
1841 }
1842 _ => panic!("Expect finished result"),
1843 }
1844
1845 let index_key = IndexKey::new(
1847 region_id,
1848 file_meta.file_id,
1849 FileType::Puffin(sst_info.index_metadata.version),
1850 );
1851 assert!(write_cache.file_cache().contains_key(&index_key));
1852 }
1853
1854 async fn create_mock_task_for_schedule(
1855 env: &SchedulerEnv,
1856 file_id: FileId,
1857 region_id: RegionId,
1858 reason: IndexBuildType,
1859 ) -> IndexBuildTask {
1860 let metadata = Arc::new(sst_region_metadata());
1861 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1862 let file_purger = Arc::new(NoopFilePurger {});
1863 let indexer_builder = mock_indexer_builder(metadata, env).await;
1864 let (tx, _rx) = mpsc::channel(4);
1865 let (result_tx, _result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1866
1867 IndexBuildTask {
1868 file_meta: FileMeta {
1869 region_id,
1870 file_id,
1871 file_size: 100,
1872 ..Default::default()
1873 },
1874 reason,
1875 access_layer: env.access_layer.clone(),
1876 listener: WorkerListener::default(),
1877 manifest_ctx,
1878 write_cache: None,
1879 file_purger,
1880 indexer_builder,
1881 request_sender: tx,
1882 result_sender: result_tx,
1883 }
1884 }
1885
    #[tokio::test]
    async fn test_scheduler_comprehensive() {
        let env = SchedulerEnv::new().await;
        // The scheduler may build indexes for at most 2 files concurrently.
        let mut scheduler = env.mock_index_build_scheduler(2);
        let metadata = Arc::new(sst_region_metadata());
        let region_id = metadata.region_id;
        let file_purger = Arc::new(NoopFilePurger {});

        let file_id1 = FileId::random();
        let file_id2 = FileId::random();
        let file_id3 = FileId::random();
        let file_id4 = FileId::random();
        let file_id5 = FileId::random();

        let mut files = HashMap::new();
        for file_id in [file_id1, file_id2, file_id3, file_id4, file_id5] {
            files.insert(
                file_id,
                FileMeta {
                    region_id,
                    file_id,
                    file_size: 100,
                    ..Default::default()
                },
            );
        }

        let version_control = mock_version_control(metadata, file_purger, files).await;

        // The first task starts building immediately.
        let task1 =
            create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
        assert!(
            scheduler
                .schedule_build(&version_control, task1)
                .await
                .is_ok()
        );
        assert!(scheduler.region_status.contains_key(&region_id));
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 1);
        assert!(status.building_files.contains(&file_id1));

        // A duplicate task for the same file is aborted and does not change the status.
        let task1_dup =
            create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
        scheduler
            .schedule_build(&version_control, task1_dup)
            .await
            .unwrap();
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 1);

        // The second file occupies the remaining build slot.
        let task2 =
            create_mock_task_for_schedule(&env, file_id2, region_id, IndexBuildType::Flush).await;
        scheduler
            .schedule_build(&version_control, task2)
            .await
            .unwrap();
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 2);
        assert_eq!(status.pending_tasks.len(), 0);

        // Further tasks are queued because the concurrency limit is reached.
        let task3 =
            create_mock_task_for_schedule(&env, file_id3, region_id, IndexBuildType::Compact).await;
        let task4 =
            create_mock_task_for_schedule(&env, file_id4, region_id, IndexBuildType::SchemaChange)
                .await;
        let task5 =
            create_mock_task_for_schedule(&env, file_id5, region_id, IndexBuildType::Manual).await;

        scheduler
            .schedule_build(&version_control, task3)
            .await
            .unwrap();
        scheduler
            .schedule_build(&version_control, task4)
            .await
            .unwrap();
        scheduler
            .schedule_build(&version_control, task5)
            .await
            .unwrap();

        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 2);
        assert_eq!(status.pending_tasks.len(), 3);

        // When a building file stops, the highest-priority pending task (Manual) starts next.
        scheduler.on_task_stopped(region_id, file_id1, &version_control);
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert!(!status.building_files.contains(&file_id1));
        assert_eq!(status.building_files.len(), 2);
        assert_eq!(status.pending_tasks.len(), 2);
        assert!(status.building_files.contains(&file_id5));

        // Then the SchemaChange task starts.
        scheduler.on_task_stopped(region_id, file_id2, &version_control);
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 2);
        assert_eq!(status.pending_tasks.len(), 1);
        assert!(status.building_files.contains(&file_id4));

        // The Compact task runs last.
        scheduler.on_task_stopped(region_id, file_id5, &version_control);
        scheduler.on_task_stopped(region_id, file_id4, &version_control);

        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 1);
        assert_eq!(status.pending_tasks.len(), 0);
        assert!(status.building_files.contains(&file_id3));

        scheduler.on_task_stopped(region_id, file_id3, &version_control);

        // The region status is removed once nothing is building or pending.
        assert!(!scheduler.region_status.contains_key(&region_id));

        // Schedule new tasks and drop the region while they are running.
        let task6 =
            create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
        let task7 =
            create_mock_task_for_schedule(&env, file_id2, region_id, IndexBuildType::Flush).await;
        let task8 =
            create_mock_task_for_schedule(&env, file_id3, region_id, IndexBuildType::Manual).await;

        scheduler
            .schedule_build(&version_control, task6)
            .await
            .unwrap();
        scheduler
            .schedule_build(&version_control, task7)
            .await
            .unwrap();
        scheduler
            .schedule_build(&version_control, task8)
            .await
            .unwrap();

        assert!(scheduler.region_status.contains_key(&region_id));
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 2);
        assert_eq!(status.pending_tasks.len(), 1);

        // Dropping the region clears all of its index build state.
        scheduler.on_region_dropped(region_id).await;
        assert!(!scheduler.region_status.contains_key(&region_id));
    }
2037}