1pub(crate) mod bloom_filter;
16pub(crate) mod fulltext_index;
17mod indexer;
18pub mod intermediate;
19pub(crate) mod inverted_index;
20pub mod puffin_manager;
21mod statistics;
22pub(crate) mod store;
23
24use std::cmp::Ordering;
25use std::collections::{BinaryHeap, HashMap, HashSet};
26use std::num::NonZeroUsize;
27use std::sync::Arc;
28
29use bloom_filter::creator::BloomFilterIndexer;
30use common_telemetry::{debug, error, info, warn};
31use datatypes::arrow::array::BinaryArray;
32use datatypes::arrow::record_batch::RecordBatch;
33use mito_codec::index::IndexValuesCodec;
34use mito_codec::row_converter::CompositeValues;
35use puffin_manager::SstPuffinManager;
36use smallvec::{SmallVec, smallvec};
37use snafu::{OptionExt, ResultExt};
38use statistics::{ByteCount, RowCount};
39use store_api::metadata::RegionMetadataRef;
40use store_api::storage::{ColumnId, FileId, RegionId};
41use strum::IntoStaticStr;
42use tokio::sync::mpsc::Sender;
43
44use crate::access_layer::{AccessLayerRef, FilePathProvider, OperationType, RegionFilePathFactory};
45use crate::cache::file_cache::{FileType, IndexKey};
46use crate::cache::write_cache::{UploadTracker, WriteCacheRef};
47use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
48use crate::error::{
49 BuildIndexAsyncSnafu, DecodeSnafu, Error, InvalidRecordBatchSnafu, RegionClosedSnafu,
50 RegionDroppedSnafu, RegionTruncatedSnafu, Result,
51};
52use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
53use crate::metrics::INDEX_CREATE_MEMORY_USAGE;
54use crate::read::{Batch, BatchReader};
55use crate::region::options::IndexOptions;
56use crate::region::version::VersionControlRef;
57use crate::region::{ManifestContextRef, RegionLeaderState};
58use crate::request::{
59 BackgroundNotify, IndexBuildFailed, IndexBuildFinished, IndexBuildStopped, WorkerRequest,
60 WorkerRequestWithTime,
61};
62use crate::schedule::scheduler::{Job, SchedulerRef};
63use crate::sst::file::{
64 ColumnIndexMetadata, FileHandle, FileMeta, IndexType, IndexTypes, RegionFileId,
65};
66use crate::sst::file_purger::FilePurgerRef;
67use crate::sst::index::fulltext_index::creator::FulltextIndexer;
68use crate::sst::index::intermediate::IntermediateManager;
69use crate::sst::index::inverted_index::creator::InvertedIndexer;
70use crate::sst::parquet::SstInfo;
71use crate::sst::parquet::flat_format::primary_key_column_index;
72use crate::sst::parquet::format::PrimaryKeyArray;
73use crate::worker::WorkerListener;
74
/// Metric label for the inverted index (used with `INDEX_CREATE_MEMORY_USAGE`).
pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index";
/// Metric label for the full-text index (used with `INDEX_CREATE_MEMORY_USAGE`).
pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index";
/// Metric label for the bloom filter index (used with `INDEX_CREATE_MEMORY_USAGE`).
pub(crate) const TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
78
/// Aggregated output of index creation for a single SST file.
#[derive(Debug, Clone, Default)]
pub struct IndexOutput {
    /// Size of the produced index file in bytes; 0 is treated as "no index built".
    pub file_size: u64,
    /// Output of the inverted index creation.
    pub inverted_index: InvertedIndexOutput,
    /// Output of the full-text index creation.
    pub fulltext_index: FulltextIndexOutput,
    /// Output of the bloom filter creation.
    pub bloom_filter: BloomFilterOutput,
}
91
92impl IndexOutput {
93 pub fn build_available_indexes(&self) -> SmallVec<[IndexType; 4]> {
94 let mut indexes = SmallVec::new();
95 if self.inverted_index.is_available() {
96 indexes.push(IndexType::InvertedIndex);
97 }
98 if self.fulltext_index.is_available() {
99 indexes.push(IndexType::FulltextIndex);
100 }
101 if self.bloom_filter.is_available() {
102 indexes.push(IndexType::BloomFilterIndex);
103 }
104 indexes
105 }
106
107 pub fn build_indexes(&self) -> Vec<ColumnIndexMetadata> {
108 let mut map: HashMap<ColumnId, IndexTypes> = HashMap::new();
109
110 if self.inverted_index.is_available() {
111 for &col in &self.inverted_index.columns {
112 map.entry(col).or_default().push(IndexType::InvertedIndex);
113 }
114 }
115 if self.fulltext_index.is_available() {
116 for &col in &self.fulltext_index.columns {
117 map.entry(col).or_default().push(IndexType::FulltextIndex);
118 }
119 }
120 if self.bloom_filter.is_available() {
121 for &col in &self.bloom_filter.columns {
122 map.entry(col)
123 .or_default()
124 .push(IndexType::BloomFilterIndex);
125 }
126 }
127
128 map.into_iter()
129 .map(|(column_id, created_indexes)| ColumnIndexMetadata {
130 column_id,
131 created_indexes,
132 })
133 .collect::<Vec<_>>()
134 }
135}
136
/// Common output shared by every concrete index kind.
#[derive(Debug, Clone, Default)]
pub struct IndexBaseOutput {
    /// Size of the index data in bytes.
    pub index_size: ByteCount,
    /// Number of rows indexed.
    pub row_count: RowCount,
    /// Ids of the columns that were indexed.
    pub columns: Vec<ColumnId>,
}
147
148impl IndexBaseOutput {
149 pub fn is_available(&self) -> bool {
150 self.index_size > 0
151 }
152}
153
/// Output of the inverted index creation.
pub type InvertedIndexOutput = IndexBaseOutput;
/// Output of the full-text index creation.
pub type FulltextIndexOutput = IndexBaseOutput;
/// Output of the bloom filter creation.
pub type BloomFilterOutput = IndexBaseOutput;
160
/// Builds index files for one SST file as row batches stream through it.
///
/// A default-constructed `Indexer` is a no-op: every inner indexer is `None`.
#[derive(Default)]
pub struct Indexer {
    // Id of the index file being created.
    file_id: FileId,
    // Id of the region the file belongs to.
    region_id: RegionId,
    // Puffin manager for the index file; `None` for a no-op indexer.
    puffin_manager: Option<SstPuffinManager>,
    // Inverted index creator, if enabled for this file.
    inverted_indexer: Option<InvertedIndexer>,
    // Inverted index memory usage reported to metrics at the last flush.
    last_mem_inverted_index: usize,
    // Full-text index creator, if enabled for this file.
    fulltext_indexer: Option<FulltextIndexer>,
    // Full-text index memory usage reported to metrics at the last flush.
    last_mem_fulltext_index: usize,
    // Bloom filter creator, if enabled for this file.
    bloom_filter_indexer: Option<BloomFilterIndexer>,
    // Bloom filter memory usage reported to metrics at the last flush.
    last_mem_bloom_filter: usize,
    // Manager for intermediate files used during index creation.
    intermediate_manager: Option<IntermediateManager>,
}
175
176impl Indexer {
177 pub async fn update(&mut self, batch: &mut Batch) {
179 self.do_update(batch).await;
180
181 self.flush_mem_metrics();
182 }
183
184 pub async fn update_flat(&mut self, batch: &RecordBatch) {
186 self.do_update_flat(batch).await;
187
188 self.flush_mem_metrics();
189 }
190
191 pub async fn finish(&mut self) -> IndexOutput {
193 let output = self.do_finish().await;
194
195 self.flush_mem_metrics();
196 output
197 }
198
199 pub async fn abort(&mut self) {
201 self.do_abort().await;
202
203 self.flush_mem_metrics();
204 }
205
206 fn flush_mem_metrics(&mut self) {
207 let inverted_mem = self
208 .inverted_indexer
209 .as_ref()
210 .map_or(0, |creator| creator.memory_usage());
211 INDEX_CREATE_MEMORY_USAGE
212 .with_label_values(&[TYPE_INVERTED_INDEX])
213 .add(inverted_mem as i64 - self.last_mem_inverted_index as i64);
214 self.last_mem_inverted_index = inverted_mem;
215
216 let fulltext_mem = self
217 .fulltext_indexer
218 .as_ref()
219 .map_or(0, |creator| creator.memory_usage());
220 INDEX_CREATE_MEMORY_USAGE
221 .with_label_values(&[TYPE_FULLTEXT_INDEX])
222 .add(fulltext_mem as i64 - self.last_mem_fulltext_index as i64);
223 self.last_mem_fulltext_index = fulltext_mem;
224
225 let bloom_filter_mem = self
226 .bloom_filter_indexer
227 .as_ref()
228 .map_or(0, |creator| creator.memory_usage());
229 INDEX_CREATE_MEMORY_USAGE
230 .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
231 .add(bloom_filter_mem as i64 - self.last_mem_bloom_filter as i64);
232 self.last_mem_bloom_filter = bloom_filter_mem;
233 }
234}
235
/// Factory that assembles an [`Indexer`] for an SST file.
#[async_trait::async_trait]
pub trait IndexerBuilder {
    /// Builds an [`Indexer`] that creates indexes for the file with `file_id`.
    async fn build(&self, file_id: FileId) -> Indexer;
}
/// Default [`IndexerBuilder`] implementation, backed by region metadata and
/// the per-index configuration.
#[derive(Clone)]
pub(crate) struct IndexerBuilderImpl {
    /// Why this build was triggered; gates index creation per config.
    pub(crate) build_type: IndexBuildType,
    /// Metadata of the region the file belongs to.
    pub(crate) metadata: RegionMetadataRef,
    /// Parquet row group size of the SST being indexed.
    pub(crate) row_group_size: usize,
    /// Puffin manager handed to the built indexer.
    pub(crate) puffin_manager: SstPuffinManager,
    /// Manager for intermediate files used during index creation.
    pub(crate) intermediate_manager: IntermediateManager,
    /// Region-level index options (e.g. ignored columns, segment row count).
    pub(crate) index_options: IndexOptions,
    /// Inverted index configuration.
    pub(crate) inverted_index_config: InvertedIndexConfig,
    /// Full-text index configuration.
    pub(crate) fulltext_index_config: FulltextIndexConfig,
    /// Bloom filter index configuration.
    pub(crate) bloom_filter_index_config: BloomFilterConfig,
}
253
#[async_trait::async_trait]
impl IndexerBuilder for IndexerBuilderImpl {
    /// Builds an [`Indexer`] with every index kind that is enabled and
    /// applicable; returns a no-op indexer when none applies.
    async fn build(&self, file_id: FileId) -> Indexer {
        let mut indexer = Indexer {
            file_id,
            region_id: self.metadata.region_id,
            ..Default::default()
        };

        indexer.inverted_indexer = self.build_inverted_indexer(file_id);
        indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await;
        indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id);
        indexer.intermediate_manager = Some(self.intermediate_manager.clone());
        // If no index kind applies, abort the partially-initialized indexer
        // (this also settles the memory metrics) and hand back a no-op default.
        if indexer.inverted_indexer.is_none()
            && indexer.fulltext_indexer.is_none()
            && indexer.bloom_filter_indexer.is_none()
        {
            indexer.abort().await;
            return Indexer::default();
        }

        indexer.puffin_manager = Some(self.puffin_manager.clone());
        indexer
    }
}
280
impl IndexerBuilderImpl {
    /// Builds an inverted indexer for `file_id`, or returns `None` when the
    /// config, the indexed column set, or the sizing parameters rule it out.
    fn build_inverted_indexer(&self, file_id: FileId) -> Option<InvertedIndexer> {
        // Flush/compaction builds honor the respective config switches; all
        // other build types (schema change, manual) always create the index.
        let create = match self.build_type {
            IndexBuildType::Flush => self.inverted_index_config.create_on_flush.auto(),
            IndexBuildType::Compact => self.inverted_index_config.create_on_compaction.auto(),
            _ => true,
        };

        if !create {
            debug!(
                "Skip creating inverted index due to config, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        }

        let indexed_column_ids = self.metadata.inverted_indexed_column_ids(
            self.index_options.inverted_index.ignore_column_ids.iter(),
        );
        if indexed_column_ids.is_empty() {
            debug!(
                "No columns to be indexed, skip creating inverted index, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        }

        let Some(mut segment_row_count) =
            NonZeroUsize::new(self.index_options.inverted_index.segment_row_count)
        else {
            warn!(
                "Segment row count is 0, skip creating index, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        };

        let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else {
            warn!(
                "Row group size is 0, skip creating index, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        };

        // When the configured segment size does not evenly divide the row
        // group size, fall back to one segment per row group — presumably so
        // segments never straddle row group boundaries (TODO confirm).
        if row_group_size.get() % segment_row_count.get() != 0 {
            segment_row_count = row_group_size;
        }

        let indexer = InvertedIndexer::new(
            file_id,
            &self.metadata,
            self.intermediate_manager.clone(),
            self.inverted_index_config.mem_threshold_on_create(),
            segment_row_count,
            indexed_column_ids,
        );

        Some(indexer)
    }

    /// Builds a full-text indexer for `file_id`, or returns `None` when it is
    /// disabled by config, no column requires it, or creation fails
    /// (failures only panic in test builds).
    async fn build_fulltext_indexer(&self, file_id: FileId) -> Option<FulltextIndexer> {
        let create = match self.build_type {
            IndexBuildType::Flush => self.fulltext_index_config.create_on_flush.auto(),
            IndexBuildType::Compact => self.fulltext_index_config.create_on_compaction.auto(),
            _ => true,
        };

        if !create {
            debug!(
                "Skip creating full-text index due to config, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        }

        let mem_limit = self.fulltext_index_config.mem_threshold_on_create();
        let creator = FulltextIndexer::new(
            &self.metadata.region_id,
            &file_id,
            &self.intermediate_manager,
            &self.metadata,
            self.fulltext_index_config.compress,
            mem_limit,
        )
        .await;

        // Success (including "no columns need indexing") returns early; only
        // the error value falls through to the reporting code below.
        let err = match creator {
            Ok(creator) => {
                if creator.is_none() {
                    debug!(
                        "Skip creating full-text index due to no columns require indexing, region_id: {}, file_id: {}",
                        self.metadata.region_id, file_id,
                    );
                }
                return creator;
            }
            Err(err) => err,
        };

        // Fail loudly in test builds so creation bugs surface; in production
        // degrade gracefully to "no full-text index".
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to create full-text indexer, region_id: {}, file_id: {}, err: {:?}",
                self.metadata.region_id, file_id, err
            );
        } else {
            warn!(
                err; "Failed to create full-text indexer, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
        }

        None
    }

    /// Builds a bloom filter indexer for `file_id`, or returns `None` when it
    /// is disabled by config, no column requires it, or creation fails
    /// (failures only panic in test builds).
    fn build_bloom_filter_indexer(&self, file_id: FileId) -> Option<BloomFilterIndexer> {
        let create = match self.build_type {
            IndexBuildType::Flush => self.bloom_filter_index_config.create_on_flush.auto(),
            IndexBuildType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(),
            _ => true,
        };

        if !create {
            debug!(
                "Skip creating bloom filter due to config, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        }

        let mem_limit = self.bloom_filter_index_config.mem_threshold_on_create();
        let indexer = BloomFilterIndexer::new(
            file_id,
            &self.metadata,
            self.intermediate_manager.clone(),
            mem_limit,
        );

        // Same pattern as the full-text indexer: early-return on success,
        // fall through only with the error.
        let err = match indexer {
            Ok(indexer) => {
                if indexer.is_none() {
                    debug!(
                        "Skip creating bloom filter due to no columns require indexing, region_id: {}, file_id: {}",
                        self.metadata.region_id, file_id,
                    );
                }
                return indexer;
            }
            Err(err) => err,
        };

        // Fail loudly in test builds; degrade to "no bloom filter" otherwise.
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to create bloom filter, region_id: {}, file_id: {}, err: {:?}",
                self.metadata.region_id, file_id, err
            );
        } else {
            warn!(
                err; "Failed to create bloom filter, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
        }

        None
    }
}
448
/// The reason an index build was triggered.
#[derive(Debug, Clone, IntoStaticStr, PartialEq)]
pub enum IndexBuildType {
    /// Build triggered by a schema change.
    SchemaChange,
    /// Build triggered by a memtable flush.
    Flush,
    /// Build triggered by a compaction.
    Compact,
    /// Build requested manually.
    Manual,
}
461
462impl IndexBuildType {
463 fn as_str(&self) -> &'static str {
464 self.into()
465 }
466
467 fn priority(&self) -> u8 {
469 match self {
470 IndexBuildType::Manual => 3,
471 IndexBuildType::SchemaChange => 2,
472 IndexBuildType::Flush => 1,
473 IndexBuildType::Compact => 0,
474 }
475 }
476}
477
impl From<OperationType> for IndexBuildType {
    /// Maps a write operation type to the corresponding build reason.
    fn from(op_type: OperationType) -> Self {
        match op_type {
            OperationType::Flush => IndexBuildType::Flush,
            OperationType::Compact => IndexBuildType::Compact,
        }
    }
}
486
/// Final outcome of an index build task.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum IndexBuildOutcome {
    /// The build ran to completion (possibly producing no index file).
    Finished,
    /// The build was aborted; the string describes why.
    Aborted(String),
}
493
/// Sender half of the channel used to report an index build result.
pub type ResultMpscSender = Sender<Result<IndexBuildOutcome>>;
496
/// Background task that (re)builds the index file for a single SST file.
#[derive(Clone)]
pub struct IndexBuildTask {
    /// Metadata of the SST file to index.
    pub file_meta: FileMeta,
    /// Why the build was triggered; also determines scheduling priority.
    pub reason: IndexBuildType,
    /// Access layer used to read the SST and reach the object store.
    pub access_layer: AccessLayerRef,
    /// Listener notified of build lifecycle events.
    pub(crate) listener: WorkerListener,
    /// Manifest context used to commit the resulting region edit.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// Optional write cache; when present the index file is uploaded through it.
    pub write_cache: Option<WriteCacheRef>,
    /// Purger attached to the file handle built for reading the SST.
    pub file_purger: FilePurgerRef,
    /// Factory producing the indexer that does the actual work.
    pub indexer_builder: Arc<dyn IndexerBuilder + Send + Sync>,
    /// Sender for background worker notifications (finished/failed/stopped).
    pub(crate) request_sender: Sender<WorkerRequestWithTime>,
    /// Sender used to report the final build outcome to the caller.
    pub(crate) result_sender: ResultMpscSender,
}
515
516impl std::fmt::Debug for IndexBuildTask {
517 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
518 f.debug_struct("IndexBuildTask")
519 .field("region_id", &self.file_meta.region_id)
520 .field("file_id", &self.file_meta.file_id)
521 .field("reason", &self.reason)
522 .finish()
523 }
524}
525
526impl IndexBuildTask {
527 pub async fn on_success(&self, outcome: IndexBuildOutcome) {
529 let _ = self.result_sender.send(Ok(outcome)).await;
530 }
531
532 pub async fn on_failure(&self, err: Arc<Error>) {
534 let _ = self
535 .result_sender
536 .send(Err(err.clone()).context(BuildIndexAsyncSnafu {
537 region_id: self.file_meta.region_id,
538 }))
539 .await;
540 }
541
542 fn into_index_build_job(mut self, version_control: VersionControlRef) -> Job {
543 Box::pin(async move {
544 self.do_index_build(version_control).await;
545 })
546 }
547
548 async fn do_index_build(&mut self, version_control: VersionControlRef) {
549 self.listener
550 .on_index_build_begin(RegionFileId::new(
551 self.file_meta.region_id,
552 self.file_meta.file_id,
553 ))
554 .await;
555 match self.index_build(version_control).await {
556 Ok(outcome) => self.on_success(outcome).await,
557 Err(e) => {
558 warn!(
559 e; "Index build task failed, region: {}, file_id: {}",
560 self.file_meta.region_id, self.file_meta.file_id,
561 );
562 self.on_failure(e.into()).await
563 }
564 }
565 let worker_request = WorkerRequest::Background {
566 region_id: self.file_meta.region_id,
567 notify: BackgroundNotify::IndexBuildStopped(IndexBuildStopped {
568 region_id: self.file_meta.region_id,
569 file_id: self.file_meta.file_id,
570 }),
571 };
572 let _ = self
573 .request_sender
574 .send(WorkerRequestWithTime::new(worker_request))
575 .await;
576 }
577
578 async fn check_sst_file_exists(&self, version_control: &VersionControlRef) -> bool {
580 let file_id = self.file_meta.file_id;
581 let level = self.file_meta.level;
582 let version = version_control.current().version;
584
585 let Some(level_files) = version.ssts.levels().get(level as usize) else {
586 warn!(
587 "File id {} not found in level {} for index build, region: {}",
588 file_id, level, self.file_meta.region_id
589 );
590 return false;
591 };
592
593 match level_files.files.get(&file_id) {
594 Some(handle) if !handle.is_deleted() && !handle.compacting() => {
595 true
599 }
600 _ => {
601 warn!(
602 "File id {} not found in region version for index build, region: {}",
603 file_id, self.file_meta.region_id
604 );
605 false
606 }
607 }
608 }
609
610 async fn index_build(
611 &mut self,
612 version_control: VersionControlRef,
613 ) -> Result<IndexBuildOutcome> {
614 let index_file_id = if self.file_meta.index_file_size > 0 {
615 FileId::random()
617 } else {
618 self.file_meta.file_id
619 };
620 let mut indexer = self.indexer_builder.build(index_file_id).await;
621
622 if !self.check_sst_file_exists(&version_control).await {
624 indexer.abort().await;
626 self.listener
627 .on_index_build_abort(RegionFileId::new(
628 self.file_meta.region_id,
629 self.file_meta.file_id,
630 ))
631 .await;
632 return Ok(IndexBuildOutcome::Aborted(format!(
633 "SST file not found during index build, region: {}, file_id: {}",
634 self.file_meta.region_id, self.file_meta.file_id
635 )));
636 }
637
638 let mut parquet_reader = self
639 .access_layer
640 .read_sst(FileHandle::new(
641 self.file_meta.clone(),
642 self.file_purger.clone(),
643 ))
644 .build()
645 .await?;
646
647 loop {
649 match parquet_reader.next_batch().await {
650 Ok(Some(batch)) => {
651 indexer.update(&mut batch.clone()).await;
652 }
653 Ok(None) => break,
654 Err(e) => {
655 indexer.abort().await;
656 return Err(e);
657 }
658 }
659 }
660 let index_output = indexer.finish().await;
661
662 if index_output.file_size > 0 {
663 if !self.check_sst_file_exists(&version_control).await {
665 indexer.abort().await;
667 self.listener
668 .on_index_build_abort(RegionFileId::new(
669 self.file_meta.region_id,
670 self.file_meta.file_id,
671 ))
672 .await;
673 return Ok(IndexBuildOutcome::Aborted(format!(
674 "SST file not found during index build, region: {}, file_id: {}",
675 self.file_meta.region_id, self.file_meta.file_id
676 )));
677 }
678
679 self.maybe_upload_index_file(index_output.clone(), index_file_id)
681 .await?;
682
683 let worker_request = match self.update_manifest(index_output, index_file_id).await {
684 Ok(edit) => {
685 let index_build_finished = IndexBuildFinished {
686 region_id: self.file_meta.region_id,
687 edit,
688 };
689 WorkerRequest::Background {
690 region_id: self.file_meta.region_id,
691 notify: BackgroundNotify::IndexBuildFinished(index_build_finished),
692 }
693 }
694 Err(e) => {
695 let err = Arc::new(e);
696 WorkerRequest::Background {
697 region_id: self.file_meta.region_id,
698 notify: BackgroundNotify::IndexBuildFailed(IndexBuildFailed { err }),
699 }
700 }
701 };
702
703 let _ = self
704 .request_sender
705 .send(WorkerRequestWithTime::new(worker_request))
706 .await;
707 }
708 Ok(IndexBuildOutcome::Finished)
709 }
710
711 async fn maybe_upload_index_file(
712 &self,
713 output: IndexOutput,
714 index_file_id: FileId,
715 ) -> Result<()> {
716 if let Some(write_cache) = &self.write_cache {
717 let file_id = self.file_meta.file_id;
718 let region_id = self.file_meta.region_id;
719 let remote_store = self.access_layer.object_store();
720 let mut upload_tracker = UploadTracker::new(region_id);
721 let mut err = None;
722 let puffin_key = IndexKey::new(region_id, index_file_id, FileType::Puffin);
723 let puffin_path = RegionFilePathFactory::new(
724 self.access_layer.table_dir().to_string(),
725 self.access_layer.path_type(),
726 )
727 .build_index_file_path(RegionFileId::new(region_id, file_id));
728 if let Err(e) = write_cache
729 .upload(puffin_key, &puffin_path, remote_store)
730 .await
731 {
732 err = Some(e);
733 }
734 upload_tracker.push_uploaded_file(puffin_path);
735 if let Some(err) = err {
736 upload_tracker
738 .clean(
739 &smallvec![SstInfo {
740 file_id,
741 index_metadata: output,
742 ..Default::default()
743 }],
744 &write_cache.file_cache(),
745 remote_store,
746 )
747 .await;
748 return Err(err);
749 }
750 } else {
751 debug!("write cache is not available, skip uploading index file");
752 }
753 Ok(())
754 }
755
756 async fn update_manifest(
757 &mut self,
758 output: IndexOutput,
759 index_file_id: FileId,
760 ) -> Result<RegionEdit> {
761 self.file_meta.available_indexes = output.build_available_indexes();
762 self.file_meta.indexes = output.build_indexes();
763 self.file_meta.index_file_size = output.file_size;
764 self.file_meta.index_file_id = Some(index_file_id);
765 let edit = RegionEdit {
766 files_to_add: vec![self.file_meta.clone()],
767 files_to_remove: vec![],
768 timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
769 flushed_sequence: None,
770 flushed_entry_id: None,
771 committed_sequence: None,
772 compaction_time_window: None,
773 };
774 let version = self
775 .manifest_ctx
776 .update_manifest(
777 RegionLeaderState::Writable,
778 RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())),
779 )
780 .await?;
781 info!(
782 "Successfully update manifest version to {version}, region: {}, reason: {}",
783 self.file_meta.region_id,
784 self.reason.as_str()
785 );
786 Ok(edit)
787 }
788}
789
// Ordering for `IndexBuildTask` compares build-reason priority ONLY, so the
// scheduler's `BinaryHeap` pops the highest-priority pending task first.
// Equality deliberately ignores every other field.
impl PartialEq for IndexBuildTask {
    fn eq(&self, other: &Self) -> bool {
        self.reason.priority() == other.reason.priority()
    }
}

impl Eq for IndexBuildTask {}

impl PartialOrd for IndexBuildTask {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for IndexBuildTask {
    fn cmp(&self, other: &Self) -> Ordering {
        self.reason.priority().cmp(&other.reason.priority())
    }
}
809
/// Per-region bookkeeping of index build tasks.
pub struct IndexBuildStatus {
    /// Id of the region this status belongs to.
    pub region_id: RegionId,
    /// Files whose index is currently being built.
    pub building_files: HashSet<FileId>,
    /// Queued tasks, ordered by build-reason priority (max-heap).
    pub pending_tasks: BinaryHeap<IndexBuildTask>,
}
816
817impl IndexBuildStatus {
818 pub fn new(region_id: RegionId) -> Self {
819 IndexBuildStatus {
820 region_id,
821 building_files: HashSet::new(),
822 pending_tasks: BinaryHeap::new(),
823 }
824 }
825
826 async fn on_failure(self, err: Arc<Error>) {
827 for task in self.pending_tasks {
828 task.on_failure(err.clone()).await;
829 }
830 }
831}
832
/// Schedules index build tasks across regions with a global concurrency limit.
pub struct IndexBuildScheduler {
    /// Underlying job scheduler the build jobs are submitted to.
    scheduler: SchedulerRef,
    /// Build status tracked per region.
    region_status: HashMap<RegionId, IndexBuildStatus>,
    /// Maximum number of files whose indexes may be built concurrently.
    files_limit: usize,
}
841
842impl IndexBuildScheduler {
844 pub fn new(scheduler: SchedulerRef, files_limit: usize) -> Self {
845 IndexBuildScheduler {
846 scheduler,
847 region_status: HashMap::new(),
848 files_limit,
849 }
850 }
851
852 pub(crate) async fn schedule_build(
853 &mut self,
854 version_control: &VersionControlRef,
855 task: IndexBuildTask,
856 ) -> Result<()> {
857 let status = self
858 .region_status
859 .entry(task.file_meta.region_id)
860 .or_insert_with(|| IndexBuildStatus::new(task.file_meta.region_id));
861
862 if status.building_files.contains(&task.file_meta.file_id) {
863 let region_file_id =
864 RegionFileId::new(task.file_meta.region_id, task.file_meta.file_id);
865 debug!(
866 "Aborting index build task since index is already being built for region file {:?}",
867 region_file_id
868 );
869 task.on_success(IndexBuildOutcome::Aborted(format!(
870 "Index is already being built for region file {:?}",
871 region_file_id
872 )))
873 .await;
874 task.listener.on_index_build_abort(region_file_id).await;
875 return Ok(());
876 }
877
878 status.pending_tasks.push(task);
879
880 self.schedule_next_build_batch(version_control);
881 Ok(())
882 }
883
884 fn schedule_next_build_batch(&mut self, version_control: &VersionControlRef) {
886 let mut building_count = 0;
887 for status in self.region_status.values() {
888 building_count += status.building_files.len();
889 }
890
891 while building_count < self.files_limit {
892 if let Some(task) = self.find_next_task() {
893 let region_id = task.file_meta.region_id;
894 let file_id = task.file_meta.file_id;
895 let job = task.into_index_build_job(version_control.clone());
896 if self.scheduler.schedule(job).is_ok() {
897 if let Some(status) = self.region_status.get_mut(®ion_id) {
898 status.building_files.insert(file_id);
899 building_count += 1;
900 status
901 .pending_tasks
902 .retain(|t| t.file_meta.file_id != file_id);
903 } else {
904 error!(
905 "Region status not found when scheduling index build task, region: {}",
906 region_id
907 );
908 }
909 } else {
910 error!(
911 "Failed to schedule index build job, region: {}, file_id: {}",
912 region_id, file_id
913 );
914 }
915 } else {
916 break;
918 }
919 }
920 }
921
922 fn find_next_task(&self) -> Option<IndexBuildTask> {
924 self.region_status
925 .iter()
926 .filter_map(|(_, status)| status.pending_tasks.peek())
927 .max()
928 .cloned()
929 }
930
931 pub(crate) fn on_task_stopped(
932 &mut self,
933 region_id: RegionId,
934 file_id: FileId,
935 version_control: &VersionControlRef,
936 ) {
937 if let Some(status) = self.region_status.get_mut(®ion_id) {
938 status.building_files.remove(&file_id);
939 if status.building_files.is_empty() && status.pending_tasks.is_empty() {
940 self.region_status.remove(®ion_id);
942 }
943 }
944
945 self.schedule_next_build_batch(version_control);
946 }
947
948 pub(crate) async fn on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
949 error!(
950 err; "Index build scheduler encountered failure for region {}, removing all pending tasks.",
951 region_id
952 );
953 let Some(status) = self.region_status.remove(®ion_id) else {
954 return;
955 };
956 status.on_failure(err).await;
957 }
958
959 pub(crate) async fn on_region_dropped(&mut self, region_id: RegionId) {
961 self.remove_region_on_failure(
962 region_id,
963 Arc::new(RegionDroppedSnafu { region_id }.build()),
964 )
965 .await;
966 }
967
968 pub(crate) async fn on_region_closed(&mut self, region_id: RegionId) {
970 self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()))
971 .await;
972 }
973
974 pub(crate) async fn on_region_truncated(&mut self, region_id: RegionId) {
976 self.remove_region_on_failure(
977 region_id,
978 Arc::new(RegionTruncatedSnafu { region_id }.build()),
979 )
980 .await;
981 }
982
983 async fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
984 let Some(status) = self.region_status.remove(®ion_id) else {
985 return;
986 };
987 status.on_failure(err).await;
988 }
989}
990
/// Decodes the primary-key dictionary column of a flat-format `RecordBatch`
/// into `(decoded_value, row_count)` pairs.
///
/// Consecutive rows sharing the same dictionary key are folded into one entry
/// by bumping its count; a key is decoded again whenever it reappears after a
/// different key, so the output preserves the batch's row order.
///
/// # Errors
/// Returns an error when the primary-key column has an unexpected layout or a
/// key fails to decode.
pub(crate) fn decode_primary_keys_with_counts(
    batch: &RecordBatch,
    codec: &IndexValuesCodec,
) -> Result<Vec<(CompositeValues, usize)>> {
    let primary_key_index = primary_key_column_index(batch.num_columns());
    let pk_dict_array = batch
        .column(primary_key_index)
        .as_any()
        .downcast_ref::<PrimaryKeyArray>()
        .context(InvalidRecordBatchSnafu {
            reason: "Primary key column is not a dictionary array",
        })?;
    let pk_values_array = pk_dict_array
        .values()
        .as_any()
        .downcast_ref::<BinaryArray>()
        .context(InvalidRecordBatchSnafu {
            reason: "Primary key values are not binary array",
        })?;
    let keys = pk_dict_array.keys();

    // Run-length encoding over adjacent equal keys.
    let mut result: Vec<(CompositeValues, usize)> = Vec::new();
    let mut prev_key: Option<u32> = None;

    // NOTE(review): assumes the key array contains no nulls — `keys.value(i)`
    // on a null slot would yield an arbitrary key. Confirm upstream guarantees.
    for i in 0..keys.len() {
        let current_key = keys.value(i);

        if let Some(prev) = prev_key
            && prev == current_key
        {
            // Same run: only bump the count of the last decoded value.
            result.last_mut().unwrap().1 += 1;
            continue;
        }

        let pk_bytes = pk_values_array.value(current_key as usize);
        let decoded_value = codec.decoder().decode(pk_bytes).context(DecodeSnafu)?;

        result.push((decoded_value, 1));
        prev_key = Some(current_key);
    }

    Ok(result)
}
1040
1041#[cfg(test)]
1042mod tests {
1043 use std::sync::Arc;
1044
1045 use api::v1::SemanticType;
1046 use common_base::readable_size::ReadableSize;
1047 use datafusion_common::HashMap;
1048 use datatypes::data_type::ConcreteDataType;
1049 use datatypes::schema::{
1050 ColumnSchema, FulltextOptions, SkippingIndexOptions, SkippingIndexType,
1051 };
1052 use object_store::ObjectStore;
1053 use object_store::services::Memory;
1054 use puffin_manager::PuffinManagerFactory;
1055 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1056 use tokio::sync::mpsc;
1057
1058 use super::*;
1059 use crate::access_layer::{FilePathProvider, Metrics, SstWriteRequest, WriteType};
1060 use crate::cache::write_cache::WriteCache;
1061 use crate::config::{FulltextIndexConfig, IndexBuildMode, MitoConfig, Mode};
1062 use crate::memtable::time_partition::TimePartitions;
1063 use crate::region::version::{VersionBuilder, VersionControl};
1064 use crate::sst::file::RegionFileId;
1065 use crate::sst::file_purger::NoopFilePurger;
1066 use crate::sst::location;
1067 use crate::sst::parquet::WriteOptions;
1068 use crate::test_util::memtable_util::EmptyMemtableBuilder;
1069 use crate::test_util::scheduler_util::SchedulerEnv;
1070 use crate::test_util::sst_util::{new_batch_by_range, new_source, sst_region_metadata};
1071
1072 struct MetaConfig {
1073 with_inverted: bool,
1074 with_fulltext: bool,
1075 with_skipping_bloom: bool,
1076 }
1077
1078 fn mock_region_metadata(
1079 MetaConfig {
1080 with_inverted,
1081 with_fulltext,
1082 with_skipping_bloom,
1083 }: MetaConfig,
1084 ) -> RegionMetadataRef {
1085 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
1086 let mut column_schema = ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false);
1087 if with_inverted {
1088 column_schema = column_schema.with_inverted_index(true);
1089 }
1090 builder
1091 .push_column_metadata(ColumnMetadata {
1092 column_schema,
1093 semantic_type: SemanticType::Field,
1094 column_id: 1,
1095 })
1096 .push_column_metadata(ColumnMetadata {
1097 column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
1098 semantic_type: SemanticType::Field,
1099 column_id: 2,
1100 })
1101 .push_column_metadata(ColumnMetadata {
1102 column_schema: ColumnSchema::new(
1103 "c",
1104 ConcreteDataType::timestamp_millisecond_datatype(),
1105 false,
1106 ),
1107 semantic_type: SemanticType::Timestamp,
1108 column_id: 3,
1109 });
1110
1111 if with_fulltext {
1112 let column_schema =
1113 ColumnSchema::new("text", ConcreteDataType::string_datatype(), true)
1114 .with_fulltext_options(FulltextOptions {
1115 enable: true,
1116 ..Default::default()
1117 })
1118 .unwrap();
1119
1120 let column = ColumnMetadata {
1121 column_schema,
1122 semantic_type: SemanticType::Field,
1123 column_id: 4,
1124 };
1125
1126 builder.push_column_metadata(column);
1127 }
1128
1129 if with_skipping_bloom {
1130 let column_schema =
1131 ColumnSchema::new("bloom", ConcreteDataType::string_datatype(), false)
1132 .with_skipping_options(SkippingIndexOptions::new_unchecked(
1133 42,
1134 0.01,
1135 SkippingIndexType::BloomFilter,
1136 ))
1137 .unwrap();
1138
1139 let column = ColumnMetadata {
1140 column_schema,
1141 semantic_type: SemanticType::Field,
1142 column_id: 5,
1143 };
1144
1145 builder.push_column_metadata(column);
1146 }
1147
1148 Arc::new(builder.build().unwrap())
1149 }
1150
1151 fn mock_object_store() -> ObjectStore {
1152 ObjectStore::new(Memory::default()).unwrap().finish()
1153 }
1154
1155 async fn mock_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
1156 IntermediateManager::init_fs(path).await.unwrap()
1157 }
1158 struct NoopPathProvider;
1159
1160 impl FilePathProvider for NoopPathProvider {
1161 fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
1162 unreachable!()
1163 }
1164
1165 fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
1166 unreachable!()
1167 }
1168 }
1169
1170 async fn mock_sst_file(
1171 metadata: RegionMetadataRef,
1172 env: &SchedulerEnv,
1173 build_mode: IndexBuildMode,
1174 ) -> SstInfo {
1175 let source = new_source(&[
1176 new_batch_by_range(&["a", "d"], 0, 60),
1177 new_batch_by_range(&["b", "f"], 0, 40),
1178 new_batch_by_range(&["b", "h"], 100, 200),
1179 ]);
1180 let mut index_config = MitoConfig::default().index;
1181 index_config.build_mode = build_mode;
1182 let write_request = SstWriteRequest {
1183 op_type: OperationType::Flush,
1184 metadata: metadata.clone(),
1185 source: either::Left(source),
1186 storage: None,
1187 max_sequence: None,
1188 cache_manager: Default::default(),
1189 index_options: IndexOptions::default(),
1190 index_config,
1191 inverted_index_config: Default::default(),
1192 fulltext_index_config: Default::default(),
1193 bloom_filter_index_config: Default::default(),
1194 };
1195 let mut metrics = Metrics::new(WriteType::Flush);
1196 env.access_layer
1197 .write_sst(write_request, &WriteOptions::default(), &mut metrics)
1198 .await
1199 .unwrap()
1200 .remove(0)
1201 }
1202
1203 async fn mock_version_control(
1204 metadata: RegionMetadataRef,
1205 file_purger: FilePurgerRef,
1206 files: HashMap<FileId, FileMeta>,
1207 ) -> VersionControlRef {
1208 let mutable = Arc::new(TimePartitions::new(
1209 metadata.clone(),
1210 Arc::new(EmptyMemtableBuilder::default()),
1211 0,
1212 None,
1213 ));
1214 let version_builder = VersionBuilder::new(metadata, mutable)
1215 .add_files(file_purger, files.values().cloned())
1216 .build();
1217 Arc::new(VersionControl::new(version_builder))
1218 }
1219
1220 async fn mock_indexer_builder(
1221 metadata: RegionMetadataRef,
1222 env: &SchedulerEnv,
1223 ) -> Arc<dyn IndexerBuilder + Send + Sync> {
1224 let (dir, factory) = PuffinManagerFactory::new_for_test_async("mock_indexer_builder").await;
1225 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1226 let puffin_manager = factory.build(
1227 env.access_layer.object_store().clone(),
1228 RegionFilePathFactory::new(
1229 env.access_layer.table_dir().to_string(),
1230 env.access_layer.path_type(),
1231 ),
1232 );
1233 Arc::new(IndexerBuilderImpl {
1234 build_type: IndexBuildType::Flush,
1235 metadata,
1236 row_group_size: 1024,
1237 puffin_manager,
1238 intermediate_manager: intm_manager,
1239 index_options: IndexOptions::default(),
1240 inverted_index_config: InvertedIndexConfig::default(),
1241 fulltext_index_config: FulltextIndexConfig::default(),
1242 bloom_filter_index_config: BloomFilterConfig::default(),
1243 })
1244 }
1245
1246 #[tokio::test]
1247 async fn test_build_indexer_basic() {
1248 let (dir, factory) =
1249 PuffinManagerFactory::new_for_test_async("test_build_indexer_basic_").await;
1250 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1251
1252 let metadata = mock_region_metadata(MetaConfig {
1253 with_inverted: true,
1254 with_fulltext: true,
1255 with_skipping_bloom: true,
1256 });
1257 let indexer = IndexerBuilderImpl {
1258 build_type: IndexBuildType::Flush,
1259 metadata,
1260 row_group_size: 1024,
1261 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1262 intermediate_manager: intm_manager,
1263 index_options: IndexOptions::default(),
1264 inverted_index_config: InvertedIndexConfig::default(),
1265 fulltext_index_config: FulltextIndexConfig::default(),
1266 bloom_filter_index_config: BloomFilterConfig::default(),
1267 }
1268 .build(FileId::random())
1269 .await;
1270
1271 assert!(indexer.inverted_indexer.is_some());
1272 assert!(indexer.fulltext_indexer.is_some());
1273 assert!(indexer.bloom_filter_indexer.is_some());
1274 }
1275
1276 #[tokio::test]
1277 async fn test_build_indexer_disable_create() {
1278 let (dir, factory) =
1279 PuffinManagerFactory::new_for_test_async("test_build_indexer_disable_create_").await;
1280 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1281
1282 let metadata = mock_region_metadata(MetaConfig {
1283 with_inverted: true,
1284 with_fulltext: true,
1285 with_skipping_bloom: true,
1286 });
1287 let indexer = IndexerBuilderImpl {
1288 build_type: IndexBuildType::Flush,
1289 metadata: metadata.clone(),
1290 row_group_size: 1024,
1291 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1292 intermediate_manager: intm_manager.clone(),
1293 index_options: IndexOptions::default(),
1294 inverted_index_config: InvertedIndexConfig {
1295 create_on_flush: Mode::Disable,
1296 ..Default::default()
1297 },
1298 fulltext_index_config: FulltextIndexConfig::default(),
1299 bloom_filter_index_config: BloomFilterConfig::default(),
1300 }
1301 .build(FileId::random())
1302 .await;
1303
1304 assert!(indexer.inverted_indexer.is_none());
1305 assert!(indexer.fulltext_indexer.is_some());
1306 assert!(indexer.bloom_filter_indexer.is_some());
1307
1308 let indexer = IndexerBuilderImpl {
1309 build_type: IndexBuildType::Compact,
1310 metadata: metadata.clone(),
1311 row_group_size: 1024,
1312 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1313 intermediate_manager: intm_manager.clone(),
1314 index_options: IndexOptions::default(),
1315 inverted_index_config: InvertedIndexConfig::default(),
1316 fulltext_index_config: FulltextIndexConfig {
1317 create_on_compaction: Mode::Disable,
1318 ..Default::default()
1319 },
1320 bloom_filter_index_config: BloomFilterConfig::default(),
1321 }
1322 .build(FileId::random())
1323 .await;
1324
1325 assert!(indexer.inverted_indexer.is_some());
1326 assert!(indexer.fulltext_indexer.is_none());
1327 assert!(indexer.bloom_filter_indexer.is_some());
1328
1329 let indexer = IndexerBuilderImpl {
1330 build_type: IndexBuildType::Compact,
1331 metadata,
1332 row_group_size: 1024,
1333 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1334 intermediate_manager: intm_manager,
1335 index_options: IndexOptions::default(),
1336 inverted_index_config: InvertedIndexConfig::default(),
1337 fulltext_index_config: FulltextIndexConfig::default(),
1338 bloom_filter_index_config: BloomFilterConfig {
1339 create_on_compaction: Mode::Disable,
1340 ..Default::default()
1341 },
1342 }
1343 .build(FileId::random())
1344 .await;
1345
1346 assert!(indexer.inverted_indexer.is_some());
1347 assert!(indexer.fulltext_indexer.is_some());
1348 assert!(indexer.bloom_filter_indexer.is_none());
1349 }
1350
1351 #[tokio::test]
1352 async fn test_build_indexer_no_required() {
1353 let (dir, factory) =
1354 PuffinManagerFactory::new_for_test_async("test_build_indexer_no_required_").await;
1355 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1356
1357 let metadata = mock_region_metadata(MetaConfig {
1358 with_inverted: false,
1359 with_fulltext: true,
1360 with_skipping_bloom: true,
1361 });
1362 let indexer = IndexerBuilderImpl {
1363 build_type: IndexBuildType::Flush,
1364 metadata: metadata.clone(),
1365 row_group_size: 1024,
1366 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1367 intermediate_manager: intm_manager.clone(),
1368 index_options: IndexOptions::default(),
1369 inverted_index_config: InvertedIndexConfig::default(),
1370 fulltext_index_config: FulltextIndexConfig::default(),
1371 bloom_filter_index_config: BloomFilterConfig::default(),
1372 }
1373 .build(FileId::random())
1374 .await;
1375
1376 assert!(indexer.inverted_indexer.is_none());
1377 assert!(indexer.fulltext_indexer.is_some());
1378 assert!(indexer.bloom_filter_indexer.is_some());
1379
1380 let metadata = mock_region_metadata(MetaConfig {
1381 with_inverted: true,
1382 with_fulltext: false,
1383 with_skipping_bloom: true,
1384 });
1385 let indexer = IndexerBuilderImpl {
1386 build_type: IndexBuildType::Flush,
1387 metadata: metadata.clone(),
1388 row_group_size: 1024,
1389 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1390 intermediate_manager: intm_manager.clone(),
1391 index_options: IndexOptions::default(),
1392 inverted_index_config: InvertedIndexConfig::default(),
1393 fulltext_index_config: FulltextIndexConfig::default(),
1394 bloom_filter_index_config: BloomFilterConfig::default(),
1395 }
1396 .build(FileId::random())
1397 .await;
1398
1399 assert!(indexer.inverted_indexer.is_some());
1400 assert!(indexer.fulltext_indexer.is_none());
1401 assert!(indexer.bloom_filter_indexer.is_some());
1402
1403 let metadata = mock_region_metadata(MetaConfig {
1404 with_inverted: true,
1405 with_fulltext: true,
1406 with_skipping_bloom: false,
1407 });
1408 let indexer = IndexerBuilderImpl {
1409 build_type: IndexBuildType::Flush,
1410 metadata: metadata.clone(),
1411 row_group_size: 1024,
1412 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1413 intermediate_manager: intm_manager,
1414 index_options: IndexOptions::default(),
1415 inverted_index_config: InvertedIndexConfig::default(),
1416 fulltext_index_config: FulltextIndexConfig::default(),
1417 bloom_filter_index_config: BloomFilterConfig::default(),
1418 }
1419 .build(FileId::random())
1420 .await;
1421
1422 assert!(indexer.inverted_indexer.is_some());
1423 assert!(indexer.fulltext_indexer.is_some());
1424 assert!(indexer.bloom_filter_indexer.is_none());
1425 }
1426
1427 #[tokio::test]
1428 async fn test_build_indexer_zero_row_group() {
1429 let (dir, factory) =
1430 PuffinManagerFactory::new_for_test_async("test_build_indexer_zero_row_group_").await;
1431 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1432
1433 let metadata = mock_region_metadata(MetaConfig {
1434 with_inverted: true,
1435 with_fulltext: true,
1436 with_skipping_bloom: true,
1437 });
1438 let indexer = IndexerBuilderImpl {
1439 build_type: IndexBuildType::Flush,
1440 metadata,
1441 row_group_size: 0,
1442 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1443 intermediate_manager: intm_manager,
1444 index_options: IndexOptions::default(),
1445 inverted_index_config: InvertedIndexConfig::default(),
1446 fulltext_index_config: FulltextIndexConfig::default(),
1447 bloom_filter_index_config: BloomFilterConfig::default(),
1448 }
1449 .build(FileId::random())
1450 .await;
1451
1452 assert!(indexer.inverted_indexer.is_none());
1453 }
1454
1455 #[tokio::test]
1456 async fn test_index_build_task_sst_not_exist() {
1457 let env = SchedulerEnv::new().await;
1458 let (tx, _rx) = mpsc::channel(4);
1459 let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1460 let mut scheduler = env.mock_index_build_scheduler(4);
1461 let metadata = Arc::new(sst_region_metadata());
1462 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1463 let file_purger = Arc::new(NoopFilePurger {});
1464 let files = HashMap::new();
1465 let version_control =
1466 mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1467 let region_id = metadata.region_id;
1468 let indexer_builder = mock_indexer_builder(metadata, &env).await;
1469
1470 let task = IndexBuildTask {
1472 file_meta: FileMeta {
1473 region_id,
1474 file_id: FileId::random(),
1475 file_size: 100,
1476 ..Default::default()
1477 },
1478 reason: IndexBuildType::Flush,
1479 access_layer: env.access_layer.clone(),
1480 listener: WorkerListener::default(),
1481 manifest_ctx,
1482 write_cache: None,
1483 file_purger,
1484 indexer_builder,
1485 request_sender: tx,
1486 result_sender: result_tx,
1487 };
1488
1489 scheduler
1491 .schedule_build(&version_control, task)
1492 .await
1493 .unwrap();
1494 match result_rx.recv().await.unwrap() {
1495 Ok(outcome) => {
1496 if outcome == IndexBuildOutcome::Finished {
1497 panic!("Expect aborted result due to missing SST file")
1498 }
1499 }
1500 _ => panic!("Expect aborted result due to missing SST file"),
1501 }
1502 }
1503
1504 #[tokio::test]
1505 async fn test_index_build_task_sst_exist() {
1506 let env = SchedulerEnv::new().await;
1507 let mut scheduler = env.mock_index_build_scheduler(4);
1508 let metadata = Arc::new(sst_region_metadata());
1509 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1510 let region_id = metadata.region_id;
1511 let file_purger = Arc::new(NoopFilePurger {});
1512 let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
1513 let file_meta = FileMeta {
1514 region_id,
1515 file_id: sst_info.file_id,
1516 file_size: sst_info.file_size,
1517 index_file_size: sst_info.index_metadata.file_size,
1518 num_rows: sst_info.num_rows as u64,
1519 num_row_groups: sst_info.num_row_groups,
1520 ..Default::default()
1521 };
1522 let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1523 let version_control =
1524 mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1525 let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
1526
1527 let (tx, mut rx) = mpsc::channel(4);
1529 let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1530 let task = IndexBuildTask {
1531 file_meta: file_meta.clone(),
1532 reason: IndexBuildType::Flush,
1533 access_layer: env.access_layer.clone(),
1534 listener: WorkerListener::default(),
1535 manifest_ctx,
1536 write_cache: None,
1537 file_purger,
1538 indexer_builder,
1539 request_sender: tx,
1540 result_sender: result_tx,
1541 };
1542
1543 scheduler
1544 .schedule_build(&version_control, task)
1545 .await
1546 .unwrap();
1547
1548 match result_rx.recv().await.unwrap() {
1550 Ok(outcome) => {
1551 assert_eq!(outcome, IndexBuildOutcome::Finished);
1552 }
1553 _ => panic!("Expect finished result"),
1554 }
1555
1556 let worker_req = rx.recv().await.unwrap().request;
1558 match worker_req {
1559 WorkerRequest::Background {
1560 region_id: req_region_id,
1561 notify: BackgroundNotify::IndexBuildFinished(finished),
1562 } => {
1563 assert_eq!(req_region_id, region_id);
1564 assert_eq!(finished.edit.files_to_add.len(), 1);
1565 let updated_meta = &finished.edit.files_to_add[0];
1566
1567 assert!(!updated_meta.available_indexes.is_empty());
1569 assert!(updated_meta.index_file_size > 0);
1570 assert_eq!(updated_meta.file_id, file_meta.file_id);
1571 }
1572 _ => panic!("Unexpected worker request: {:?}", worker_req),
1573 }
1574 }
1575
1576 async fn schedule_index_build_task_with_mode(build_mode: IndexBuildMode) {
1577 let env = SchedulerEnv::new().await;
1578 let mut scheduler = env.mock_index_build_scheduler(4);
1579 let metadata = Arc::new(sst_region_metadata());
1580 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1581 let file_purger = Arc::new(NoopFilePurger {});
1582 let region_id = metadata.region_id;
1583 let sst_info = mock_sst_file(metadata.clone(), &env, build_mode.clone()).await;
1584 let file_meta = FileMeta {
1585 region_id,
1586 file_id: sst_info.file_id,
1587 file_size: sst_info.file_size,
1588 index_file_size: sst_info.index_metadata.file_size,
1589 num_rows: sst_info.num_rows as u64,
1590 num_row_groups: sst_info.num_row_groups,
1591 ..Default::default()
1592 };
1593 let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1594 let version_control =
1595 mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1596 let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
1597
1598 let (tx, _rx) = mpsc::channel(4);
1600 let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1601 let task = IndexBuildTask {
1602 file_meta: file_meta.clone(),
1603 reason: IndexBuildType::Flush,
1604 access_layer: env.access_layer.clone(),
1605 listener: WorkerListener::default(),
1606 manifest_ctx,
1607 write_cache: None,
1608 file_purger,
1609 indexer_builder,
1610 request_sender: tx,
1611 result_sender: result_tx,
1612 };
1613
1614 scheduler
1615 .schedule_build(&version_control, task)
1616 .await
1617 .unwrap();
1618
1619 let puffin_path = location::index_file_path(
1620 env.access_layer.table_dir(),
1621 RegionFileId::new(region_id, file_meta.file_id),
1622 env.access_layer.path_type(),
1623 );
1624
1625 if build_mode == IndexBuildMode::Async {
1626 assert!(
1628 !env.access_layer
1629 .object_store()
1630 .exists(&puffin_path)
1631 .await
1632 .unwrap()
1633 );
1634 } else {
1635 assert!(
1637 env.access_layer
1638 .object_store()
1639 .exists(&puffin_path)
1640 .await
1641 .unwrap()
1642 );
1643 }
1644
1645 match result_rx.recv().await.unwrap() {
1647 Ok(outcome) => {
1648 assert_eq!(outcome, IndexBuildOutcome::Finished);
1649 }
1650 _ => panic!("Expect finished result"),
1651 }
1652
1653 assert!(
1655 env.access_layer
1656 .object_store()
1657 .exists(&puffin_path)
1658 .await
1659 .unwrap()
1660 );
1661 }
1662
1663 #[tokio::test]
1664 async fn test_index_build_task_build_mode() {
1665 schedule_index_build_task_with_mode(IndexBuildMode::Async).await;
1666 schedule_index_build_task_with_mode(IndexBuildMode::Sync).await;
1667 }
1668
    /// A region whose columns declare no indexes: the build task should finish
    /// as a no-op without sending a background notification to the worker.
    #[tokio::test]
    async fn test_index_build_task_no_index() {
        let env = SchedulerEnv::new().await;
        let mut scheduler = env.mock_index_build_scheduler(4);
        let mut metadata = sst_region_metadata();
        // Strip every index declaration from the schema so nothing is buildable.
        metadata.column_metadatas.iter_mut().for_each(|col| {
            col.column_schema.set_inverted_index(false);
            let _ = col.column_schema.unset_skipping_options();
        });
        let region_id = metadata.region_id;
        let metadata = Arc::new(metadata);
        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
        let file_purger = Arc::new(NoopFilePurger {});
        let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
        let file_meta = FileMeta {
            region_id,
            file_id: sst_info.file_id,
            file_size: sst_info.file_size,
            index_file_size: sst_info.index_metadata.file_size,
            num_rows: sst_info.num_rows as u64,
            num_row_groups: sst_info.num_row_groups,
            ..Default::default()
        };
        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
        let version_control =
            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
        let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;

        let (tx, mut rx) = mpsc::channel(4);
        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
        let task = IndexBuildTask {
            file_meta: file_meta.clone(),
            reason: IndexBuildType::Flush,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            manifest_ctx,
            write_cache: None,
            file_purger,
            indexer_builder,
            request_sender: tx,
            result_sender: result_tx,
        };

        scheduler
            .schedule_build(&version_control, task)
            .await
            .unwrap();

        // Even with nothing to build, the task reports a Finished outcome.
        match result_rx.recv().await.unwrap() {
            Ok(outcome) => {
                assert_eq!(outcome, IndexBuildOutcome::Finished);
            }
            _ => panic!("Expect finished result"),
        }

        // NOTE(review): the boolean from `is_none()` is discarded, so this line
        // verifies nothing; it presumably intends to assert that no worker
        // request was sent — confirm and consider an explicit assertion.
        let _ = rx.recv().await.is_none();
    }
1730
1731 #[tokio::test]
1732 async fn test_index_build_task_with_write_cache() {
1733 let env = SchedulerEnv::new().await;
1734 let mut scheduler = env.mock_index_build_scheduler(4);
1735 let metadata = Arc::new(sst_region_metadata());
1736 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1737 let file_purger = Arc::new(NoopFilePurger {});
1738 let region_id = metadata.region_id;
1739
1740 let (dir, factory) = PuffinManagerFactory::new_for_test_async("test_write_cache").await;
1741 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1742
1743 let write_cache = Arc::new(
1745 WriteCache::new_fs(
1746 dir.path().to_str().unwrap(),
1747 ReadableSize::mb(10),
1748 None,
1749 None,
1750 factory,
1751 intm_manager,
1752 )
1753 .await
1754 .unwrap(),
1755 );
1756 let indexer_builder = Arc::new(IndexerBuilderImpl {
1758 build_type: IndexBuildType::Flush,
1759 metadata: metadata.clone(),
1760 row_group_size: 1024,
1761 puffin_manager: write_cache.build_puffin_manager().clone(),
1762 intermediate_manager: write_cache.intermediate_manager().clone(),
1763 index_options: IndexOptions::default(),
1764 inverted_index_config: InvertedIndexConfig::default(),
1765 fulltext_index_config: FulltextIndexConfig::default(),
1766 bloom_filter_index_config: BloomFilterConfig::default(),
1767 });
1768
1769 let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
1770 let file_meta = FileMeta {
1771 region_id,
1772 file_id: sst_info.file_id,
1773 file_size: sst_info.file_size,
1774 index_file_size: sst_info.index_metadata.file_size,
1775 num_rows: sst_info.num_rows as u64,
1776 num_row_groups: sst_info.num_row_groups,
1777 ..Default::default()
1778 };
1779 let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1780 let version_control =
1781 mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1782
1783 let (tx, mut _rx) = mpsc::channel(4);
1785 let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1786 let task = IndexBuildTask {
1787 file_meta: file_meta.clone(),
1788 reason: IndexBuildType::Flush,
1789 access_layer: env.access_layer.clone(),
1790 listener: WorkerListener::default(),
1791 manifest_ctx,
1792 write_cache: Some(write_cache.clone()),
1793 file_purger,
1794 indexer_builder,
1795 request_sender: tx,
1796 result_sender: result_tx,
1797 };
1798
1799 scheduler
1800 .schedule_build(&version_control, task)
1801 .await
1802 .unwrap();
1803
1804 match result_rx.recv().await.unwrap() {
1806 Ok(outcome) => {
1807 assert_eq!(outcome, IndexBuildOutcome::Finished);
1808 }
1809 _ => panic!("Expect finished result"),
1810 }
1811
1812 let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Puffin);
1814 assert!(write_cache.file_cache().contains_key(&index_key));
1815 }
1816
1817 async fn create_mock_task_for_schedule(
1818 env: &SchedulerEnv,
1819 file_id: FileId,
1820 region_id: RegionId,
1821 reason: IndexBuildType,
1822 ) -> IndexBuildTask {
1823 let metadata = Arc::new(sst_region_metadata());
1824 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1825 let file_purger = Arc::new(NoopFilePurger {});
1826 let indexer_builder = mock_indexer_builder(metadata, env).await;
1827 let (tx, _rx) = mpsc::channel(4);
1828 let (result_tx, _result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1829
1830 IndexBuildTask {
1831 file_meta: FileMeta {
1832 region_id,
1833 file_id,
1834 file_size: 100,
1835 ..Default::default()
1836 },
1837 reason,
1838 access_layer: env.access_layer.clone(),
1839 listener: WorkerListener::default(),
1840 manifest_ctx,
1841 write_cache: None,
1842 file_purger,
1843 indexer_builder,
1844 request_sender: tx,
1845 result_sender: result_tx,
1846 }
1847 }
1848
    /// End-to-end scheduler bookkeeping: concurrency limiting, duplicate
    /// deduplication, pending-task promotion, and region cleanup.
    #[tokio::test]
    async fn test_scheduler_comprehensive() {
        let env = SchedulerEnv::new().await;
        // Scheduler limited to 2 concurrently-building files per region.
        let mut scheduler = env.mock_index_build_scheduler(2);
        let metadata = Arc::new(sst_region_metadata());
        let region_id = metadata.region_id;
        let file_purger = Arc::new(NoopFilePurger {});

        let file_id1 = FileId::random();
        let file_id2 = FileId::random();
        let file_id3 = FileId::random();
        let file_id4 = FileId::random();
        let file_id5 = FileId::random();

        // Register all five files in the version so schedule_build accepts them.
        let mut files = HashMap::new();
        for file_id in [file_id1, file_id2, file_id3, file_id4, file_id5] {
            files.insert(
                file_id,
                FileMeta {
                    region_id,
                    file_id,
                    file_size: 100,
                    ..Default::default()
                },
            );
        }

        let version_control = mock_version_control(metadata, file_purger, files).await;

        // First task starts building immediately.
        let task1 =
            create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
        assert!(
            scheduler
                .schedule_build(&version_control, task1)
                .await
                .is_ok()
        );
        assert!(scheduler.region_status.contains_key(&region_id));
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 1);
        assert!(status.building_files.contains(&file_id1));

        // Scheduling the same file again is deduplicated: still one build.
        let task1_dup =
            create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
        scheduler
            .schedule_build(&version_control, task1_dup)
            .await
            .unwrap();
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 1);

        // Second distinct file fills the concurrency limit; nothing pending yet.
        let task2 =
            create_mock_task_for_schedule(&env, file_id2, region_id, IndexBuildType::Flush).await;
        scheduler
            .schedule_build(&version_control, task2)
            .await
            .unwrap();
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 2);
        assert_eq!(status.pending_tasks.len(), 0);

        // Three more tasks with different build types queue up as pending.
        let task3 =
            create_mock_task_for_schedule(&env, file_id3, region_id, IndexBuildType::Compact).await;
        let task4 =
            create_mock_task_for_schedule(&env, file_id4, region_id, IndexBuildType::SchemaChange)
                .await;
        let task5 =
            create_mock_task_for_schedule(&env, file_id5, region_id, IndexBuildType::Manual).await;

        scheduler
            .schedule_build(&version_control, task3)
            .await
            .unwrap();
        scheduler
            .schedule_build(&version_control, task4)
            .await
            .unwrap();
        scheduler
            .schedule_build(&version_control, task5)
            .await
            .unwrap();

        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 2);
        assert_eq!(status.pending_tasks.len(), 3);

        // Stopping a building task promotes one pending task. file_id5 (Manual)
        // runs before the earlier-queued Compact/SchemaChange tasks — pending
        // tasks appear to be ordered by build-type priority, not FIFO
        // (NOTE(review): confirm against the scheduler's queue ordering).
        scheduler.on_task_stopped(region_id, file_id1, &version_control);
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert!(!status.building_files.contains(&file_id1));
        assert_eq!(status.building_files.len(), 2);
        assert_eq!(status.pending_tasks.len(), 2);
        assert!(status.building_files.contains(&file_id5));

        // Next promotion picks file_id4 (SchemaChange) ahead of file_id3 (Compact).
        scheduler.on_task_stopped(region_id, file_id2, &version_control);
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 2);
        assert_eq!(status.pending_tasks.len(), 1);
        assert!(status.building_files.contains(&file_id4));
        scheduler.on_task_stopped(region_id, file_id5, &version_control);
        scheduler.on_task_stopped(region_id, file_id4, &version_control);

        // Only the last (Compact) task remains building; queue is drained.
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 1);
        assert_eq!(status.pending_tasks.len(), 0);
        assert!(status.building_files.contains(&file_id3));

        // Stopping the last task removes the region's status entry entirely.
        scheduler.on_task_stopped(region_id, file_id3, &version_control);

        assert!(!scheduler.region_status.contains_key(&region_id));

        // Re-populate the scheduler, then drop the region: all state is cleared.
        let task6 =
            create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
        let task7 =
            create_mock_task_for_schedule(&env, file_id2, region_id, IndexBuildType::Flush).await;
        let task8 =
            create_mock_task_for_schedule(&env, file_id3, region_id, IndexBuildType::Manual).await;

        scheduler
            .schedule_build(&version_control, task6)
            .await
            .unwrap();
        scheduler
            .schedule_build(&version_control, task7)
            .await
            .unwrap();
        scheduler
            .schedule_build(&version_control, task8)
            .await
            .unwrap();

        assert!(scheduler.region_status.contains_key(&region_id));
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 2);
        assert_eq!(status.pending_tasks.len(), 1);

        scheduler.on_region_dropped(region_id).await;
        assert!(!scheduler.region_status.contains_key(&region_id));
    }
2000}