mito2/sst/
index.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod bloom_filter;
16pub(crate) mod fulltext_index;
17mod indexer;
18pub mod intermediate;
19pub(crate) mod inverted_index;
20pub mod puffin_manager;
21mod statistics;
22pub(crate) mod store;
23
24use std::cmp::Ordering;
25use std::collections::{BinaryHeap, HashMap, HashSet};
26use std::num::NonZeroUsize;
27use std::sync::Arc;
28
29use bloom_filter::creator::BloomFilterIndexer;
30use common_telemetry::{debug, error, info, warn};
31use datatypes::arrow::array::BinaryArray;
32use datatypes::arrow::record_batch::RecordBatch;
33use mito_codec::index::IndexValuesCodec;
34use mito_codec::row_converter::CompositeValues;
35use object_store::ObjectStore;
36use puffin_manager::SstPuffinManager;
37use smallvec::{SmallVec, smallvec};
38use snafu::{OptionExt, ResultExt};
39use statistics::{ByteCount, RowCount};
40use store_api::metadata::RegionMetadataRef;
41use store_api::storage::{ColumnId, FileId, RegionId};
42use strum::IntoStaticStr;
43use tokio::sync::mpsc::Sender;
44
45use crate::access_layer::{AccessLayerRef, FilePathProvider, OperationType, RegionFilePathFactory};
46use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
47use crate::cache::write_cache::{UploadTracker, WriteCacheRef};
48use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
49use crate::error::{
50    BuildIndexAsyncSnafu, DecodeSnafu, Error, InvalidRecordBatchSnafu, RegionClosedSnafu,
51    RegionDroppedSnafu, RegionTruncatedSnafu, Result,
52};
53use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
54use crate::metrics::INDEX_CREATE_MEMORY_USAGE;
55use crate::read::{Batch, BatchReader};
56use crate::region::options::IndexOptions;
57use crate::region::version::VersionControlRef;
58use crate::region::{ManifestContextRef, RegionLeaderState};
59use crate::request::{
60    BackgroundNotify, IndexBuildFailed, IndexBuildFinished, IndexBuildStopped, WorkerRequest,
61    WorkerRequestWithTime,
62};
63use crate::schedule::scheduler::{Job, SchedulerRef};
64use crate::sst::file::{
65    ColumnIndexMetadata, FileHandle, FileMeta, IndexType, IndexTypes, RegionFileId, RegionIndexId,
66};
67use crate::sst::file_purger::FilePurgerRef;
68use crate::sst::index::fulltext_index::creator::FulltextIndexer;
69use crate::sst::index::intermediate::IntermediateManager;
70use crate::sst::index::inverted_index::creator::InvertedIndexer;
71use crate::sst::parquet::SstInfo;
72use crate::sst::parquet::flat_format::primary_key_column_index;
73use crate::sst::parquet::format::PrimaryKeyArray;
74use crate::worker::WorkerListener;
75
/// Metric label value for the inverted index creator in [INDEX_CREATE_MEMORY_USAGE].
pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index";
/// Metric label value for the full-text index creator in [INDEX_CREATE_MEMORY_USAGE].
pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index";
/// Metric label value for the bloom filter creator in [INDEX_CREATE_MEMORY_USAGE].
pub(crate) const TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
79
80/// Triggers background download of an index file to the local cache.
81pub(crate) fn trigger_index_background_download(
82    file_cache: Option<&FileCacheRef>,
83    file_id: &RegionIndexId,
84    file_size_hint: Option<u64>,
85    path_factory: &RegionFilePathFactory,
86    object_store: &ObjectStore,
87) {
88    if let (Some(file_cache), Some(file_size)) = (file_cache, file_size_hint) {
89        let index_key = IndexKey::new(
90            file_id.region_id(),
91            file_id.file_id(),
92            FileType::Puffin(file_id.version),
93        );
94        let remote_path = path_factory.build_index_file_path(file_id.file_id);
95        file_cache.maybe_download_background(
96            index_key,
97            remote_path,
98            object_store.clone(),
99            file_size,
100        );
101    }
102}
103
/// Output of the index creation.
#[derive(Debug, Clone, Default)]
pub struct IndexOutput {
    /// Size of the file. A value of `0` indicates that no index file was produced.
    pub file_size: u64,
    /// Index version.
    pub version: u64,
    /// Inverted index output.
    pub inverted_index: InvertedIndexOutput,
    /// Fulltext index output.
    pub fulltext_index: FulltextIndexOutput,
    /// Bloom filter output.
    pub bloom_filter: BloomFilterOutput,
}
118
119impl IndexOutput {
120    pub fn build_available_indexes(&self) -> SmallVec<[IndexType; 4]> {
121        let mut indexes = SmallVec::new();
122        if self.inverted_index.is_available() {
123            indexes.push(IndexType::InvertedIndex);
124        }
125        if self.fulltext_index.is_available() {
126            indexes.push(IndexType::FulltextIndex);
127        }
128        if self.bloom_filter.is_available() {
129            indexes.push(IndexType::BloomFilterIndex);
130        }
131        indexes
132    }
133
134    pub fn build_indexes(&self) -> Vec<ColumnIndexMetadata> {
135        let mut map: HashMap<ColumnId, IndexTypes> = HashMap::new();
136
137        if self.inverted_index.is_available() {
138            for &col in &self.inverted_index.columns {
139                map.entry(col).or_default().push(IndexType::InvertedIndex);
140            }
141        }
142        if self.fulltext_index.is_available() {
143            for &col in &self.fulltext_index.columns {
144                map.entry(col).or_default().push(IndexType::FulltextIndex);
145            }
146        }
147        if self.bloom_filter.is_available() {
148            for &col in &self.bloom_filter.columns {
149                map.entry(col)
150                    .or_default()
151                    .push(IndexType::BloomFilterIndex);
152            }
153        }
154
155        map.into_iter()
156            .map(|(column_id, created_indexes)| ColumnIndexMetadata {
157                column_id,
158                created_indexes,
159            })
160            .collect::<Vec<_>>()
161    }
162}
163
/// Base output of the index creation.
#[derive(Debug, Clone, Default)]
pub struct IndexBaseOutput {
    /// Size of the index. Zero means the index is not available
    /// (see [IndexBaseOutput::is_available]).
    pub index_size: ByteCount,
    /// Number of rows in the index.
    pub row_count: RowCount,
    /// Available columns in the index.
    pub columns: Vec<ColumnId>,
}
174
impl IndexBaseOutput {
    /// An index counts as available once any bytes were written for it;
    /// a zero `index_size` means the index was skipped or produced nothing.
    pub fn is_available(&self) -> bool {
        self.index_size > 0
    }
}
180
// The three index kinds share the same output shape; these aliases exist
// purely for readability at use sites.
/// Output of the inverted index creation.
pub type InvertedIndexOutput = IndexBaseOutput;
/// Output of the fulltext index creation.
pub type FulltextIndexOutput = IndexBaseOutput;
/// Output of the bloom filter creation.
pub type BloomFilterOutput = IndexBaseOutput;
187
/// The index creator that hides the error handling details.
///
/// A default-constructed `Indexer` is inert: all sub-indexers are `None`.
#[derive(Default)]
pub struct Indexer {
    /// SST file the indexes are built for.
    file_id: FileId,
    /// Region owning the SST file.
    region_id: RegionId,
    /// Version of the index file being created.
    index_version: u64,
    /// Puffin manager for the index file; `None` for an inert indexer.
    puffin_manager: Option<SstPuffinManager>,
    /// Whether the write cache is enabled for this build.
    write_cache_enabled: bool,
    /// Inverted index creator, if inverted indexing applies to this region.
    inverted_indexer: Option<InvertedIndexer>,
    /// Inverted-index memory usage reported on the previous metric flush.
    last_mem_inverted_index: usize,
    /// Full-text index creator, if full-text indexing applies.
    fulltext_indexer: Option<FulltextIndexer>,
    /// Full-text memory usage reported on the previous metric flush.
    last_mem_fulltext_index: usize,
    /// Bloom filter creator, if bloom filter indexing applies.
    bloom_filter_indexer: Option<BloomFilterIndexer>,
    /// Bloom filter memory usage reported on the previous metric flush.
    last_mem_bloom_filter: usize,
    /// Manages intermediate files produced during the build
    /// (see the [intermediate] module).
    intermediate_manager: Option<IntermediateManager>,
}
204
205impl Indexer {
206    /// Updates the index with the given batch.
207    pub async fn update(&mut self, batch: &mut Batch) {
208        self.do_update(batch).await;
209
210        self.flush_mem_metrics();
211    }
212
213    /// Updates the index with the given flat format RecordBatch.
214    pub async fn update_flat(&mut self, batch: &RecordBatch) {
215        self.do_update_flat(batch).await;
216
217        self.flush_mem_metrics();
218    }
219
220    /// Finalizes the index creation.
221    pub async fn finish(&mut self) -> IndexOutput {
222        let output = self.do_finish().await;
223
224        self.flush_mem_metrics();
225        output
226    }
227
228    /// Aborts the index creation.
229    pub async fn abort(&mut self) {
230        self.do_abort().await;
231
232        self.flush_mem_metrics();
233    }
234
235    fn flush_mem_metrics(&mut self) {
236        let inverted_mem = self
237            .inverted_indexer
238            .as_ref()
239            .map_or(0, |creator| creator.memory_usage());
240        INDEX_CREATE_MEMORY_USAGE
241            .with_label_values(&[TYPE_INVERTED_INDEX])
242            .add(inverted_mem as i64 - self.last_mem_inverted_index as i64);
243        self.last_mem_inverted_index = inverted_mem;
244
245        let fulltext_mem = self
246            .fulltext_indexer
247            .as_ref()
248            .map_or(0, |creator| creator.memory_usage());
249        INDEX_CREATE_MEMORY_USAGE
250            .with_label_values(&[TYPE_FULLTEXT_INDEX])
251            .add(fulltext_mem as i64 - self.last_mem_fulltext_index as i64);
252        self.last_mem_fulltext_index = fulltext_mem;
253
254        let bloom_filter_mem = self
255            .bloom_filter_indexer
256            .as_ref()
257            .map_or(0, |creator| creator.memory_usage());
258        INDEX_CREATE_MEMORY_USAGE
259            .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
260            .add(bloom_filter_mem as i64 - self.last_mem_bloom_filter as i64);
261        self.last_mem_bloom_filter = bloom_filter_mem;
262    }
263}
264
#[async_trait::async_trait]
pub trait IndexerBuilder {
    /// Builds an indexer for the SST file identified by `file_id`, writing
    /// index data as version `index_version`.
    async fn build(&self, file_id: FileId, index_version: u64) -> Indexer;
}
#[derive(Clone)]
pub(crate) struct IndexerBuilderImpl {
    /// Why the index is being built; checked against the per-operation
    /// `create_on_flush` / `create_on_compaction` policies.
    pub(crate) build_type: IndexBuildType,
    /// Metadata of the region owning the SST.
    pub(crate) metadata: RegionMetadataRef,
    /// Row group size of the SST; used to align inverted index segments.
    pub(crate) row_group_size: usize,
    /// Puffin manager handed to successfully built indexers.
    pub(crate) puffin_manager: SstPuffinManager,
    /// Propagated to the built [Indexer].
    pub(crate) write_cache_enabled: bool,
    /// Manages intermediate files for the sub-index creators.
    pub(crate) intermediate_manager: IntermediateManager,
    /// Per-region index options (ignored columns, segment row count, ...).
    pub(crate) index_options: IndexOptions,
    /// Inverted index configuration.
    pub(crate) inverted_index_config: InvertedIndexConfig,
    /// Fulltext index configuration.
    pub(crate) fulltext_index_config: FulltextIndexConfig,
    /// Bloom filter index configuration.
    pub(crate) bloom_filter_index_config: BloomFilterConfig,
}
283
284#[async_trait::async_trait]
285impl IndexerBuilder for IndexerBuilderImpl {
286    /// Sanity check for arguments and create a new [Indexer] if arguments are valid.
287    async fn build(&self, file_id: FileId, index_version: u64) -> Indexer {
288        let mut indexer = Indexer {
289            file_id,
290            region_id: self.metadata.region_id,
291            index_version,
292            write_cache_enabled: self.write_cache_enabled,
293            ..Default::default()
294        };
295
296        indexer.inverted_indexer = self.build_inverted_indexer(file_id);
297        indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await;
298        indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id);
299        indexer.intermediate_manager = Some(self.intermediate_manager.clone());
300        if indexer.inverted_indexer.is_none()
301            && indexer.fulltext_indexer.is_none()
302            && indexer.bloom_filter_indexer.is_none()
303        {
304            indexer.abort().await;
305            return Indexer::default();
306        }
307
308        indexer.puffin_manager = Some(self.puffin_manager.clone());
309        indexer
310    }
311}
312
313impl IndexerBuilderImpl {
314    fn build_inverted_indexer(&self, file_id: FileId) -> Option<InvertedIndexer> {
315        let create = match self.build_type {
316            IndexBuildType::Flush => self.inverted_index_config.create_on_flush.auto(),
317            IndexBuildType::Compact => self.inverted_index_config.create_on_compaction.auto(),
318            _ => true,
319        };
320
321        if !create {
322            debug!(
323                "Skip creating inverted index due to config, region_id: {}, file_id: {}",
324                self.metadata.region_id, file_id,
325            );
326            return None;
327        }
328
329        let indexed_column_ids = self.metadata.inverted_indexed_column_ids(
330            self.index_options.inverted_index.ignore_column_ids.iter(),
331        );
332        if indexed_column_ids.is_empty() {
333            debug!(
334                "No columns to be indexed, skip creating inverted index, region_id: {}, file_id: {}",
335                self.metadata.region_id, file_id,
336            );
337            return None;
338        }
339
340        let Some(mut segment_row_count) =
341            NonZeroUsize::new(self.index_options.inverted_index.segment_row_count)
342        else {
343            warn!(
344                "Segment row count is 0, skip creating index, region_id: {}, file_id: {}",
345                self.metadata.region_id, file_id,
346            );
347            return None;
348        };
349
350        let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else {
351            warn!(
352                "Row group size is 0, skip creating index, region_id: {}, file_id: {}",
353                self.metadata.region_id, file_id,
354            );
355            return None;
356        };
357
358        // if segment row count not aligned with row group size, adjust it to be aligned.
359        if row_group_size.get() % segment_row_count.get() != 0 {
360            segment_row_count = row_group_size;
361        }
362
363        let indexer = InvertedIndexer::new(
364            file_id,
365            &self.metadata,
366            self.intermediate_manager.clone(),
367            self.inverted_index_config.mem_threshold_on_create(),
368            segment_row_count,
369            indexed_column_ids,
370        );
371
372        Some(indexer)
373    }
374
375    async fn build_fulltext_indexer(&self, file_id: FileId) -> Option<FulltextIndexer> {
376        let create = match self.build_type {
377            IndexBuildType::Flush => self.fulltext_index_config.create_on_flush.auto(),
378            IndexBuildType::Compact => self.fulltext_index_config.create_on_compaction.auto(),
379            _ => true,
380        };
381
382        if !create {
383            debug!(
384                "Skip creating full-text index due to config, region_id: {}, file_id: {}",
385                self.metadata.region_id, file_id,
386            );
387            return None;
388        }
389
390        let mem_limit = self.fulltext_index_config.mem_threshold_on_create();
391        let creator = FulltextIndexer::new(
392            &self.metadata.region_id,
393            &file_id,
394            &self.intermediate_manager,
395            &self.metadata,
396            self.fulltext_index_config.compress,
397            mem_limit,
398        )
399        .await;
400
401        let err = match creator {
402            Ok(creator) => {
403                if creator.is_none() {
404                    debug!(
405                        "Skip creating full-text index due to no columns require indexing, region_id: {}, file_id: {}",
406                        self.metadata.region_id, file_id,
407                    );
408                }
409                return creator;
410            }
411            Err(err) => err,
412        };
413
414        if cfg!(any(test, feature = "test")) {
415            panic!(
416                "Failed to create full-text indexer, region_id: {}, file_id: {}, err: {:?}",
417                self.metadata.region_id, file_id, err
418            );
419        } else {
420            warn!(
421                err; "Failed to create full-text indexer, region_id: {}, file_id: {}",
422                self.metadata.region_id, file_id,
423            );
424        }
425
426        None
427    }
428
429    fn build_bloom_filter_indexer(&self, file_id: FileId) -> Option<BloomFilterIndexer> {
430        let create = match self.build_type {
431            IndexBuildType::Flush => self.bloom_filter_index_config.create_on_flush.auto(),
432            IndexBuildType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(),
433            _ => true,
434        };
435
436        if !create {
437            debug!(
438                "Skip creating bloom filter due to config, region_id: {}, file_id: {}",
439                self.metadata.region_id, file_id,
440            );
441            return None;
442        }
443
444        let mem_limit = self.bloom_filter_index_config.mem_threshold_on_create();
445        let indexer = BloomFilterIndexer::new(
446            file_id,
447            &self.metadata,
448            self.intermediate_manager.clone(),
449            mem_limit,
450        );
451
452        let err = match indexer {
453            Ok(indexer) => {
454                if indexer.is_none() {
455                    debug!(
456                        "Skip creating bloom filter due to no columns require indexing, region_id: {}, file_id: {}",
457                        self.metadata.region_id, file_id,
458                    );
459                }
460                return indexer;
461            }
462            Err(err) => err,
463        };
464
465        if cfg!(any(test, feature = "test")) {
466            panic!(
467                "Failed to create bloom filter, region_id: {}, file_id: {}, err: {:?}",
468                self.metadata.region_id, file_id, err
469            );
470        } else {
471            warn!(
472                err; "Failed to create bloom filter, region_id: {}, file_id: {}",
473                self.metadata.region_id, file_id,
474            );
475        }
476
477        None
478    }
479}
480
481/// Type of an index build task.
482#[derive(Debug, Clone, IntoStaticStr, PartialEq)]
483pub enum IndexBuildType {
484    /// Build index when schema change.
485    SchemaChange,
486    /// Create or update index after flush.
487    Flush,
488    /// Create or update index after compact.
489    Compact,
490    /// Manually build index.
491    Manual,
492}
493
494impl IndexBuildType {
495    fn as_str(&self) -> &'static str {
496        self.into()
497    }
498
499    // Higher value means higher priority.
500    fn priority(&self) -> u8 {
501        match self {
502            IndexBuildType::Manual => 3,
503            IndexBuildType::SchemaChange => 2,
504            IndexBuildType::Flush => 1,
505            IndexBuildType::Compact => 0,
506        }
507    }
508}
509
510impl From<OperationType> for IndexBuildType {
511    fn from(op_type: OperationType) -> Self {
512        match op_type {
513            OperationType::Flush => IndexBuildType::Flush,
514            OperationType::Compact => IndexBuildType::Compact,
515        }
516    }
517}
518
/// Outcome of an index build task.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum IndexBuildOutcome {
    /// The task ran to completion.
    Finished,
    /// The task was abandoned; the payload is a human-readable reason.
    Aborted(String),
}
525
/// Mpsc output result sender.
///
/// Delivers the final [IndexBuildOutcome] (or the error that stopped the
/// build) of a task to its waiter.
pub type ResultMpscSender = Sender<Result<IndexBuildOutcome>>;
528
/// A background task that builds the index file for a single SST.
#[derive(Clone)]
pub struct IndexBuildTask {
    /// The SST file handle to build index for.
    pub file: FileHandle,
    /// The file meta to build index for.
    pub file_meta: FileMeta,
    /// Why the index is being built; also determines scheduling priority.
    pub reason: IndexBuildType,
    /// Access layer used to read the SST and reach the object store.
    pub access_layer: AccessLayerRef,
    /// Listener notified of build begin/abort events.
    pub(crate) listener: WorkerListener,
    /// Context used to commit the resulting edit to the region manifest.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// Write cache; when present, the built index file is uploaded through it.
    pub write_cache: Option<WriteCacheRef>,
    /// File purger associated with the SST. (Not referenced directly in this
    /// file's visible logic — presumably held to keep purge semantics alive
    /// for the handle; confirm at call sites.)
    pub file_purger: FilePurgerRef,
    /// When write cache is enabled, the indexer builder should be built from the write cache.
    /// Otherwise, it should be built from the access layer.
    pub indexer_builder: Arc<dyn IndexerBuilder + Send + Sync>,
    /// Request sender to notify the region worker.
    pub(crate) request_sender: Sender<WorkerRequestWithTime>,
    /// Index build result sender.
    pub(crate) result_sender: ResultMpscSender,
}
549
550impl std::fmt::Debug for IndexBuildTask {
551    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
552        f.debug_struct("IndexBuildTask")
553            .field("region_id", &self.file_meta.region_id)
554            .field("file_id", &self.file_meta.file_id)
555            .field("reason", &self.reason)
556            .finish()
557    }
558}
559
impl IndexBuildTask {
    /// Notify the caller the job is success.
    pub async fn on_success(&self, outcome: IndexBuildOutcome) {
        // Ignore send errors: the waiter may have dropped the receiver.
        let _ = self.result_sender.send(Ok(outcome)).await;
    }

    /// Send index build error to waiter.
    pub async fn on_failure(&self, err: Arc<Error>) {
        let _ = self
            .result_sender
            .send(Err(err.clone()).context(BuildIndexAsyncSnafu {
                region_id: self.file_meta.region_id,
            }))
            .await;
    }

    /// Converts the task into a boxed [Job] that runs the whole build.
    fn into_index_build_job(mut self, version_control: VersionControlRef) -> Job {
        Box::pin(async move {
            self.do_index_build(version_control).await;
        })
    }

    /// Drives one index build: notifies the listener, runs the build, reports
    /// the result to the waiter, and finally tells the worker the task stopped.
    async fn do_index_build(&mut self, version_control: VersionControlRef) {
        self.listener
            .on_index_build_begin(RegionFileId::new(
                self.file_meta.region_id,
                self.file_meta.file_id,
            ))
            .await;
        match self.index_build(version_control).await {
            Ok(outcome) => self.on_success(outcome).await,
            Err(e) => {
                warn!(
                    e; "Index build task failed, region: {}, file_id: {}",
                    self.file_meta.region_id, self.file_meta.file_id,
                );
                self.on_failure(e.into()).await
            }
        }
        // Sent unconditionally (on success AND failure) so the worker can
        // update its bookkeeping for this file.
        let worker_request = WorkerRequest::Background {
            region_id: self.file_meta.region_id,
            notify: BackgroundNotify::IndexBuildStopped(IndexBuildStopped {
                region_id: self.file_meta.region_id,
                file_id: self.file_meta.file_id,
            }),
        };
        let _ = self
            .request_sender
            .send(WorkerRequestWithTime::new(worker_request))
            .await;
    }

    // Checks if the SST file still exists in object store and version to avoid conflict with compaction.
    async fn check_sst_file_exists(&self, version_control: &VersionControlRef) -> bool {
        let file_id = self.file_meta.file_id;
        let level = self.file_meta.level;
        // We should check current version instead of the version when the job is created.
        let version = version_control.current().version;

        let Some(level_files) = version.ssts.levels().get(level as usize) else {
            warn!(
                "File id {} not found in level {} for index build, region: {}",
                file_id, level, self.file_meta.region_id
            );
            return false;
        };

        match level_files.files.get(&file_id) {
            // The file must be live (not deleted, not being compacted away).
            Some(handle) if !handle.is_deleted() && !handle.compacting() => {
                // If the file's metadata is present in the current version, the physical SST file
                // is guaranteed to exist on object store. The file purger removes the physical
                // file only after its metadata is removed from the version.
                true
            }
            _ => {
                warn!(
                    "File id {} not found in region version for index build, region: {}",
                    file_id, self.file_meta.region_id
                );
                false
            }
        }
    }

    /// Builds the index file for the SST, optionally uploads it via the write
    /// cache, and records the result in the region manifest.
    ///
    /// Returns `Aborted` when the SST vanished (e.g. removed by compaction)
    /// before or after the scan; `Finished` otherwise — including the case
    /// where no index data was produced (`file_size == 0`), which skips the
    /// upload and manifest update entirely.
    async fn index_build(
        &mut self,
        version_control: VersionControlRef,
    ) -> Result<IndexBuildOutcome> {
        // Determine the new index version
        let new_index_version = if self.file_meta.index_file_size > 0 {
            // Increment version if index file exists to avoid overwrite.
            self.file_meta.index_version + 1
        } else {
            0 // Default version for new index files
        };

        // Use the same file_id but with new version for index file
        let index_file_id = self.file_meta.file_id;
        let mut indexer = self
            .indexer_builder
            .build(index_file_id, new_index_version)
            .await;

        // Check SST file existence before building index to avoid failure of parquet reader.
        if !self.check_sst_file_exists(&version_control).await {
            // Calls abort to clean up index files.
            indexer.abort().await;
            self.listener
                .on_index_build_abort(RegionFileId::new(
                    self.file_meta.region_id,
                    self.file_meta.file_id,
                ))
                .await;
            return Ok(IndexBuildOutcome::Aborted(format!(
                "SST file not found during index build, region: {}, file_id: {}",
                self.file_meta.region_id, self.file_meta.file_id
            )));
        }

        let mut parquet_reader = self
            .access_layer
            .read_sst(self.file.clone()) // use the latest file handle instead of creating a new one
            .build()
            .await?;

        // TODO(SNC123): optimize index batch
        loop {
            match parquet_reader.next_batch().await {
                Ok(Some(mut batch)) => {
                    indexer.update(&mut batch).await;
                }
                Ok(None) => break,
                Err(e) => {
                    // Abort so partially written index data is cleaned up.
                    indexer.abort().await;
                    return Err(e);
                }
            }
        }
        let index_output = indexer.finish().await;

        if index_output.file_size > 0 {
            // Check SST file existence again after building index.
            if !self.check_sst_file_exists(&version_control).await {
                // Calls abort to clean up index files.
                // NOTE(review): this abort runs after `finish()` has already
                // completed; presumably it only removes the produced files —
                // confirm against Indexer::do_abort.
                indexer.abort().await;
                self.listener
                    .on_index_build_abort(RegionFileId::new(
                        self.file_meta.region_id,
                        self.file_meta.file_id,
                    ))
                    .await;
                return Ok(IndexBuildOutcome::Aborted(format!(
                    "SST file not found during index build, region: {}, file_id: {}",
                    self.file_meta.region_id, self.file_meta.file_id
                )));
            }

            // Upload index file if write cache is enabled.
            self.maybe_upload_index_file(index_output.clone(), index_file_id, new_index_version)
                .await?;

            // Commit the new index metadata; notify the worker either way.
            let worker_request = match self.update_manifest(index_output, new_index_version).await {
                Ok(edit) => {
                    let index_build_finished = IndexBuildFinished {
                        region_id: self.file_meta.region_id,
                        edit,
                    };
                    WorkerRequest::Background {
                        region_id: self.file_meta.region_id,
                        notify: BackgroundNotify::IndexBuildFinished(index_build_finished),
                    }
                }
                Err(e) => {
                    let err = Arc::new(e);
                    WorkerRequest::Background {
                        region_id: self.file_meta.region_id,
                        notify: BackgroundNotify::IndexBuildFailed(IndexBuildFailed { err }),
                    }
                }
            };

            let _ = self
                .request_sender
                .send(WorkerRequestWithTime::new(worker_request))
                .await;
        }
        Ok(IndexBuildOutcome::Finished)
    }

    /// Uploads the freshly built puffin file through the write cache to the
    /// remote store; no-op when the write cache is disabled.
    ///
    /// On upload failure the uploaded path is cleaned from cache and remote
    /// store before the error is returned.
    async fn maybe_upload_index_file(
        &self,
        output: IndexOutput,
        index_file_id: FileId,
        index_version: u64,
    ) -> Result<()> {
        if let Some(write_cache) = &self.write_cache {
            let file_id = self.file_meta.file_id;
            let region_id = self.file_meta.region_id;
            let remote_store = self.access_layer.object_store();
            let mut upload_tracker = UploadTracker::new(region_id);
            let mut err = None;
            let puffin_key =
                IndexKey::new(region_id, index_file_id, FileType::Puffin(output.version));
            let index_id = RegionIndexId::new(RegionFileId::new(region_id, file_id), index_version);
            let puffin_path = RegionFilePathFactory::new(
                self.access_layer.table_dir().to_string(),
                self.access_layer.path_type(),
            )
            .build_index_file_path_with_version(index_id);
            if let Err(e) = write_cache
                .upload(puffin_key, &puffin_path, remote_store)
                .await
            {
                err = Some(e);
            }
            // Track the path even on failure so `clean` below can remove it.
            upload_tracker.push_uploaded_file(puffin_path);
            if let Some(err) = err {
                // Cleans index files on failure.
                upload_tracker
                    .clean(
                        &smallvec![SstInfo {
                            file_id,
                            index_metadata: output,
                            ..Default::default()
                        }],
                        &write_cache.file_cache(),
                        remote_store,
                    )
                    .await;
                return Err(err);
            }
        } else {
            debug!("write cache is not available, skip uploading index file");
        }
        Ok(())
    }

    /// Applies the index build result to the region manifest and returns the
    /// committed [RegionEdit].
    async fn update_manifest(
        &mut self,
        output: IndexOutput,
        new_index_version: u64,
    ) -> Result<RegionEdit> {
        // Refresh the in-memory file meta with the freshly built index info.
        self.file_meta.available_indexes = output.build_available_indexes();
        self.file_meta.indexes = output.build_indexes();
        self.file_meta.index_file_size = output.file_size;
        self.file_meta.index_version = new_index_version;
        let edit = RegionEdit {
            files_to_add: vec![self.file_meta.clone()],
            files_to_remove: vec![],
            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
            flushed_sequence: None,
            flushed_entry_id: None,
            committed_sequence: None,
            compaction_time_window: None,
        };
        let version = self
            .manifest_ctx
            .update_manifest(
                RegionLeaderState::Writable,
                RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())),
                false,
            )
            .await?;
        info!(
            "Successfully update manifest version to {version}, region: {}, reason: {}",
            self.file_meta.region_id,
            self.reason.as_str()
        );
        Ok(edit)
    }
}
831
832impl PartialEq for IndexBuildTask {
833    fn eq(&self, other: &Self) -> bool {
834        self.reason.priority() == other.reason.priority()
835    }
836}
837
/// `Eq` is sound because priority-based equality (see `PartialEq` above) is a
/// total equivalence relation on tasks.
impl Eq for IndexBuildTask {}
839
impl PartialOrd for IndexBuildTask {
    /// Delegates to [`Ord::cmp`] — tasks are totally ordered by priority.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
845
846impl Ord for IndexBuildTask {
847    fn cmp(&self, other: &Self) -> Ordering {
848        self.reason.priority().cmp(&other.reason.priority())
849    }
850}
851
/// Tracks the index build status of a region scheduled by the [IndexBuildScheduler].
pub struct IndexBuildStatus {
    /// Region this status belongs to.
    pub region_id: RegionId,
    /// Files whose index build jobs are currently running.
    pub building_files: HashSet<FileId>,
    /// Tasks waiting to be scheduled, ordered by priority (max-heap).
    pub pending_tasks: BinaryHeap<IndexBuildTask>,
}
858
859impl IndexBuildStatus {
860    pub fn new(region_id: RegionId) -> Self {
861        IndexBuildStatus {
862            region_id,
863            building_files: HashSet::new(),
864            pending_tasks: BinaryHeap::new(),
865        }
866    }
867
868    async fn on_failure(self, err: Arc<Error>) {
869        for task in self.pending_tasks {
870            task.on_failure(err.clone()).await;
871        }
872    }
873}
874
/// Schedules background index build tasks for the regions of a worker.
pub struct IndexBuildScheduler {
    /// Background job scheduler.
    scheduler: SchedulerRef,
    /// Tracks regions need to build index.
    region_status: HashMap<RegionId, IndexBuildStatus>,
    /// Limit of files allowed to build index concurrently. The limit is
    /// enforced over the total building files of all regions of this worker
    /// (see `schedule_next_build_batch`), not per region.
    files_limit: usize,
}
883
884/// Manager background index build tasks of a worker.
885impl IndexBuildScheduler {
886    pub fn new(scheduler: SchedulerRef, files_limit: usize) -> Self {
887        IndexBuildScheduler {
888            scheduler,
889            region_status: HashMap::new(),
890            files_limit,
891        }
892    }
893
894    pub(crate) async fn schedule_build(
895        &mut self,
896        version_control: &VersionControlRef,
897        task: IndexBuildTask,
898    ) -> Result<()> {
899        let status = self
900            .region_status
901            .entry(task.file_meta.region_id)
902            .or_insert_with(|| IndexBuildStatus::new(task.file_meta.region_id));
903
904        if status.building_files.contains(&task.file_meta.file_id) {
905            let region_file_id =
906                RegionFileId::new(task.file_meta.region_id, task.file_meta.file_id);
907            debug!(
908                "Aborting index build task since index is already being built for region file {:?}",
909                region_file_id
910            );
911            task.on_success(IndexBuildOutcome::Aborted(format!(
912                "Index is already being built for region file {:?}",
913                region_file_id
914            )))
915            .await;
916            task.listener.on_index_build_abort(region_file_id).await;
917            return Ok(());
918        }
919
920        status.pending_tasks.push(task);
921
922        self.schedule_next_build_batch(version_control);
923        Ok(())
924    }
925
926    /// Schedule tasks until reaching the files limit or no more tasks.
927    fn schedule_next_build_batch(&mut self, version_control: &VersionControlRef) {
928        let mut building_count = 0;
929        for status in self.region_status.values() {
930            building_count += status.building_files.len();
931        }
932
933        while building_count < self.files_limit {
934            if let Some(task) = self.find_next_task() {
935                let region_id = task.file_meta.region_id;
936                let file_id = task.file_meta.file_id;
937                let job = task.into_index_build_job(version_control.clone());
938                if self.scheduler.schedule(job).is_ok() {
939                    if let Some(status) = self.region_status.get_mut(&region_id) {
940                        status.building_files.insert(file_id);
941                        building_count += 1;
942                        status
943                            .pending_tasks
944                            .retain(|t| t.file_meta.file_id != file_id);
945                    } else {
946                        error!(
947                            "Region status not found when scheduling index build task, region: {}",
948                            region_id
949                        );
950                    }
951                } else {
952                    error!(
953                        "Failed to schedule index build job, region: {}, file_id: {}",
954                        region_id, file_id
955                    );
956                }
957            } else {
958                // No more tasks to schedule.
959                break;
960            }
961        }
962    }
963
964    /// Find the next task which has the highest priority to run.
965    fn find_next_task(&self) -> Option<IndexBuildTask> {
966        self.region_status
967            .iter()
968            .filter_map(|(_, status)| status.pending_tasks.peek())
969            .max()
970            .cloned()
971    }
972
973    pub(crate) fn on_task_stopped(
974        &mut self,
975        region_id: RegionId,
976        file_id: FileId,
977        version_control: &VersionControlRef,
978    ) {
979        if let Some(status) = self.region_status.get_mut(&region_id) {
980            status.building_files.remove(&file_id);
981            if status.building_files.is_empty() && status.pending_tasks.is_empty() {
982                // No more tasks for this region, remove it.
983                self.region_status.remove(&region_id);
984            }
985        }
986
987        self.schedule_next_build_batch(version_control);
988    }
989
990    pub(crate) async fn on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
991        error!(
992            err; "Index build scheduler encountered failure for region {}, removing all pending tasks.",
993            region_id
994        );
995        let Some(status) = self.region_status.remove(&region_id) else {
996            return;
997        };
998        status.on_failure(err).await;
999    }
1000
1001    /// Notifies the scheduler that the region is dropped.
1002    pub(crate) async fn on_region_dropped(&mut self, region_id: RegionId) {
1003        self.remove_region_on_failure(
1004            region_id,
1005            Arc::new(RegionDroppedSnafu { region_id }.build()),
1006        )
1007        .await;
1008    }
1009
1010    /// Notifies the scheduler that the region is closed.
1011    pub(crate) async fn on_region_closed(&mut self, region_id: RegionId) {
1012        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()))
1013            .await;
1014    }
1015
1016    /// Notifies the scheduler that the region is truncated.
1017    pub(crate) async fn on_region_truncated(&mut self, region_id: RegionId) {
1018        self.remove_region_on_failure(
1019            region_id,
1020            Arc::new(RegionTruncatedSnafu { region_id }.build()),
1021        )
1022        .await;
1023    }
1024
1025    async fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
1026        let Some(status) = self.region_status.remove(&region_id) else {
1027            return;
1028        };
1029        status.on_failure(err).await;
1030    }
1031}
1032
1033/// Decodes primary keys from a flat format RecordBatch.
1034/// Returns a list of (decoded_pk_value, count) tuples where count is the number of occurrences.
1035pub(crate) fn decode_primary_keys_with_counts(
1036    batch: &RecordBatch,
1037    codec: &IndexValuesCodec,
1038) -> Result<Vec<(CompositeValues, usize)>> {
1039    let primary_key_index = primary_key_column_index(batch.num_columns());
1040    let pk_dict_array = batch
1041        .column(primary_key_index)
1042        .as_any()
1043        .downcast_ref::<PrimaryKeyArray>()
1044        .context(InvalidRecordBatchSnafu {
1045            reason: "Primary key column is not a dictionary array",
1046        })?;
1047    let pk_values_array = pk_dict_array
1048        .values()
1049        .as_any()
1050        .downcast_ref::<BinaryArray>()
1051        .context(InvalidRecordBatchSnafu {
1052            reason: "Primary key values are not binary array",
1053        })?;
1054    let keys = pk_dict_array.keys();
1055
1056    // Decodes primary keys and count consecutive occurrences
1057    let mut result: Vec<(CompositeValues, usize)> = Vec::new();
1058    let mut prev_key: Option<u32> = None;
1059
1060    for i in 0..keys.len() {
1061        let current_key = keys.value(i);
1062
1063        // Checks if current key is the same as previous key
1064        if let Some(prev) = prev_key
1065            && prev == current_key
1066        {
1067            // Safety: We already have a key in the result vector.
1068            result.last_mut().unwrap().1 += 1;
1069            continue;
1070        }
1071
1072        // New key, decodes it.
1073        let pk_bytes = pk_values_array.value(current_key as usize);
1074        let decoded_value = codec.decoder().decode(pk_bytes).context(DecodeSnafu)?;
1075
1076        result.push((decoded_value, 1));
1077        prev_key = Some(current_key);
1078    }
1079
1080    Ok(result)
1081}
1082
1083#[cfg(test)]
1084mod tests {
1085    use std::sync::Arc;
1086
1087    use api::v1::SemanticType;
1088    use common_base::readable_size::ReadableSize;
1089    use datafusion_common::HashMap;
1090    use datatypes::data_type::ConcreteDataType;
1091    use datatypes::schema::{
1092        ColumnSchema, FulltextOptions, SkippingIndexOptions, SkippingIndexType,
1093    };
1094    use object_store::ObjectStore;
1095    use object_store::services::Memory;
1096    use puffin_manager::PuffinManagerFactory;
1097    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1098    use tokio::sync::mpsc;
1099
1100    use super::*;
1101    use crate::access_layer::{FilePathProvider, Metrics, SstWriteRequest, WriteType};
1102    use crate::cache::write_cache::WriteCache;
1103    use crate::config::{FulltextIndexConfig, IndexBuildMode, MitoConfig, Mode};
1104    use crate::memtable::time_partition::TimePartitions;
1105    use crate::region::version::{VersionBuilder, VersionControl};
1106    use crate::sst::file::RegionFileId;
1107    use crate::sst::file_purger::NoopFilePurger;
1108    use crate::sst::location;
1109    use crate::sst::parquet::WriteOptions;
1110    use crate::test_util::memtable_util::EmptyMemtableBuilder;
1111    use crate::test_util::scheduler_util::SchedulerEnv;
1112    use crate::test_util::sst_util::{new_batch_by_range, new_source, sst_region_metadata};
1113
    /// Toggles which index-related column options appear in the region
    /// metadata produced by `mock_region_metadata`.
    struct MetaConfig {
        // Adds an inverted index option to column "a".
        with_inverted: bool,
        // Adds a fulltext-enabled string column "text".
        with_fulltext: bool,
        // Adds a bloom-filter skipping index column "bloom".
        with_skipping_bloom: bool,
    }
1119
1120    fn mock_region_metadata(
1121        MetaConfig {
1122            with_inverted,
1123            with_fulltext,
1124            with_skipping_bloom,
1125        }: MetaConfig,
1126    ) -> RegionMetadataRef {
1127        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
1128        let mut column_schema = ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false);
1129        if with_inverted {
1130            column_schema = column_schema.with_inverted_index(true);
1131        }
1132        builder
1133            .push_column_metadata(ColumnMetadata {
1134                column_schema,
1135                semantic_type: SemanticType::Field,
1136                column_id: 1,
1137            })
1138            .push_column_metadata(ColumnMetadata {
1139                column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
1140                semantic_type: SemanticType::Field,
1141                column_id: 2,
1142            })
1143            .push_column_metadata(ColumnMetadata {
1144                column_schema: ColumnSchema::new(
1145                    "c",
1146                    ConcreteDataType::timestamp_millisecond_datatype(),
1147                    false,
1148                ),
1149                semantic_type: SemanticType::Timestamp,
1150                column_id: 3,
1151            });
1152
1153        if with_fulltext {
1154            let column_schema =
1155                ColumnSchema::new("text", ConcreteDataType::string_datatype(), true)
1156                    .with_fulltext_options(FulltextOptions {
1157                        enable: true,
1158                        ..Default::default()
1159                    })
1160                    .unwrap();
1161
1162            let column = ColumnMetadata {
1163                column_schema,
1164                semantic_type: SemanticType::Field,
1165                column_id: 4,
1166            };
1167
1168            builder.push_column_metadata(column);
1169        }
1170
1171        if with_skipping_bloom {
1172            let column_schema =
1173                ColumnSchema::new("bloom", ConcreteDataType::string_datatype(), false)
1174                    .with_skipping_options(SkippingIndexOptions::new_unchecked(
1175                        42,
1176                        0.01,
1177                        SkippingIndexType::BloomFilter,
1178                    ))
1179                    .unwrap();
1180
1181            let column = ColumnMetadata {
1182                column_schema,
1183                semantic_type: SemanticType::Field,
1184                column_id: 5,
1185            };
1186
1187            builder.push_column_metadata(column);
1188        }
1189
1190        Arc::new(builder.build().unwrap())
1191    }
1192
1193    fn mock_object_store() -> ObjectStore {
1194        ObjectStore::new(Memory::default()).unwrap().finish()
1195    }
1196
    /// Initializes an [IntermediateManager] backed by the local filesystem at `path`.
    async fn mock_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
        IntermediateManager::init_fs(path).await.unwrap()
    }
    /// Path provider for tests that must never actually construct paths.
    struct NoopPathProvider;
1201
    /// Every method panics: code under test using [NoopPathProvider] is
    /// expected to never resolve file paths.
    impl FilePathProvider for NoopPathProvider {
        fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
            unreachable!()
        }

        fn build_index_file_path_with_version(&self, _index_id: RegionIndexId) -> String {
            unreachable!()
        }

        fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
            unreachable!()
        }
    }
1215
1216    async fn mock_sst_file(
1217        metadata: RegionMetadataRef,
1218        env: &SchedulerEnv,
1219        build_mode: IndexBuildMode,
1220    ) -> SstInfo {
1221        let source = new_source(&[
1222            new_batch_by_range(&["a", "d"], 0, 60),
1223            new_batch_by_range(&["b", "f"], 0, 40),
1224            new_batch_by_range(&["b", "h"], 100, 200),
1225        ]);
1226        let mut index_config = MitoConfig::default().index;
1227        index_config.build_mode = build_mode;
1228        let write_request = SstWriteRequest {
1229            op_type: OperationType::Flush,
1230            metadata: metadata.clone(),
1231            source: either::Left(source),
1232            storage: None,
1233            max_sequence: None,
1234            cache_manager: Default::default(),
1235            index_options: IndexOptions::default(),
1236            index_config,
1237            inverted_index_config: Default::default(),
1238            fulltext_index_config: Default::default(),
1239            bloom_filter_index_config: Default::default(),
1240        };
1241        let mut metrics = Metrics::new(WriteType::Flush);
1242        env.access_layer
1243            .write_sst(write_request, &WriteOptions::default(), &mut metrics)
1244            .await
1245            .unwrap()
1246            .remove(0)
1247    }
1248
1249    async fn mock_version_control(
1250        metadata: RegionMetadataRef,
1251        file_purger: FilePurgerRef,
1252        files: HashMap<FileId, FileMeta>,
1253    ) -> VersionControlRef {
1254        let mutable = Arc::new(TimePartitions::new(
1255            metadata.clone(),
1256            Arc::new(EmptyMemtableBuilder::default()),
1257            0,
1258            None,
1259        ));
1260        let version_builder = VersionBuilder::new(metadata, mutable)
1261            .add_files(file_purger, files.values().cloned())
1262            .build();
1263        Arc::new(VersionControl::new(version_builder))
1264    }
1265
1266    async fn mock_indexer_builder(
1267        metadata: RegionMetadataRef,
1268        env: &SchedulerEnv,
1269    ) -> Arc<dyn IndexerBuilder + Send + Sync> {
1270        let (dir, factory) = PuffinManagerFactory::new_for_test_async("mock_indexer_builder").await;
1271        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1272        let puffin_manager = factory.build(
1273            env.access_layer.object_store().clone(),
1274            RegionFilePathFactory::new(
1275                env.access_layer.table_dir().to_string(),
1276                env.access_layer.path_type(),
1277            ),
1278        );
1279        Arc::new(IndexerBuilderImpl {
1280            build_type: IndexBuildType::Flush,
1281            metadata,
1282            row_group_size: 1024,
1283            puffin_manager,
1284            write_cache_enabled: false,
1285            intermediate_manager: intm_manager,
1286            index_options: IndexOptions::default(),
1287            inverted_index_config: InvertedIndexConfig::default(),
1288            fulltext_index_config: FulltextIndexConfig::default(),
1289            bloom_filter_index_config: BloomFilterConfig::default(),
1290        })
1291    }
1292
1293    #[tokio::test]
1294    async fn test_build_indexer_basic() {
1295        let (dir, factory) =
1296            PuffinManagerFactory::new_for_test_async("test_build_indexer_basic_").await;
1297        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1298
1299        let metadata = mock_region_metadata(MetaConfig {
1300            with_inverted: true,
1301            with_fulltext: true,
1302            with_skipping_bloom: true,
1303        });
1304        let indexer = IndexerBuilderImpl {
1305            build_type: IndexBuildType::Flush,
1306            metadata,
1307            row_group_size: 1024,
1308            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1309            write_cache_enabled: false,
1310            intermediate_manager: intm_manager,
1311            index_options: IndexOptions::default(),
1312            inverted_index_config: InvertedIndexConfig::default(),
1313            fulltext_index_config: FulltextIndexConfig::default(),
1314            bloom_filter_index_config: BloomFilterConfig::default(),
1315        }
1316        .build(FileId::random(), 0)
1317        .await;
1318
1319        assert!(indexer.inverted_indexer.is_some());
1320        assert!(indexer.fulltext_indexer.is_some());
1321        assert!(indexer.bloom_filter_indexer.is_some());
1322    }
1323
    /// Checks each index type can be disabled independently through its
    /// `create_on_flush` / `create_on_compaction` mode without affecting the
    /// other indexers.
    #[tokio::test]
    async fn test_build_indexer_disable_create() {
        let (dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_disable_create_").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;

        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: true,
            with_skipping_bloom: true,
        });
        // Case 1: inverted index disabled on flush — only it is skipped.
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            write_cache_enabled: false,
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig {
                create_on_flush: Mode::Disable,
                ..Default::default()
            },
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random(), 0)
        .await;

        assert!(indexer.inverted_indexer.is_none());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_some());

        // Case 2: fulltext index disabled on compaction — only it is skipped.
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Compact,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            write_cache_enabled: false,
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig {
                create_on_compaction: Mode::Disable,
                ..Default::default()
            },
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random(), 0)
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_none());
        assert!(indexer.bloom_filter_indexer.is_some());

        // Case 3: bloom filter disabled on compaction — only it is skipped.
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Compact,
            metadata,
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            write_cache_enabled: false,
            intermediate_manager: intm_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig {
                create_on_compaction: Mode::Disable,
                ..Default::default()
            },
        }
        .build(FileId::random(), 0)
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_none());
    }
1401
    /// Checks that when the metadata lacks a given index option, the
    /// corresponding indexer is not created even though its config allows it.
    #[tokio::test]
    async fn test_build_indexer_no_required() {
        let (dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_no_required_").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;

        // Case 1: no inverted-index column — no inverted indexer.
        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: false,
            with_fulltext: true,
            with_skipping_bloom: true,
        });
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            write_cache_enabled: false,
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random(), 0)
        .await;

        assert!(indexer.inverted_indexer.is_none());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_some());

        // Case 2: no fulltext column — no fulltext indexer.
        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: false,
            with_skipping_bloom: true,
        });
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            write_cache_enabled: false,
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random(), 0)
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_none());
        assert!(indexer.bloom_filter_indexer.is_some());

        // Case 3: no skipping-index column — no bloom filter indexer.
        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: true,
            with_skipping_bloom: false,
        });
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            write_cache_enabled: false,
            intermediate_manager: intm_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random(), 0)
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_none());
    }
1480
1481    #[tokio::test]
1482    async fn test_build_indexer_zero_row_group() {
1483        let (dir, factory) =
1484            PuffinManagerFactory::new_for_test_async("test_build_indexer_zero_row_group_").await;
1485        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1486
1487        let metadata = mock_region_metadata(MetaConfig {
1488            with_inverted: true,
1489            with_fulltext: true,
1490            with_skipping_bloom: true,
1491        });
1492        let indexer = IndexerBuilderImpl {
1493            build_type: IndexBuildType::Flush,
1494            metadata,
1495            row_group_size: 0,
1496            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1497            write_cache_enabled: false,
1498            intermediate_manager: intm_manager,
1499            index_options: IndexOptions::default(),
1500            inverted_index_config: InvertedIndexConfig::default(),
1501            fulltext_index_config: FulltextIndexConfig::default(),
1502            bloom_filter_index_config: BloomFilterConfig::default(),
1503        }
1504        .build(FileId::random(), 0)
1505        .await;
1506
1507        assert!(indexer.inverted_indexer.is_none());
1508    }
1509
    /// Schedules an index build task for a file that was never written: the
    /// task should NOT report `Finished` (an abort is expected since the SST
    /// file is missing).
    #[tokio::test]
    async fn test_index_build_task_sst_not_exist() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
        let mut scheduler = env.mock_index_build_scheduler(4);
        let metadata = Arc::new(sst_region_metadata());
        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
        let file_purger = Arc::new(NoopFilePurger {});
        // Empty file set: the version knows no SSTs at all.
        let files = HashMap::new();
        let version_control =
            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
        let region_id = metadata.region_id;
        let indexer_builder = mock_indexer_builder(metadata, &env).await;

        // File meta pointing at a random file id that has no backing SST.
        let file_meta = FileMeta {
            region_id,
            file_id: FileId::random(),
            file_size: 100,
            ..Default::default()
        };

        let file = FileHandle::new(file_meta.clone(), file_purger.clone());

        // Create mock task.
        let task = IndexBuildTask {
            file,
            file_meta,
            reason: IndexBuildType::Flush,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            manifest_ctx,
            write_cache: None,
            file_purger,
            indexer_builder,
            request_sender: tx,
            result_sender: result_tx,
        };

        // Schedule the build task and check result.
        scheduler
            .schedule_build(&version_control, task)
            .await
            .unwrap();
        match result_rx.recv().await.unwrap() {
            Ok(outcome) => {
                if outcome == IndexBuildOutcome::Finished {
                    panic!("Expect aborted result due to missing SST file")
                }
            }
            _ => panic!("Expect aborted result due to missing SST file"),
        }
    }
1563
    /// Schedules an index build task for an SST that exists: the task should
    /// finish and notify the worker with a manifest edit carrying the
    /// updated file metadata.
    #[tokio::test]
    async fn test_index_build_task_sst_exist() {
        let env = SchedulerEnv::new().await;
        let mut scheduler = env.mock_index_build_scheduler(4);
        let metadata = Arc::new(sst_region_metadata());
        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
        let region_id = metadata.region_id;
        let file_purger = Arc::new(NoopFilePurger {});
        // Write a real SST (async build mode) so the task has data to index.
        let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
        let file_meta = FileMeta {
            region_id,
            file_id: sst_info.file_id,
            file_size: sst_info.file_size,
            max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
            index_file_size: sst_info.index_metadata.file_size,
            num_rows: sst_info.num_rows as u64,
            num_row_groups: sst_info.num_row_groups,
            ..Default::default()
        };
        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
        let version_control =
            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
        let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;

        let file = FileHandle::new(file_meta.clone(), file_purger.clone());

        // Create mock task.
        let (tx, mut rx) = mpsc::channel(4);
        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
        let task = IndexBuildTask {
            file,
            file_meta: file_meta.clone(),
            reason: IndexBuildType::Flush,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            manifest_ctx,
            write_cache: None,
            file_purger,
            indexer_builder,
            request_sender: tx,
            result_sender: result_tx,
        };

        scheduler
            .schedule_build(&version_control, task)
            .await
            .unwrap();

        // The task should finish successfully.
        match result_rx.recv().await.unwrap() {
            Ok(outcome) => {
                assert_eq!(outcome, IndexBuildOutcome::Finished);
            }
            _ => panic!("Expect finished result"),
        }

        // A notification should be sent to the worker to update the manifest.
        let worker_req = rx.recv().await.unwrap().request;
        match worker_req {
            WorkerRequest::Background {
                region_id: req_region_id,
                notify: BackgroundNotify::IndexBuildFinished(finished),
            } => {
                assert_eq!(req_region_id, region_id);
                assert_eq!(finished.edit.files_to_add.len(), 1);
                let updated_meta = &finished.edit.files_to_add[0];

                // The mock indexer builder creates all index types.
                assert!(!updated_meta.available_indexes.is_empty());
                assert!(updated_meta.index_file_size > 0);
                assert_eq!(updated_meta.file_id, file_meta.file_id);
            }
            _ => panic!("Unexpected worker request: {:?}", worker_req),
        }
    }
1639
1640    async fn schedule_index_build_task_with_mode(build_mode: IndexBuildMode) {
1641        let env = SchedulerEnv::new().await;
1642        let mut scheduler = env.mock_index_build_scheduler(4);
1643        let metadata = Arc::new(sst_region_metadata());
1644        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1645        let file_purger = Arc::new(NoopFilePurger {});
1646        let region_id = metadata.region_id;
1647        let sst_info = mock_sst_file(metadata.clone(), &env, build_mode.clone()).await;
1648        let file_meta = FileMeta {
1649            region_id,
1650            file_id: sst_info.file_id,
1651            file_size: sst_info.file_size,
1652            max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
1653            index_file_size: sst_info.index_metadata.file_size,
1654            num_rows: sst_info.num_rows as u64,
1655            num_row_groups: sst_info.num_row_groups,
1656            ..Default::default()
1657        };
1658        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1659        let version_control =
1660            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1661        let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
1662
1663        let file = FileHandle::new(file_meta.clone(), file_purger.clone());
1664
1665        // Create mock task.
1666        let (tx, _rx) = mpsc::channel(4);
1667        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1668        let task = IndexBuildTask {
1669            file,
1670            file_meta: file_meta.clone(),
1671            reason: IndexBuildType::Flush,
1672            access_layer: env.access_layer.clone(),
1673            listener: WorkerListener::default(),
1674            manifest_ctx,
1675            write_cache: None,
1676            file_purger,
1677            indexer_builder,
1678            request_sender: tx,
1679            result_sender: result_tx,
1680        };
1681
1682        scheduler
1683            .schedule_build(&version_control, task)
1684            .await
1685            .unwrap();
1686
1687        let puffin_path = location::index_file_path(
1688            env.access_layer.table_dir(),
1689            RegionIndexId::new(RegionFileId::new(region_id, file_meta.file_id), 0),
1690            env.access_layer.path_type(),
1691        );
1692
1693        if build_mode == IndexBuildMode::Async {
1694            // The index file should not exist before the task finishes.
1695            assert!(
1696                !env.access_layer
1697                    .object_store()
1698                    .exists(&puffin_path)
1699                    .await
1700                    .unwrap()
1701            );
1702        } else {
1703            // The index file should exist before the task finishes.
1704            assert!(
1705                env.access_layer
1706                    .object_store()
1707                    .exists(&puffin_path)
1708                    .await
1709                    .unwrap()
1710            );
1711        }
1712
1713        // The task should finish successfully.
1714        match result_rx.recv().await.unwrap() {
1715            Ok(outcome) => {
1716                assert_eq!(outcome, IndexBuildOutcome::Finished);
1717            }
1718            _ => panic!("Expect finished result"),
1719        }
1720
1721        // The index file should exist after the task finishes.
1722        assert!(
1723            env.access_layer
1724                .object_store()
1725                .exists(&puffin_path)
1726                .await
1727                .unwrap()
1728        );
1729    }
1730
1731    #[tokio::test]
1732    async fn test_index_build_task_build_mode() {
1733        schedule_index_build_task_with_mode(IndexBuildMode::Async).await;
1734        schedule_index_build_task_with_mode(IndexBuildMode::Sync).await;
1735    }
1736
1737    #[tokio::test]
1738    async fn test_index_build_task_no_index() {
1739        let env = SchedulerEnv::new().await;
1740        let mut scheduler = env.mock_index_build_scheduler(4);
1741        let mut metadata = sst_region_metadata();
1742        // Unset indexes in metadata to simulate no index scenario.
1743        metadata.column_metadatas.iter_mut().for_each(|col| {
1744            col.column_schema.set_inverted_index(false);
1745            let _ = col.column_schema.unset_skipping_options();
1746        });
1747        let region_id = metadata.region_id;
1748        let metadata = Arc::new(metadata);
1749        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1750        let file_purger = Arc::new(NoopFilePurger {});
1751        let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
1752        let file_meta = FileMeta {
1753            region_id,
1754            file_id: sst_info.file_id,
1755            file_size: sst_info.file_size,
1756            max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
1757            index_file_size: sst_info.index_metadata.file_size,
1758            num_rows: sst_info.num_rows as u64,
1759            num_row_groups: sst_info.num_row_groups,
1760            ..Default::default()
1761        };
1762        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1763        let version_control =
1764            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1765        let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
1766
1767        let file = FileHandle::new(file_meta.clone(), file_purger.clone());
1768
1769        // Create mock task.
1770        let (tx, mut rx) = mpsc::channel(4);
1771        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1772        let task = IndexBuildTask {
1773            file,
1774            file_meta: file_meta.clone(),
1775            reason: IndexBuildType::Flush,
1776            access_layer: env.access_layer.clone(),
1777            listener: WorkerListener::default(),
1778            manifest_ctx,
1779            write_cache: None,
1780            file_purger,
1781            indexer_builder,
1782            request_sender: tx,
1783            result_sender: result_tx,
1784        };
1785
1786        scheduler
1787            .schedule_build(&version_control, task)
1788            .await
1789            .unwrap();
1790
1791        // The task should finish successfully.
1792        match result_rx.recv().await.unwrap() {
1793            Ok(outcome) => {
1794                assert_eq!(outcome, IndexBuildOutcome::Finished);
1795            }
1796            _ => panic!("Expect finished result"),
1797        }
1798
1799        // No index is built, so no notification should be sent to the worker.
1800        let _ = rx.recv().await.is_none();
1801    }
1802
1803    #[tokio::test]
1804    async fn test_index_build_task_with_write_cache() {
1805        let env = SchedulerEnv::new().await;
1806        let mut scheduler = env.mock_index_build_scheduler(4);
1807        let metadata = Arc::new(sst_region_metadata());
1808        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1809        let file_purger = Arc::new(NoopFilePurger {});
1810        let region_id = metadata.region_id;
1811
1812        let (dir, factory) = PuffinManagerFactory::new_for_test_async("test_write_cache").await;
1813        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1814
1815        // Create mock write cache
1816        let write_cache = Arc::new(
1817            WriteCache::new_fs(
1818                dir.path().to_str().unwrap(),
1819                ReadableSize::mb(10),
1820                None,
1821                None,
1822                true, // enable_background_worker
1823                factory,
1824                intm_manager,
1825                ReadableSize::mb(10),
1826            )
1827            .await
1828            .unwrap(),
1829        );
1830        // Indexer builder built from write cache.
1831        let indexer_builder = Arc::new(IndexerBuilderImpl {
1832            build_type: IndexBuildType::Flush,
1833            metadata: metadata.clone(),
1834            row_group_size: 1024,
1835            puffin_manager: write_cache.build_puffin_manager().clone(),
1836            write_cache_enabled: true,
1837            intermediate_manager: write_cache.intermediate_manager().clone(),
1838            index_options: IndexOptions::default(),
1839            inverted_index_config: InvertedIndexConfig::default(),
1840            fulltext_index_config: FulltextIndexConfig::default(),
1841            bloom_filter_index_config: BloomFilterConfig::default(),
1842        });
1843
1844        let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
1845        let file_meta = FileMeta {
1846            region_id,
1847            file_id: sst_info.file_id,
1848            file_size: sst_info.file_size,
1849            index_file_size: sst_info.index_metadata.file_size,
1850            num_rows: sst_info.num_rows as u64,
1851            num_row_groups: sst_info.num_row_groups,
1852            ..Default::default()
1853        };
1854        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1855        let version_control =
1856            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1857
1858        let file = FileHandle::new(file_meta.clone(), file_purger.clone());
1859
1860        // Create mock task.
1861        let (tx, mut _rx) = mpsc::channel(4);
1862        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1863        let task = IndexBuildTask {
1864            file,
1865            file_meta: file_meta.clone(),
1866            reason: IndexBuildType::Flush,
1867            access_layer: env.access_layer.clone(),
1868            listener: WorkerListener::default(),
1869            manifest_ctx,
1870            write_cache: Some(write_cache.clone()),
1871            file_purger,
1872            indexer_builder,
1873            request_sender: tx,
1874            result_sender: result_tx,
1875        };
1876
1877        scheduler
1878            .schedule_build(&version_control, task)
1879            .await
1880            .unwrap();
1881
1882        // The task should finish successfully.
1883        match result_rx.recv().await.unwrap() {
1884            Ok(outcome) => {
1885                assert_eq!(outcome, IndexBuildOutcome::Finished);
1886            }
1887            _ => panic!("Expect finished result"),
1888        }
1889
1890        // The write cache should contain the uploaded index file.
1891        let index_key = IndexKey::new(
1892            region_id,
1893            file_meta.file_id,
1894            FileType::Puffin(sst_info.index_metadata.version),
1895        );
1896        assert!(write_cache.file_cache().contains_key(&index_key));
1897    }
1898
1899    async fn create_mock_task_for_schedule(
1900        env: &SchedulerEnv,
1901        file_id: FileId,
1902        region_id: RegionId,
1903        reason: IndexBuildType,
1904    ) -> IndexBuildTask {
1905        let metadata = Arc::new(sst_region_metadata());
1906        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1907        let file_purger = Arc::new(NoopFilePurger {});
1908        let indexer_builder = mock_indexer_builder(metadata, env).await;
1909        let (tx, _rx) = mpsc::channel(4);
1910        let (result_tx, _result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1911
1912        let file_meta = FileMeta {
1913            region_id,
1914            file_id,
1915            file_size: 100,
1916            ..Default::default()
1917        };
1918
1919        let file = FileHandle::new(file_meta.clone(), file_purger.clone());
1920
1921        IndexBuildTask {
1922            file,
1923            file_meta,
1924            reason,
1925            access_layer: env.access_layer.clone(),
1926            listener: WorkerListener::default(),
1927            manifest_ctx,
1928            write_cache: None,
1929            file_purger,
1930            indexer_builder,
1931            request_sender: tx,
1932            result_sender: result_tx,
1933        }
1934    }
1935
    /// End-to-end scheduler bookkeeping test covering: the building-slot
    /// limit (2 here), duplicate-file skipping, a priority-ordered pending
    /// queue, completion-driven promotion of pending tasks, region cleanup
    /// when the last task stops, and cleanup on region drop.
    #[tokio::test]
    async fn test_scheduler_comprehensive() {
        let env = SchedulerEnv::new().await;
        // Capacity of 2 concurrent building tasks (see assertions below).
        let mut scheduler = env.mock_index_build_scheduler(2);
        let metadata = Arc::new(sst_region_metadata());
        let region_id = metadata.region_id;
        let file_purger = Arc::new(NoopFilePurger {});

        // Prepare multiple files for testing
        let file_id1 = FileId::random();
        let file_id2 = FileId::random();
        let file_id3 = FileId::random();
        let file_id4 = FileId::random();
        let file_id5 = FileId::random();

        // Register all five files in version control so schedule_build can
        // find them.
        let mut files = HashMap::new();
        for file_id in [file_id1, file_id2, file_id3, file_id4, file_id5] {
            files.insert(
                file_id,
                FileMeta {
                    region_id,
                    file_id,
                    file_size: 100,
                    ..Default::default()
                },
            );
        }

        let version_control = mock_version_control(metadata, file_purger, files).await;

        // Test 1: Basic scheduling
        let task1 =
            create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
        assert!(
            scheduler
                .schedule_build(&version_control, task1)
                .await
                .is_ok()
        );
        assert!(scheduler.region_status.contains_key(&region_id));
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 1);
        assert!(status.building_files.contains(&file_id1));

        // Test 2: Duplicate file scheduling (should be skipped)
        let task1_dup =
            create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
        scheduler
            .schedule_build(&version_control, task1_dup)
            .await
            .unwrap();
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 1); // Still only one

        // Test 3: Fill up to limit (2 building tasks)
        let task2 =
            create_mock_task_for_schedule(&env, file_id2, region_id, IndexBuildType::Flush).await;
        scheduler
            .schedule_build(&version_control, task2)
            .await
            .unwrap();
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 2); // Reached limit
        assert_eq!(status.pending_tasks.len(), 0);

        // Test 4: Add tasks with different priorities to pending queue
        // Now all new tasks will be pending since we reached the limit
        let task3 =
            create_mock_task_for_schedule(&env, file_id3, region_id, IndexBuildType::Compact).await;
        let task4 =
            create_mock_task_for_schedule(&env, file_id4, region_id, IndexBuildType::SchemaChange)
                .await;
        let task5 =
            create_mock_task_for_schedule(&env, file_id5, region_id, IndexBuildType::Manual).await;

        scheduler
            .schedule_build(&version_control, task3)
            .await
            .unwrap();
        scheduler
            .schedule_build(&version_control, task4)
            .await
            .unwrap();
        scheduler
            .schedule_build(&version_control, task5)
            .await
            .unwrap();

        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 2); // Still at limit
        assert_eq!(status.pending_tasks.len(), 3); // Three pending

        // Test 5: Task completion triggers scheduling next highest priority task (Manual)
        scheduler.on_task_stopped(region_id, file_id1, &version_control);
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert!(!status.building_files.contains(&file_id1));
        assert_eq!(status.building_files.len(), 2); // Should schedule next task
        assert_eq!(status.pending_tasks.len(), 2); // One less pending
        // The highest priority task (Manual) should now be building
        assert!(status.building_files.contains(&file_id5));

        // Test 6: Complete another task, should schedule SchemaChange (second highest priority)
        scheduler.on_task_stopped(region_id, file_id2, &version_control);
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 2);
        assert_eq!(status.pending_tasks.len(), 1); // One less pending
        assert!(status.building_files.contains(&file_id4)); // SchemaChange should be building

        // Test 7: Complete remaining tasks and cleanup
        scheduler.on_task_stopped(region_id, file_id5, &version_control);
        scheduler.on_task_stopped(region_id, file_id4, &version_control);

        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 1); // Last task (Compact) should be building
        assert_eq!(status.pending_tasks.len(), 0);
        assert!(status.building_files.contains(&file_id3));

        scheduler.on_task_stopped(region_id, file_id3, &version_control);

        // Region should be removed when all tasks complete
        assert!(!scheduler.region_status.contains_key(&region_id));

        // Test 8: Region dropped with pending tasks
        let task6 =
            create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
        let task7 =
            create_mock_task_for_schedule(&env, file_id2, region_id, IndexBuildType::Flush).await;
        let task8 =
            create_mock_task_for_schedule(&env, file_id3, region_id, IndexBuildType::Manual).await;

        scheduler
            .schedule_build(&version_control, task6)
            .await
            .unwrap();
        scheduler
            .schedule_build(&version_control, task7)
            .await
            .unwrap();
        scheduler
            .schedule_build(&version_control, task8)
            .await
            .unwrap();

        // Two tasks occupy the building slots; the Manual one stays pending.
        assert!(scheduler.region_status.contains_key(&region_id));
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 2);
        assert_eq!(status.pending_tasks.len(), 1);

        // Dropping the region must discard both building and pending state.
        scheduler.on_region_dropped(region_id).await;
        assert!(!scheduler.region_status.contains_key(&region_id));
    }
2087}