1pub(crate) mod bloom_filter;
16pub(crate) mod fulltext_index;
17mod indexer;
18pub mod intermediate;
19pub(crate) mod inverted_index;
20pub mod puffin_manager;
21mod statistics;
22pub(crate) mod store;
23
24use std::cmp::Ordering;
25use std::collections::{BinaryHeap, HashMap, HashSet};
26use std::num::NonZeroUsize;
27use std::sync::Arc;
28
29use bloom_filter::creator::BloomFilterIndexer;
30use common_telemetry::{debug, error, info, warn};
31use datatypes::arrow::array::BinaryArray;
32use datatypes::arrow::record_batch::RecordBatch;
33use mito_codec::index::IndexValuesCodec;
34use mito_codec::row_converter::CompositeValues;
35use puffin_manager::SstPuffinManager;
36use smallvec::{SmallVec, smallvec};
37use snafu::{OptionExt, ResultExt};
38use statistics::{ByteCount, RowCount};
39use store_api::metadata::RegionMetadataRef;
40use store_api::storage::{ColumnId, FileId, RegionId};
41use strum::IntoStaticStr;
42use tokio::sync::mpsc::Sender;
43
44use crate::access_layer::{AccessLayerRef, FilePathProvider, OperationType, RegionFilePathFactory};
45use crate::cache::file_cache::{FileType, IndexKey};
46use crate::cache::write_cache::{UploadTracker, WriteCacheRef};
47use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
48use crate::error::{
49 BuildIndexAsyncSnafu, DecodeSnafu, Error, InvalidRecordBatchSnafu, RegionClosedSnafu,
50 RegionDroppedSnafu, RegionTruncatedSnafu, Result,
51};
52use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
53use crate::metrics::INDEX_CREATE_MEMORY_USAGE;
54use crate::read::{Batch, BatchReader};
55use crate::region::options::IndexOptions;
56use crate::region::version::VersionControlRef;
57use crate::region::{ManifestContextRef, RegionLeaderState};
58use crate::request::{
59 BackgroundNotify, IndexBuildFailed, IndexBuildFinished, IndexBuildStopped, WorkerRequest,
60 WorkerRequestWithTime,
61};
62use crate::schedule::scheduler::{Job, SchedulerRef};
63use crate::sst::file::{FileHandle, FileMeta, IndexType, RegionFileId};
64use crate::sst::file_purger::FilePurgerRef;
65use crate::sst::index::fulltext_index::creator::FulltextIndexer;
66use crate::sst::index::intermediate::IntermediateManager;
67use crate::sst::index::inverted_index::creator::InvertedIndexer;
68use crate::sst::parquet::SstInfo;
69use crate::sst::parquet::flat_format::primary_key_column_index;
70use crate::sst::parquet::format::PrimaryKeyArray;
71use crate::worker::WorkerListener;
72
/// Metric label value for the inverted index type.
pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index";
/// Metric label value for the full-text index type.
pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index";
/// Metric label value for the bloom filter index type.
pub(crate) const TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
76
/// Aggregated output of building all index types for a single SST file.
#[derive(Debug, Clone, Default)]
pub struct IndexOutput {
    /// Size of the resulting index (puffin) file in bytes; 0 if none was written.
    pub file_size: u64,
    /// Output of the inverted index build.
    pub inverted_index: InvertedIndexOutput,
    /// Output of the full-text index build.
    pub fulltext_index: FulltextIndexOutput,
    /// Output of the bloom filter index build.
    pub bloom_filter: BloomFilterOutput,
}
89
90impl IndexOutput {
91 pub fn build_available_indexes(&self) -> SmallVec<[IndexType; 4]> {
92 let mut indexes = SmallVec::new();
93 if self.inverted_index.is_available() {
94 indexes.push(IndexType::InvertedIndex);
95 }
96 if self.fulltext_index.is_available() {
97 indexes.push(IndexType::FulltextIndex);
98 }
99 if self.bloom_filter.is_available() {
100 indexes.push(IndexType::BloomFilterIndex);
101 }
102 indexes
103 }
104}
105
/// Common output of building one index type.
#[derive(Debug, Clone, Default)]
pub struct IndexBaseOutput {
    /// Bytes written for this index; 0 means nothing was written.
    pub index_size: ByteCount,
    /// Number of rows indexed.
    pub row_count: RowCount,
    /// Ids of the columns covered by this index.
    pub columns: Vec<ColumnId>,
}
116
impl IndexBaseOutput {
    /// Returns true if this index was actually written (non-zero size).
    pub fn is_available(&self) -> bool {
        self.index_size > 0
    }
}
122
/// Output of building the inverted index.
pub type InvertedIndexOutput = IndexBaseOutput;
/// Output of building the full-text index.
pub type FulltextIndexOutput = IndexBaseOutput;
/// Output of building the bloom filter index.
pub type BloomFilterOutput = IndexBaseOutput;
129
/// Bundles the per-file index creators and tracks their memory usage.
///
/// A default-constructed `Indexer` has no creators and is inert.
#[derive(Default)]
pub struct Indexer {
    /// Id of the file the index is built for.
    file_id: FileId,
    region_id: RegionId,
    puffin_manager: Option<SstPuffinManager>,
    inverted_indexer: Option<InvertedIndexer>,
    /// Last reported inverted index memory usage, for gauge deltas.
    last_mem_inverted_index: usize,
    fulltext_indexer: Option<FulltextIndexer>,
    /// Last reported full-text index memory usage, for gauge deltas.
    last_mem_fulltext_index: usize,
    bloom_filter_indexer: Option<BloomFilterIndexer>,
    /// Last reported bloom filter memory usage, for gauge deltas.
    last_mem_bloom_filter: usize,
    intermediate_manager: Option<IntermediateManager>,
}
144
145impl Indexer {
146 pub async fn update(&mut self, batch: &mut Batch) {
148 self.do_update(batch).await;
149
150 self.flush_mem_metrics();
151 }
152
153 pub async fn update_flat(&mut self, batch: &RecordBatch) {
155 self.do_update_flat(batch).await;
156
157 self.flush_mem_metrics();
158 }
159
160 pub async fn finish(&mut self) -> IndexOutput {
162 let output = self.do_finish().await;
163
164 self.flush_mem_metrics();
165 output
166 }
167
168 pub async fn abort(&mut self) {
170 self.do_abort().await;
171
172 self.flush_mem_metrics();
173 }
174
175 fn flush_mem_metrics(&mut self) {
176 let inverted_mem = self
177 .inverted_indexer
178 .as_ref()
179 .map_or(0, |creator| creator.memory_usage());
180 INDEX_CREATE_MEMORY_USAGE
181 .with_label_values(&[TYPE_INVERTED_INDEX])
182 .add(inverted_mem as i64 - self.last_mem_inverted_index as i64);
183 self.last_mem_inverted_index = inverted_mem;
184
185 let fulltext_mem = self
186 .fulltext_indexer
187 .as_ref()
188 .map_or(0, |creator| creator.memory_usage());
189 INDEX_CREATE_MEMORY_USAGE
190 .with_label_values(&[TYPE_FULLTEXT_INDEX])
191 .add(fulltext_mem as i64 - self.last_mem_fulltext_index as i64);
192 self.last_mem_fulltext_index = fulltext_mem;
193
194 let bloom_filter_mem = self
195 .bloom_filter_indexer
196 .as_ref()
197 .map_or(0, |creator| creator.memory_usage());
198 INDEX_CREATE_MEMORY_USAGE
199 .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
200 .add(bloom_filter_mem as i64 - self.last_mem_bloom_filter as i64);
201 self.last_mem_bloom_filter = bloom_filter_mem;
202 }
203}
204
/// Builds an [`Indexer`] for a specific SST file.
#[async_trait::async_trait]
pub trait IndexerBuilder {
    /// Builds an indexer whose index is written under `file_id`.
    async fn build(&self, file_id: FileId) -> Indexer;
}
#[derive(Clone)]
pub(crate) struct IndexerBuilderImpl {
    /// What triggered this index build (flush, compaction, ...).
    pub(crate) build_type: IndexBuildType,
    /// Metadata of the region the SST file belongs to.
    pub(crate) metadata: RegionMetadataRef,
    /// Parquet row group size; inverted index segments are aligned to it.
    pub(crate) row_group_size: usize,
    /// Manager used to write the puffin (index) files.
    pub(crate) puffin_manager: SstPuffinManager,
    /// Manager for intermediate files produced while building indexes.
    pub(crate) intermediate_manager: IntermediateManager,
    /// Per-region index options.
    pub(crate) index_options: IndexOptions,
    /// Global inverted index configuration.
    pub(crate) inverted_index_config: InvertedIndexConfig,
    /// Global full-text index configuration.
    pub(crate) fulltext_index_config: FulltextIndexConfig,
    /// Global bloom filter index configuration.
    pub(crate) bloom_filter_index_config: BloomFilterConfig,
}
222
223#[async_trait::async_trait]
224impl IndexerBuilder for IndexerBuilderImpl {
225 async fn build(&self, file_id: FileId) -> Indexer {
227 let mut indexer = Indexer {
228 file_id,
229 region_id: self.metadata.region_id,
230 ..Default::default()
231 };
232
233 indexer.inverted_indexer = self.build_inverted_indexer(file_id);
234 indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await;
235 indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id);
236 indexer.intermediate_manager = Some(self.intermediate_manager.clone());
237 if indexer.inverted_indexer.is_none()
238 && indexer.fulltext_indexer.is_none()
239 && indexer.bloom_filter_indexer.is_none()
240 {
241 indexer.abort().await;
242 return Indexer::default();
243 }
244
245 indexer.puffin_manager = Some(self.puffin_manager.clone());
246 indexer
247 }
248}
249
impl IndexerBuilderImpl {
    /// Returns an inverted index creator, or `None` when the config disables
    /// it for this build type, no column is inverted-indexed, or the sizing
    /// parameters are invalid.
    fn build_inverted_indexer(&self, file_id: FileId) -> Option<InvertedIndexer> {
        // Flush and compaction honor their respective config switches; other
        // build types (schema change, manual) always create the index.
        let create = match self.build_type {
            IndexBuildType::Flush => self.inverted_index_config.create_on_flush.auto(),
            IndexBuildType::Compact => self.inverted_index_config.create_on_compaction.auto(),
            _ => true,
        };

        if !create {
            debug!(
                "Skip creating inverted index due to config, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        }

        let indexed_column_ids = self.metadata.inverted_indexed_column_ids(
            self.index_options.inverted_index.ignore_column_ids.iter(),
        );
        if indexed_column_ids.is_empty() {
            debug!(
                "No columns to be indexed, skip creating inverted index, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        }

        let Some(mut segment_row_count) =
            NonZeroUsize::new(self.index_options.inverted_index.segment_row_count)
        else {
            warn!(
                "Segment row count is 0, skip creating index, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        };

        let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else {
            warn!(
                "Row group size is 0, skip creating index, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        };

        // If the configured segment size does not evenly divide the row group
        // size, fall back to one segment per row group so index segments stay
        // aligned to row group boundaries.
        if row_group_size.get() % segment_row_count.get() != 0 {
            segment_row_count = row_group_size;
        }

        let indexer = InvertedIndexer::new(
            file_id,
            &self.metadata,
            self.intermediate_manager.clone(),
            self.inverted_index_config.mem_threshold_on_create(),
            segment_row_count,
            indexed_column_ids,
        );

        Some(indexer)
    }

    /// Returns a full-text index creator, or `None` when disabled by config,
    /// when no column requires it, or (in non-test builds) when creation fails.
    async fn build_fulltext_indexer(&self, file_id: FileId) -> Option<FulltextIndexer> {
        let create = match self.build_type {
            IndexBuildType::Flush => self.fulltext_index_config.create_on_flush.auto(),
            IndexBuildType::Compact => self.fulltext_index_config.create_on_compaction.auto(),
            _ => true,
        };

        if !create {
            debug!(
                "Skip creating full-text index due to config, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        }

        let mem_limit = self.fulltext_index_config.mem_threshold_on_create();
        let creator = FulltextIndexer::new(
            &self.metadata.region_id,
            &file_id,
            &self.intermediate_manager,
            &self.metadata,
            self.fulltext_index_config.compress,
            mem_limit,
        )
        .await;

        // Success (with or without an actual creator) returns early; only a
        // creation error falls through to the handling below.
        let err = match creator {
            Ok(creator) => {
                if creator.is_none() {
                    debug!(
                        "Skip creating full-text index due to no columns require indexing, region_id: {}, file_id: {}",
                        self.metadata.region_id, file_id,
                    );
                }
                return creator;
            }
            Err(err) => err,
        };

        // In test builds a creation failure is a bug, so fail loudly; in
        // production degrade gracefully to building no full-text index.
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to create full-text indexer, region_id: {}, file_id: {}, err: {:?}",
                self.metadata.region_id, file_id, err
            );
        } else {
            warn!(
                err; "Failed to create full-text indexer, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
        }

        None
    }

    /// Returns a bloom filter creator, or `None` when disabled by config,
    /// when no column requires it, or (in non-test builds) when creation fails.
    fn build_bloom_filter_indexer(&self, file_id: FileId) -> Option<BloomFilterIndexer> {
        let create = match self.build_type {
            IndexBuildType::Flush => self.bloom_filter_index_config.create_on_flush.auto(),
            IndexBuildType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(),
            _ => true,
        };

        if !create {
            debug!(
                "Skip creating bloom filter due to config, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        }

        let mem_limit = self.bloom_filter_index_config.mem_threshold_on_create();
        let indexer = BloomFilterIndexer::new(
            file_id,
            &self.metadata,
            self.intermediate_manager.clone(),
            mem_limit,
        );

        // Success (with or without an actual indexer) returns early; only a
        // creation error falls through to the handling below.
        let err = match indexer {
            Ok(indexer) => {
                if indexer.is_none() {
                    debug!(
                        "Skip creating bloom filter due to no columns require indexing, region_id: {}, file_id: {}",
                        self.metadata.region_id, file_id,
                    );
                }
                return indexer;
            }
            Err(err) => err,
        };

        // Same policy as the full-text indexer: panic in tests, warn and skip
        // in production.
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to create bloom filter, region_id: {}, file_id: {}, err: {:?}",
                self.metadata.region_id, file_id, err
            );
        } else {
            warn!(
                err; "Failed to create bloom filter, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
        }

        None
    }
}
417
/// Reason an index build was requested.
#[derive(Debug, Clone, IntoStaticStr)]
pub enum IndexBuildType {
    /// Index build triggered by a schema change.
    SchemaChange,
    /// Index build triggered by a memtable flush.
    Flush,
    /// Index build triggered by a compaction.
    Compact,
    /// Index build requested manually.
    Manual,
}
430
431impl IndexBuildType {
432 fn as_str(&self) -> &'static str {
433 self.into()
434 }
435
436 fn priority(&self) -> u8 {
438 match self {
439 IndexBuildType::Manual => 3,
440 IndexBuildType::SchemaChange => 2,
441 IndexBuildType::Flush => 1,
442 IndexBuildType::Compact => 0,
443 }
444 }
445}
446
447impl From<OperationType> for IndexBuildType {
448 fn from(op_type: OperationType) -> Self {
449 match op_type {
450 OperationType::Flush => IndexBuildType::Flush,
451 OperationType::Compact => IndexBuildType::Compact,
452 }
453 }
454}
455
/// Final outcome of an index build task.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum IndexBuildOutcome {
    /// The build completed successfully.
    Finished,
    /// The build was aborted; the string describes why.
    Aborted(String),
}
462
/// Channel used to report an index build task's result to its issuer.
pub type ResultMpscSender = Sender<Result<IndexBuildOutcome>>;
465
/// A schedulable task that (re)builds the indexes of one SST file.
#[derive(Clone)]
pub struct IndexBuildTask {
    /// Metadata of the SST file to index.
    pub file_meta: FileMeta,
    /// Why this build was requested; also determines scheduling priority.
    pub reason: IndexBuildType,
    pub access_layer: AccessLayerRef,
    pub(crate) listener: WorkerListener,
    pub(crate) manifest_ctx: ManifestContextRef,
    /// Optional write cache; when present the index file is uploaded through it.
    pub write_cache: Option<WriteCacheRef>,
    pub file_purger: FilePurgerRef,
    /// Builder that creates the indexers for this file.
    pub indexer_builder: Arc<dyn IndexerBuilder + Send + Sync>,
    /// Channel used to notify the region worker about task lifecycle events.
    pub(crate) request_sender: Sender<WorkerRequestWithTime>,
    /// Channel used to report the final task outcome to the issuer.
    pub(crate) result_sender: ResultMpscSender,
}
484
// Manual impl: several fields (access layer, listener, channels) are not Debug,
// so only the identifying fields are printed.
impl std::fmt::Debug for IndexBuildTask {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IndexBuildTask")
            .field("region_id", &self.file_meta.region_id)
            .field("file_id", &self.file_meta.file_id)
            .field("reason", &self.reason)
            .finish()
    }
}
494
495impl IndexBuildTask {
496 pub async fn on_success(&self, outcome: IndexBuildOutcome) {
498 let _ = self.result_sender.send(Ok(outcome)).await;
499 }
500
501 pub async fn on_failure(&self, err: Arc<Error>) {
503 let _ = self
504 .result_sender
505 .send(Err(err.clone()).context(BuildIndexAsyncSnafu {
506 region_id: self.file_meta.region_id,
507 }))
508 .await;
509 }
510
511 fn into_index_build_job(mut self, version_control: VersionControlRef) -> Job {
512 Box::pin(async move {
513 self.do_index_build(version_control).await;
514 })
515 }
516
517 async fn do_index_build(&mut self, version_control: VersionControlRef) {
518 self.listener
519 .on_index_build_begin(RegionFileId::new(
520 self.file_meta.region_id,
521 self.file_meta.file_id,
522 ))
523 .await;
524 match self.index_build(version_control).await {
525 Ok(outcome) => self.on_success(outcome).await,
526 Err(e) => {
527 warn!(
528 e; "Index build task failed, region: {}, file_id: {}",
529 self.file_meta.region_id, self.file_meta.file_id,
530 );
531 self.on_failure(e.into()).await
532 }
533 }
534 let worker_request = WorkerRequest::Background {
535 region_id: self.file_meta.region_id,
536 notify: BackgroundNotify::IndexBuildStopped(IndexBuildStopped {
537 region_id: self.file_meta.region_id,
538 file_id: self.file_meta.file_id,
539 }),
540 };
541 let _ = self
542 .request_sender
543 .send(WorkerRequestWithTime::new(worker_request))
544 .await;
545 }
546
547 async fn check_sst_file_exists(&self, version_control: &VersionControlRef) -> bool {
549 let file_id = self.file_meta.file_id;
550 let level = self.file_meta.level;
551 let version = version_control.current().version;
553
554 let Some(level_files) = version.ssts.levels().get(level as usize) else {
555 warn!(
556 "File id {} not found in level {} for index build, region: {}",
557 file_id, level, self.file_meta.region_id
558 );
559 return false;
560 };
561
562 match level_files.files.get(&file_id) {
563 Some(handle) if !handle.is_deleted() && !handle.compacting() => {
564 true
568 }
569 _ => {
570 warn!(
571 "File id {} not found in region version for index build, region: {}",
572 file_id, self.file_meta.region_id
573 );
574 false
575 }
576 }
577 }
578
579 async fn index_build(
580 &mut self,
581 version_control: VersionControlRef,
582 ) -> Result<IndexBuildOutcome> {
583 let index_file_id = if self.file_meta.index_file_size > 0 {
584 FileId::random()
586 } else {
587 self.file_meta.file_id
588 };
589 let mut indexer = self.indexer_builder.build(index_file_id).await;
590
591 if !self.check_sst_file_exists(&version_control).await {
593 indexer.abort().await;
595 self.listener
596 .on_index_build_abort(RegionFileId::new(
597 self.file_meta.region_id,
598 self.file_meta.file_id,
599 ))
600 .await;
601 return Ok(IndexBuildOutcome::Aborted(format!(
602 "SST file not found during index build, region: {}, file_id: {}",
603 self.file_meta.region_id, self.file_meta.file_id
604 )));
605 }
606
607 let mut parquet_reader = self
608 .access_layer
609 .read_sst(FileHandle::new(
610 self.file_meta.clone(),
611 self.file_purger.clone(),
612 ))
613 .build()
614 .await?;
615
616 loop {
618 match parquet_reader.next_batch().await {
619 Ok(Some(batch)) => {
620 indexer.update(&mut batch.clone()).await;
621 }
622 Ok(None) => break,
623 Err(e) => {
624 indexer.abort().await;
625 return Err(e);
626 }
627 }
628 }
629 let index_output = indexer.finish().await;
630
631 if index_output.file_size > 0 {
632 if !self.check_sst_file_exists(&version_control).await {
634 indexer.abort().await;
636 self.listener
637 .on_index_build_abort(RegionFileId::new(
638 self.file_meta.region_id,
639 self.file_meta.file_id,
640 ))
641 .await;
642 return Ok(IndexBuildOutcome::Aborted(format!(
643 "SST file not found during index build, region: {}, file_id: {}",
644 self.file_meta.region_id, self.file_meta.file_id
645 )));
646 }
647
648 self.maybe_upload_index_file(index_output.clone(), index_file_id)
650 .await?;
651
652 let worker_request = match self.update_manifest(index_output, index_file_id).await {
653 Ok(edit) => {
654 let index_build_finished = IndexBuildFinished {
655 region_id: self.file_meta.region_id,
656 edit,
657 };
658 WorkerRequest::Background {
659 region_id: self.file_meta.region_id,
660 notify: BackgroundNotify::IndexBuildFinished(index_build_finished),
661 }
662 }
663 Err(e) => {
664 let err = Arc::new(e);
665 WorkerRequest::Background {
666 region_id: self.file_meta.region_id,
667 notify: BackgroundNotify::IndexBuildFailed(IndexBuildFailed { err }),
668 }
669 }
670 };
671
672 let _ = self
673 .request_sender
674 .send(WorkerRequestWithTime::new(worker_request))
675 .await;
676 }
677 Ok(IndexBuildOutcome::Finished)
678 }
679
680 async fn maybe_upload_index_file(
681 &self,
682 output: IndexOutput,
683 index_file_id: FileId,
684 ) -> Result<()> {
685 if let Some(write_cache) = &self.write_cache {
686 let file_id = self.file_meta.file_id;
687 let region_id = self.file_meta.region_id;
688 let remote_store = self.access_layer.object_store();
689 let mut upload_tracker = UploadTracker::new(region_id);
690 let mut err = None;
691 let puffin_key = IndexKey::new(region_id, index_file_id, FileType::Puffin);
692 let puffin_path = RegionFilePathFactory::new(
693 self.access_layer.table_dir().to_string(),
694 self.access_layer.path_type(),
695 )
696 .build_index_file_path(RegionFileId::new(region_id, file_id));
697 if let Err(e) = write_cache
698 .upload(puffin_key, &puffin_path, remote_store)
699 .await
700 {
701 err = Some(e);
702 }
703 upload_tracker.push_uploaded_file(puffin_path);
704 if let Some(err) = err {
705 upload_tracker
707 .clean(
708 &smallvec![SstInfo {
709 file_id,
710 index_metadata: output,
711 ..Default::default()
712 }],
713 &write_cache.file_cache(),
714 remote_store,
715 )
716 .await;
717 return Err(err);
718 }
719 } else {
720 debug!("write cache is not available, skip uploading index file");
721 }
722 Ok(())
723 }
724
725 async fn update_manifest(
726 &mut self,
727 output: IndexOutput,
728 index_file_id: FileId,
729 ) -> Result<RegionEdit> {
730 self.file_meta.available_indexes = output.build_available_indexes();
731 self.file_meta.index_file_size = output.file_size;
732 self.file_meta.index_file_id = Some(index_file_id);
733 let edit = RegionEdit {
734 files_to_add: vec![self.file_meta.clone()],
735 files_to_remove: vec![],
736 timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
737 flushed_sequence: None,
738 flushed_entry_id: None,
739 committed_sequence: None,
740 compaction_time_window: None,
741 };
742 let version = self
743 .manifest_ctx
744 .update_manifest(
745 RegionLeaderState::Writable,
746 RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())),
747 )
748 .await?;
749 info!(
750 "Successfully update manifest version to {version}, region: {}, reason: {}",
751 self.file_meta.region_id,
752 self.reason.as_str()
753 );
754 Ok(edit)
755 }
756}
757
// Tasks are compared solely by build-reason priority so they can live in the
// scheduler's `BinaryHeap`; two distinct tasks with the same priority
// therefore compare equal.
impl PartialEq for IndexBuildTask {
    fn eq(&self, other: &Self) -> bool {
        self.reason.priority() == other.reason.priority()
    }
}

impl Eq for IndexBuildTask {}
765
impl PartialOrd for IndexBuildTask {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for IndexBuildTask {
    // Higher-priority tasks compare greater, so `BinaryHeap::peek` yields the
    // most urgent pending task.
    fn cmp(&self, other: &Self) -> Ordering {
        self.reason.priority().cmp(&other.reason.priority())
    }
}
777
/// Per-region bookkeeping of running and queued index build tasks.
pub struct IndexBuildStatus {
    pub region_id: RegionId,
    /// Files whose index build is currently running.
    pub building_files: HashSet<FileId>,
    /// Queued tasks, ordered by build-reason priority.
    pub pending_tasks: BinaryHeap<IndexBuildTask>,
}
784
785impl IndexBuildStatus {
786 pub fn new(region_id: RegionId) -> Self {
787 IndexBuildStatus {
788 region_id,
789 building_files: HashSet::new(),
790 pending_tasks: BinaryHeap::new(),
791 }
792 }
793
794 async fn on_failure(self, err: Arc<Error>) {
795 for task in self.pending_tasks {
796 task.on_failure(err.clone()).await;
797 }
798 }
799}
800
/// Schedules index build tasks, capping the number of concurrent builds.
pub struct IndexBuildScheduler {
    /// Underlying job scheduler that runs the build jobs.
    scheduler: SchedulerRef,
    /// Build status per region.
    region_status: HashMap<RegionId, IndexBuildStatus>,
    /// Maximum number of files whose indexes may be built concurrently.
    files_limit: usize,
}
809
810impl IndexBuildScheduler {
812 pub fn new(scheduler: SchedulerRef, files_limit: usize) -> Self {
813 IndexBuildScheduler {
814 scheduler,
815 region_status: HashMap::new(),
816 files_limit,
817 }
818 }
819
820 pub(crate) async fn schedule_build(
821 &mut self,
822 version_control: &VersionControlRef,
823 task: IndexBuildTask,
824 ) -> Result<()> {
825 let status = self
826 .region_status
827 .entry(task.file_meta.region_id)
828 .or_insert_with(|| IndexBuildStatus::new(task.file_meta.region_id));
829
830 if status.building_files.contains(&task.file_meta.file_id) {
831 let region_file_id =
832 RegionFileId::new(task.file_meta.region_id, task.file_meta.file_id);
833 debug!(
834 "Aborting index build task since index is already being built for region file {:?}",
835 region_file_id
836 );
837 task.on_success(IndexBuildOutcome::Aborted(format!(
838 "Index is already being built for region file {:?}",
839 region_file_id
840 )))
841 .await;
842 task.listener.on_index_build_abort(region_file_id).await;
843 return Ok(());
844 }
845
846 status.pending_tasks.push(task);
847
848 self.schedule_next_build_batch(version_control);
849 Ok(())
850 }
851
852 fn schedule_next_build_batch(&mut self, version_control: &VersionControlRef) {
854 let mut building_count = 0;
855 for status in self.region_status.values() {
856 building_count += status.building_files.len();
857 }
858
859 while building_count < self.files_limit {
860 if let Some(task) = self.find_next_task() {
861 let region_id = task.file_meta.region_id;
862 let file_id = task.file_meta.file_id;
863 let job = task.into_index_build_job(version_control.clone());
864 if self.scheduler.schedule(job).is_ok() {
865 if let Some(status) = self.region_status.get_mut(®ion_id) {
866 status.building_files.insert(file_id);
867 building_count += 1;
868 status
869 .pending_tasks
870 .retain(|t| t.file_meta.file_id != file_id);
871 } else {
872 error!(
873 "Region status not found when scheduling index build task, region: {}",
874 region_id
875 );
876 }
877 } else {
878 error!(
879 "Failed to schedule index build job, region: {}, file_id: {}",
880 region_id, file_id
881 );
882 }
883 } else {
884 break;
886 }
887 }
888 }
889
890 fn find_next_task(&self) -> Option<IndexBuildTask> {
892 self.region_status
893 .iter()
894 .filter_map(|(_, status)| status.pending_tasks.peek())
895 .max()
896 .cloned()
897 }
898
899 pub(crate) fn on_task_stopped(
900 &mut self,
901 region_id: RegionId,
902 file_id: FileId,
903 version_control: &VersionControlRef,
904 ) {
905 if let Some(status) = self.region_status.get_mut(®ion_id) {
906 status.building_files.remove(&file_id);
907 if status.building_files.is_empty() && status.pending_tasks.is_empty() {
908 self.region_status.remove(®ion_id);
910 }
911 }
912
913 self.schedule_next_build_batch(version_control);
914 }
915
916 pub(crate) async fn on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
917 error!(
918 err; "Index build scheduler encountered failure for region {}, removing all pending tasks.",
919 region_id
920 );
921 let Some(status) = self.region_status.remove(®ion_id) else {
922 return;
923 };
924 status.on_failure(err).await;
925 }
926
927 pub(crate) async fn on_region_dropped(&mut self, region_id: RegionId) {
929 self.remove_region_on_failure(
930 region_id,
931 Arc::new(RegionDroppedSnafu { region_id }.build()),
932 )
933 .await;
934 }
935
936 pub(crate) async fn on_region_closed(&mut self, region_id: RegionId) {
938 self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()))
939 .await;
940 }
941
942 pub(crate) async fn on_region_truncated(&mut self, region_id: RegionId) {
944 self.remove_region_on_failure(
945 region_id,
946 Arc::new(RegionTruncatedSnafu { region_id }.build()),
947 )
948 .await;
949 }
950
951 async fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
952 let Some(status) = self.region_status.remove(®ion_id) else {
953 return;
954 };
955 status.on_failure(err).await;
956 }
957}
958
959pub(crate) fn decode_primary_keys_with_counts(
962 batch: &RecordBatch,
963 codec: &IndexValuesCodec,
964) -> Result<Vec<(CompositeValues, usize)>> {
965 let primary_key_index = primary_key_column_index(batch.num_columns());
966 let pk_dict_array = batch
967 .column(primary_key_index)
968 .as_any()
969 .downcast_ref::<PrimaryKeyArray>()
970 .context(InvalidRecordBatchSnafu {
971 reason: "Primary key column is not a dictionary array",
972 })?;
973 let pk_values_array = pk_dict_array
974 .values()
975 .as_any()
976 .downcast_ref::<BinaryArray>()
977 .context(InvalidRecordBatchSnafu {
978 reason: "Primary key values are not binary array",
979 })?;
980 let keys = pk_dict_array.keys();
981
982 let mut result: Vec<(CompositeValues, usize)> = Vec::new();
984 let mut prev_key: Option<u32> = None;
985
986 for i in 0..keys.len() {
987 let current_key = keys.value(i);
988
989 if let Some(prev) = prev_key
991 && prev == current_key
992 {
993 result.last_mut().unwrap().1 += 1;
995 continue;
996 }
997
998 let pk_bytes = pk_values_array.value(current_key as usize);
1000 let decoded_value = codec.decoder().decode(pk_bytes).context(DecodeSnafu)?;
1001
1002 result.push((decoded_value, 1));
1003 prev_key = Some(current_key);
1004 }
1005
1006 Ok(result)
1007}
1008
1009#[cfg(test)]
1010mod tests {
1011 use std::sync::Arc;
1012
1013 use api::v1::SemanticType;
1014 use common_base::readable_size::ReadableSize;
1015 use datafusion_common::HashMap;
1016 use datatypes::data_type::ConcreteDataType;
1017 use datatypes::schema::{
1018 ColumnSchema, FulltextOptions, SkippingIndexOptions, SkippingIndexType,
1019 };
1020 use object_store::ObjectStore;
1021 use object_store::services::Memory;
1022 use puffin_manager::PuffinManagerFactory;
1023 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1024 use tokio::sync::mpsc;
1025
1026 use super::*;
1027 use crate::access_layer::{FilePathProvider, Metrics, SstWriteRequest, WriteType};
1028 use crate::cache::write_cache::WriteCache;
1029 use crate::config::{FulltextIndexConfig, IndexBuildMode, MitoConfig, Mode};
1030 use crate::memtable::time_partition::TimePartitions;
1031 use crate::region::version::{VersionBuilder, VersionControl};
1032 use crate::sst::file::RegionFileId;
1033 use crate::sst::file_purger::NoopFilePurger;
1034 use crate::sst::location;
1035 use crate::sst::parquet::WriteOptions;
1036 use crate::test_util::memtable_util::EmptyMemtableBuilder;
1037 use crate::test_util::scheduler_util::SchedulerEnv;
1038 use crate::test_util::sst_util::{new_batch_by_range, new_source, sst_region_metadata};
1039
    /// Flags controlling which index-relevant columns the mock metadata has.
    struct MetaConfig {
        with_inverted: bool,
        with_fulltext: bool,
        with_skipping_bloom: bool,
    }
1045
    /// Builds region metadata with three base columns (`a`, `b`, and
    /// timestamp `c`), plus an optional full-text column (`text`) and an
    /// optional bloom-filter-skipping column (`bloom`).
    fn mock_region_metadata(
        MetaConfig {
            with_inverted,
            with_fulltext,
            with_skipping_bloom,
        }: MetaConfig,
    ) -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        let mut column_schema = ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false);
        if with_inverted {
            column_schema = column_schema.with_inverted_index(true);
        }
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema,
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "c",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 3,
            });

        if with_fulltext {
            // Nullable string column with full-text indexing enabled.
            let column_schema =
                ColumnSchema::new("text", ConcreteDataType::string_datatype(), true)
                    .with_fulltext_options(FulltextOptions {
                        enable: true,
                        ..Default::default()
                    })
                    .unwrap();

            let column = ColumnMetadata {
                column_schema,
                semantic_type: SemanticType::Field,
                column_id: 4,
            };

            builder.push_column_metadata(column);
        }

        if with_skipping_bloom {
            // String column with a bloom-filter skipping index.
            let column_schema =
                ColumnSchema::new("bloom", ConcreteDataType::string_datatype(), false)
                    .with_skipping_options(SkippingIndexOptions::new_unchecked(
                        42,
                        0.01,
                        SkippingIndexType::BloomFilter,
                    ))
                    .unwrap();

            let column = ColumnMetadata {
                column_schema,
                semantic_type: SemanticType::Field,
                column_id: 5,
            };

            builder.push_column_metadata(column);
        }

        Arc::new(builder.build().unwrap())
    }
1118
    /// Creates an in-memory object store for tests.
    fn mock_object_store() -> ObjectStore {
        ObjectStore::new(Memory::default()).unwrap().finish()
    }
1122
    /// Creates an intermediate file manager rooted at `path`.
    async fn mock_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
        IntermediateManager::init_fs(path).await.unwrap()
    }
    /// Path provider that must never be called; used in tests where file
    /// paths are irrelevant.
    struct NoopPathProvider;

    impl FilePathProvider for NoopPathProvider {
        fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
            unreachable!()
        }

        fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
            unreachable!()
        }
    }
1137
    /// Writes a small three-batch SST through the scheduler env's access
    /// layer (as a flush) and returns its [`SstInfo`].
    async fn mock_sst_file(
        metadata: RegionMetadataRef,
        env: &SchedulerEnv,
        build_mode: IndexBuildMode,
    ) -> SstInfo {
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let mut index_config = MitoConfig::default().index;
        index_config.build_mode = build_mode;
        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata: metadata.clone(),
            source: either::Left(source),
            storage: None,
            max_sequence: None,
            cache_manager: Default::default(),
            index_options: IndexOptions::default(),
            index_config,
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        env.access_layer
            .write_sst(write_request, &WriteOptions::default(), &mut metrics)
            .await
            .unwrap()
            .remove(0)
    }
1170
    /// Builds a version control containing `files`, backed by an empty memtable.
    async fn mock_version_control(
        metadata: RegionMetadataRef,
        file_purger: FilePurgerRef,
        files: HashMap<FileId, FileMeta>,
    ) -> VersionControlRef {
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            Arc::new(EmptyMemtableBuilder::default()),
            0,
            None,
        ));
        let version_builder = VersionBuilder::new(metadata, mutable)
            .add_files(file_purger, files.values().cloned())
            .build();
        Arc::new(VersionControl::new(version_builder))
    }
1187
1188 async fn mock_indexer_builder(
1189 metadata: RegionMetadataRef,
1190 env: &SchedulerEnv,
1191 ) -> Arc<dyn IndexerBuilder + Send + Sync> {
1192 let (dir, factory) = PuffinManagerFactory::new_for_test_async("mock_indexer_builder").await;
1193 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1194 let puffin_manager = factory.build(
1195 env.access_layer.object_store().clone(),
1196 RegionFilePathFactory::new(
1197 env.access_layer.table_dir().to_string(),
1198 env.access_layer.path_type(),
1199 ),
1200 );
1201 Arc::new(IndexerBuilderImpl {
1202 build_type: IndexBuildType::Flush,
1203 metadata,
1204 row_group_size: 1024,
1205 puffin_manager,
1206 intermediate_manager: intm_manager,
1207 index_options: IndexOptions::default(),
1208 inverted_index_config: InvertedIndexConfig::default(),
1209 fulltext_index_config: FulltextIndexConfig::default(),
1210 bloom_filter_index_config: BloomFilterConfig::default(),
1211 })
1212 }
1213
1214 #[tokio::test]
1215 async fn test_build_indexer_basic() {
1216 let (dir, factory) =
1217 PuffinManagerFactory::new_for_test_async("test_build_indexer_basic_").await;
1218 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1219
1220 let metadata = mock_region_metadata(MetaConfig {
1221 with_inverted: true,
1222 with_fulltext: true,
1223 with_skipping_bloom: true,
1224 });
1225 let indexer = IndexerBuilderImpl {
1226 build_type: IndexBuildType::Flush,
1227 metadata,
1228 row_group_size: 1024,
1229 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1230 intermediate_manager: intm_manager,
1231 index_options: IndexOptions::default(),
1232 inverted_index_config: InvertedIndexConfig::default(),
1233 fulltext_index_config: FulltextIndexConfig::default(),
1234 bloom_filter_index_config: BloomFilterConfig::default(),
1235 }
1236 .build(FileId::random())
1237 .await;
1238
1239 assert!(indexer.inverted_indexer.is_some());
1240 assert!(indexer.fulltext_indexer.is_some());
1241 assert!(indexer.bloom_filter_indexer.is_some());
1242 }
1243
    /// Verifies that per-index "create on flush/compaction" switches disable
    /// exactly the targeted indexer while the others are still built.
    #[tokio::test]
    async fn test_build_indexer_disable_create() {
        let (dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_disable_create_").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;

        // The region declares all three index kinds, so any missing indexer
        // below is caused by config, not by metadata.
        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: true,
            with_skipping_bloom: true,
        });
        // Case 1: inverted index disabled on flush -> only it is skipped.
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig {
                create_on_flush: Mode::Disable,
                ..Default::default()
            },
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_none());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_some());

        // Case 2: fulltext index disabled on compaction, build type Compact.
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Compact,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig {
                create_on_compaction: Mode::Disable,
                ..Default::default()
            },
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_none());
        assert!(indexer.bloom_filter_indexer.is_some());

        // Case 3: bloom filter disabled on compaction, build type Compact.
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Compact,
            metadata,
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig {
                create_on_compaction: Mode::Disable,
                ..Default::default()
            },
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_none());
    }
1318
    /// Verifies that an indexer is only created when the region metadata
    /// actually declares the corresponding index on some column.
    #[tokio::test]
    async fn test_build_indexer_no_required() {
        let (dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_no_required_").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;

        // Case 1: no inverted index declared -> no inverted indexer.
        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: false,
            with_fulltext: true,
            with_skipping_bloom: true,
        });
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_none());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_some());

        // Case 2: no fulltext index declared -> no fulltext indexer.
        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: false,
            with_skipping_bloom: true,
        });
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_none());
        assert!(indexer.bloom_filter_indexer.is_some());

        // Case 3: no skipping (bloom) index declared -> no bloom indexer.
        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: true,
            with_skipping_bloom: false,
        });
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_none());
    }
1394
1395 #[tokio::test]
1396 async fn test_build_indexer_zero_row_group() {
1397 let (dir, factory) =
1398 PuffinManagerFactory::new_for_test_async("test_build_indexer_zero_row_group_").await;
1399 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1400
1401 let metadata = mock_region_metadata(MetaConfig {
1402 with_inverted: true,
1403 with_fulltext: true,
1404 with_skipping_bloom: true,
1405 });
1406 let indexer = IndexerBuilderImpl {
1407 build_type: IndexBuildType::Flush,
1408 metadata,
1409 row_group_size: 0,
1410 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1411 intermediate_manager: intm_manager,
1412 index_options: IndexOptions::default(),
1413 inverted_index_config: InvertedIndexConfig::default(),
1414 fulltext_index_config: FulltextIndexConfig::default(),
1415 bloom_filter_index_config: BloomFilterConfig::default(),
1416 }
1417 .build(FileId::random())
1418 .await;
1419
1420 assert!(indexer.inverted_indexer.is_none());
1421 }
1422
1423 #[tokio::test]
1424 async fn test_index_build_task_sst_not_exist() {
1425 let env = SchedulerEnv::new().await;
1426 let (tx, _rx) = mpsc::channel(4);
1427 let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1428 let mut scheduler = env.mock_index_build_scheduler(4);
1429 let metadata = Arc::new(sst_region_metadata());
1430 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1431 let file_purger = Arc::new(NoopFilePurger {});
1432 let files = HashMap::new();
1433 let version_control =
1434 mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1435 let region_id = metadata.region_id;
1436 let indexer_builder = mock_indexer_builder(metadata, &env).await;
1437
1438 let task = IndexBuildTask {
1440 file_meta: FileMeta {
1441 region_id,
1442 file_id: FileId::random(),
1443 file_size: 100,
1444 ..Default::default()
1445 },
1446 reason: IndexBuildType::Flush,
1447 access_layer: env.access_layer.clone(),
1448 listener: WorkerListener::default(),
1449 manifest_ctx,
1450 write_cache: None,
1451 file_purger,
1452 indexer_builder,
1453 request_sender: tx,
1454 result_sender: result_tx,
1455 };
1456
1457 scheduler
1459 .schedule_build(&version_control, task)
1460 .await
1461 .unwrap();
1462 match result_rx.recv().await.unwrap() {
1463 Ok(outcome) => {
1464 if outcome == IndexBuildOutcome::Finished {
1465 panic!("Expect aborted result due to missing SST file")
1466 }
1467 }
1468 _ => panic!("Expect aborted result due to missing SST file"),
1469 }
1470 }
1471
    /// Builds an index for an existing SST (written in async mode, i.e.
    /// without index files) and checks both the success result and the
    /// follow-up worker notification carrying the region edit.
    #[tokio::test]
    async fn test_index_build_task_sst_exist() {
        let env = SchedulerEnv::new().await;
        let mut scheduler = env.mock_index_build_scheduler(4);
        let metadata = Arc::new(sst_region_metadata());
        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
        let region_id = metadata.region_id;
        let file_purger = Arc::new(NoopFilePurger {});
        // Async build mode: the SST exists but its index does not yet.
        let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
        let file_meta = FileMeta {
            region_id,
            file_id: sst_info.file_id,
            file_size: sst_info.file_size,
            index_file_size: sst_info.index_metadata.file_size,
            num_rows: sst_info.num_rows as u64,
            num_row_groups: sst_info.num_row_groups,
            ..Default::default()
        };
        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
        let version_control =
            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
        let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;

        // `rx` receives worker requests; `result_rx` the task outcome.
        let (tx, mut rx) = mpsc::channel(4);
        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
        let task = IndexBuildTask {
            file_meta: file_meta.clone(),
            reason: IndexBuildType::Flush,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            manifest_ctx,
            write_cache: None,
            file_purger,
            indexer_builder,
            request_sender: tx,
            result_sender: result_tx,
        };

        scheduler
            .schedule_build(&version_control, task)
            .await
            .unwrap();

        // The build must report success through the result channel.
        match result_rx.recv().await.unwrap() {
            Ok(outcome) => {
                assert_eq!(outcome, IndexBuildOutcome::Finished);
            }
            _ => panic!("Expect finished result"),
        }

        // A finished build sends the worker a RegionEdit re-adding the file
        // with its freshly built index metadata attached.
        let worker_req = rx.recv().await.unwrap().request;
        match worker_req {
            WorkerRequest::Background {
                region_id: req_region_id,
                notify: BackgroundNotify::IndexBuildFinished(finished),
            } => {
                assert_eq!(req_region_id, region_id);
                assert_eq!(finished.edit.files_to_add.len(), 1);
                let updated_meta = &finished.edit.files_to_add[0];

                // The meta now records the built indexes and their size.
                assert!(!updated_meta.available_indexes.is_empty());
                assert!(updated_meta.index_file_size > 0);
                assert_eq!(updated_meta.file_id, file_meta.file_id);
            }
            _ => panic!("Unexpected worker request: {:?}", worker_req),
        }
    }
1543
    /// Schedules an index build for an SST written in `build_mode` and checks
    /// when the puffin (index) file appears in the object store: after the
    /// build for `Async`, already at flush time for `Sync`.
    async fn schedule_index_build_task_with_mode(build_mode: IndexBuildMode) {
        let env = SchedulerEnv::new().await;
        let mut scheduler = env.mock_index_build_scheduler(4);
        let metadata = Arc::new(sst_region_metadata());
        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
        let file_purger = Arc::new(NoopFilePurger {});
        let region_id = metadata.region_id;
        let sst_info = mock_sst_file(metadata.clone(), &env, build_mode.clone()).await;
        let file_meta = FileMeta {
            region_id,
            file_id: sst_info.file_id,
            file_size: sst_info.file_size,
            index_file_size: sst_info.index_metadata.file_size,
            num_rows: sst_info.num_rows as u64,
            num_row_groups: sst_info.num_row_groups,
            ..Default::default()
        };
        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
        let version_control =
            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
        let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;

        let (tx, _rx) = mpsc::channel(4);
        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
        let task = IndexBuildTask {
            file_meta: file_meta.clone(),
            reason: IndexBuildType::Flush,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            manifest_ctx,
            write_cache: None,
            file_purger,
            indexer_builder,
            request_sender: tx,
            result_sender: result_tx,
        };

        scheduler
            .schedule_build(&version_control, task)
            .await
            .unwrap();

        let puffin_path = location::index_file_path(
            env.access_layer.table_dir(),
            RegionFileId::new(region_id, file_meta.file_id),
            env.access_layer.path_type(),
        );

        // Async: the index is produced by the scheduled task, so it should
        // not exist yet. Sync: it was written together with the SST above.
        // NOTE(review): the async-mode check happens after `schedule_build`
        // and could in principle race with the background build finishing
        // first — confirm scheduler ordering, or check before scheduling.
        if build_mode == IndexBuildMode::Async {
            assert!(
                !env.access_layer
                    .object_store()
                    .exists(&puffin_path)
                    .await
                    .unwrap()
            );
        } else {
            assert!(
                env.access_layer
                    .object_store()
                    .exists(&puffin_path)
                    .await
                    .unwrap()
            );
        }

        // Wait for the build task to report completion.
        match result_rx.recv().await.unwrap() {
            Ok(outcome) => {
                assert_eq!(outcome, IndexBuildOutcome::Finished);
            }
            _ => panic!("Expect finished result"),
        }

        // In both modes the index file exists once the task is done.
        assert!(
            env.access_layer
                .object_store()
                .exists(&puffin_path)
                .await
                .unwrap()
        );
    }
1630
1631 #[tokio::test]
1632 async fn test_index_build_task_build_mode() {
1633 schedule_index_build_task_with_mode(IndexBuildMode::Async).await;
1634 schedule_index_build_task_with_mode(IndexBuildMode::Sync).await;
1635 }
1636
1637 #[tokio::test]
1638 async fn test_index_build_task_no_index() {
1639 let env = SchedulerEnv::new().await;
1640 let mut scheduler = env.mock_index_build_scheduler(4);
1641 let mut metadata = sst_region_metadata();
1642 metadata.column_metadatas.iter_mut().for_each(|col| {
1644 col.column_schema.set_inverted_index(false);
1645 let _ = col.column_schema.unset_skipping_options();
1646 });
1647 let region_id = metadata.region_id;
1648 let metadata = Arc::new(metadata);
1649 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1650 let file_purger = Arc::new(NoopFilePurger {});
1651 let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
1652 let file_meta = FileMeta {
1653 region_id,
1654 file_id: sst_info.file_id,
1655 file_size: sst_info.file_size,
1656 index_file_size: sst_info.index_metadata.file_size,
1657 num_rows: sst_info.num_rows as u64,
1658 num_row_groups: sst_info.num_row_groups,
1659 ..Default::default()
1660 };
1661 let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1662 let version_control =
1663 mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1664 let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
1665
1666 let (tx, mut rx) = mpsc::channel(4);
1668 let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1669 let task = IndexBuildTask {
1670 file_meta: file_meta.clone(),
1671 reason: IndexBuildType::Flush,
1672 access_layer: env.access_layer.clone(),
1673 listener: WorkerListener::default(),
1674 manifest_ctx,
1675 write_cache: None,
1676 file_purger,
1677 indexer_builder,
1678 request_sender: tx,
1679 result_sender: result_tx,
1680 };
1681
1682 scheduler
1683 .schedule_build(&version_control, task)
1684 .await
1685 .unwrap();
1686
1687 match result_rx.recv().await.unwrap() {
1689 Ok(outcome) => {
1690 assert_eq!(outcome, IndexBuildOutcome::Finished);
1691 }
1692 _ => panic!("Expect finished result"),
1693 }
1694
1695 let _ = rx.recv().await.is_none();
1697 }
1698
1699 #[tokio::test]
1700 async fn test_index_build_task_with_write_cache() {
1701 let env = SchedulerEnv::new().await;
1702 let mut scheduler = env.mock_index_build_scheduler(4);
1703 let metadata = Arc::new(sst_region_metadata());
1704 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1705 let file_purger = Arc::new(NoopFilePurger {});
1706 let region_id = metadata.region_id;
1707
1708 let (dir, factory) = PuffinManagerFactory::new_for_test_async("test_write_cache").await;
1709 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1710
1711 let write_cache = Arc::new(
1713 WriteCache::new_fs(
1714 dir.path().to_str().unwrap(),
1715 ReadableSize::mb(10),
1716 None,
1717 factory,
1718 intm_manager,
1719 )
1720 .await
1721 .unwrap(),
1722 );
1723 let indexer_builder = Arc::new(IndexerBuilderImpl {
1725 build_type: IndexBuildType::Flush,
1726 metadata: metadata.clone(),
1727 row_group_size: 1024,
1728 puffin_manager: write_cache.build_puffin_manager().clone(),
1729 intermediate_manager: write_cache.intermediate_manager().clone(),
1730 index_options: IndexOptions::default(),
1731 inverted_index_config: InvertedIndexConfig::default(),
1732 fulltext_index_config: FulltextIndexConfig::default(),
1733 bloom_filter_index_config: BloomFilterConfig::default(),
1734 });
1735
1736 let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
1737 let file_meta = FileMeta {
1738 region_id,
1739 file_id: sst_info.file_id,
1740 file_size: sst_info.file_size,
1741 index_file_size: sst_info.index_metadata.file_size,
1742 num_rows: sst_info.num_rows as u64,
1743 num_row_groups: sst_info.num_row_groups,
1744 ..Default::default()
1745 };
1746 let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1747 let version_control =
1748 mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1749
1750 let (tx, mut _rx) = mpsc::channel(4);
1752 let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1753 let task = IndexBuildTask {
1754 file_meta: file_meta.clone(),
1755 reason: IndexBuildType::Flush,
1756 access_layer: env.access_layer.clone(),
1757 listener: WorkerListener::default(),
1758 manifest_ctx,
1759 write_cache: Some(write_cache.clone()),
1760 file_purger,
1761 indexer_builder,
1762 request_sender: tx,
1763 result_sender: result_tx,
1764 };
1765
1766 scheduler
1767 .schedule_build(&version_control, task)
1768 .await
1769 .unwrap();
1770
1771 match result_rx.recv().await.unwrap() {
1773 Ok(outcome) => {
1774 assert_eq!(outcome, IndexBuildOutcome::Finished);
1775 }
1776 _ => panic!("Expect finished result"),
1777 }
1778
1779 let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Puffin);
1781 assert!(write_cache.file_cache().contains_key(&index_key));
1782 }
1783
1784 async fn create_mock_task_for_schedule(
1785 env: &SchedulerEnv,
1786 file_id: FileId,
1787 region_id: RegionId,
1788 reason: IndexBuildType,
1789 ) -> IndexBuildTask {
1790 let metadata = Arc::new(sst_region_metadata());
1791 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1792 let file_purger = Arc::new(NoopFilePurger {});
1793 let indexer_builder = mock_indexer_builder(metadata, env).await;
1794 let (tx, _rx) = mpsc::channel(4);
1795 let (result_tx, _result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1796
1797 IndexBuildTask {
1798 file_meta: FileMeta {
1799 region_id,
1800 file_id,
1801 file_size: 100,
1802 ..Default::default()
1803 },
1804 reason,
1805 access_layer: env.access_layer.clone(),
1806 listener: WorkerListener::default(),
1807 manifest_ctx,
1808 write_cache: None,
1809 file_purger,
1810 indexer_builder,
1811 request_sender: tx,
1812 result_sender: result_tx,
1813 }
1814 }
1815
1816 #[tokio::test]
1817 async fn test_scheduler_comprehensive() {
1818 let env = SchedulerEnv::new().await;
1819 let mut scheduler = env.mock_index_build_scheduler(2);
1820 let metadata = Arc::new(sst_region_metadata());
1821 let region_id = metadata.region_id;
1822 let file_purger = Arc::new(NoopFilePurger {});
1823
1824 let file_id1 = FileId::random();
1826 let file_id2 = FileId::random();
1827 let file_id3 = FileId::random();
1828 let file_id4 = FileId::random();
1829 let file_id5 = FileId::random();
1830
1831 let mut files = HashMap::new();
1832 for file_id in [file_id1, file_id2, file_id3, file_id4, file_id5] {
1833 files.insert(
1834 file_id,
1835 FileMeta {
1836 region_id,
1837 file_id,
1838 file_size: 100,
1839 ..Default::default()
1840 },
1841 );
1842 }
1843
1844 let version_control = mock_version_control(metadata, file_purger, files).await;
1845
1846 let task1 =
1848 create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
1849 assert!(
1850 scheduler
1851 .schedule_build(&version_control, task1)
1852 .await
1853 .is_ok()
1854 );
1855 assert!(scheduler.region_status.contains_key(®ion_id));
1856 let status = scheduler.region_status.get(®ion_id).unwrap();
1857 assert_eq!(status.building_files.len(), 1);
1858 assert!(status.building_files.contains(&file_id1));
1859
1860 let task1_dup =
1862 create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
1863 scheduler
1864 .schedule_build(&version_control, task1_dup)
1865 .await
1866 .unwrap();
1867 let status = scheduler.region_status.get(®ion_id).unwrap();
1868 assert_eq!(status.building_files.len(), 1); let task2 =
1872 create_mock_task_for_schedule(&env, file_id2, region_id, IndexBuildType::Flush).await;
1873 scheduler
1874 .schedule_build(&version_control, task2)
1875 .await
1876 .unwrap();
1877 let status = scheduler.region_status.get(®ion_id).unwrap();
1878 assert_eq!(status.building_files.len(), 2); assert_eq!(status.pending_tasks.len(), 0);
1880
1881 let task3 =
1884 create_mock_task_for_schedule(&env, file_id3, region_id, IndexBuildType::Compact).await;
1885 let task4 =
1886 create_mock_task_for_schedule(&env, file_id4, region_id, IndexBuildType::SchemaChange)
1887 .await;
1888 let task5 =
1889 create_mock_task_for_schedule(&env, file_id5, region_id, IndexBuildType::Manual).await;
1890
1891 scheduler
1892 .schedule_build(&version_control, task3)
1893 .await
1894 .unwrap();
1895 scheduler
1896 .schedule_build(&version_control, task4)
1897 .await
1898 .unwrap();
1899 scheduler
1900 .schedule_build(&version_control, task5)
1901 .await
1902 .unwrap();
1903
1904 let status = scheduler.region_status.get(®ion_id).unwrap();
1905 assert_eq!(status.building_files.len(), 2); assert_eq!(status.pending_tasks.len(), 3); scheduler.on_task_stopped(region_id, file_id1, &version_control);
1910 let status = scheduler.region_status.get(®ion_id).unwrap();
1911 assert!(!status.building_files.contains(&file_id1));
1912 assert_eq!(status.building_files.len(), 2); assert_eq!(status.pending_tasks.len(), 2); assert!(status.building_files.contains(&file_id5));
1916
1917 scheduler.on_task_stopped(region_id, file_id2, &version_control);
1919 let status = scheduler.region_status.get(®ion_id).unwrap();
1920 assert_eq!(status.building_files.len(), 2);
1921 assert_eq!(status.pending_tasks.len(), 1); assert!(status.building_files.contains(&file_id4)); scheduler.on_task_stopped(region_id, file_id5, &version_control);
1926 scheduler.on_task_stopped(region_id, file_id4, &version_control);
1927
1928 let status = scheduler.region_status.get(®ion_id).unwrap();
1929 assert_eq!(status.building_files.len(), 1); assert_eq!(status.pending_tasks.len(), 0);
1931 assert!(status.building_files.contains(&file_id3));
1932
1933 scheduler.on_task_stopped(region_id, file_id3, &version_control);
1934
1935 assert!(!scheduler.region_status.contains_key(®ion_id));
1937
1938 let task6 =
1940 create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
1941 let task7 =
1942 create_mock_task_for_schedule(&env, file_id2, region_id, IndexBuildType::Flush).await;
1943 let task8 =
1944 create_mock_task_for_schedule(&env, file_id3, region_id, IndexBuildType::Manual).await;
1945
1946 scheduler
1947 .schedule_build(&version_control, task6)
1948 .await
1949 .unwrap();
1950 scheduler
1951 .schedule_build(&version_control, task7)
1952 .await
1953 .unwrap();
1954 scheduler
1955 .schedule_build(&version_control, task8)
1956 .await
1957 .unwrap();
1958
1959 assert!(scheduler.region_status.contains_key(®ion_id));
1960 let status = scheduler.region_status.get(®ion_id).unwrap();
1961 assert_eq!(status.building_files.len(), 2);
1962 assert_eq!(status.pending_tasks.len(), 1);
1963
1964 scheduler.on_region_dropped(region_id).await;
1965 assert!(!scheduler.region_status.contains_key(®ion_id));
1966 }
1967}