// mito2/sst/index.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod bloom_filter;
16pub(crate) mod fulltext_index;
17mod indexer;
18pub mod intermediate;
19pub(crate) mod inverted_index;
20pub mod puffin_manager;
21mod statistics;
22pub(crate) mod store;
23
24use std::cmp::Ordering;
25use std::collections::{BinaryHeap, HashMap, HashSet};
26use std::num::NonZeroUsize;
27use std::sync::Arc;
28
29use bloom_filter::creator::BloomFilterIndexer;
30use common_telemetry::{debug, error, info, warn};
31use datatypes::arrow::array::BinaryArray;
32use datatypes::arrow::record_batch::RecordBatch;
33use mito_codec::index::IndexValuesCodec;
34use mito_codec::row_converter::CompositeValues;
35use puffin_manager::SstPuffinManager;
36use smallvec::{SmallVec, smallvec};
37use snafu::{OptionExt, ResultExt};
38use statistics::{ByteCount, RowCount};
39use store_api::metadata::RegionMetadataRef;
40use store_api::storage::{ColumnId, FileId, RegionId};
41use strum::IntoStaticStr;
42use tokio::sync::mpsc::Sender;
43
44use crate::access_layer::{AccessLayerRef, FilePathProvider, OperationType, RegionFilePathFactory};
45use crate::cache::file_cache::{FileType, IndexKey};
46use crate::cache::write_cache::{UploadTracker, WriteCacheRef};
47use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
48use crate::error::{
49    BuildIndexAsyncSnafu, DecodeSnafu, Error, InvalidRecordBatchSnafu, RegionClosedSnafu,
50    RegionDroppedSnafu, RegionTruncatedSnafu, Result,
51};
52use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
53use crate::metrics::INDEX_CREATE_MEMORY_USAGE;
54use crate::read::{Batch, BatchReader};
55use crate::region::options::IndexOptions;
56use crate::region::version::VersionControlRef;
57use crate::region::{ManifestContextRef, RegionLeaderState};
58use crate::request::{
59    BackgroundNotify, IndexBuildFailed, IndexBuildFinished, IndexBuildStopped, WorkerRequest,
60    WorkerRequestWithTime,
61};
62use crate::schedule::scheduler::{Job, SchedulerRef};
63use crate::sst::file::{
64    ColumnIndexMetadata, FileHandle, FileMeta, IndexType, IndexTypes, RegionFileId, RegionIndexId,
65};
66use crate::sst::file_purger::FilePurgerRef;
67use crate::sst::index::fulltext_index::creator::FulltextIndexer;
68use crate::sst::index::intermediate::IntermediateManager;
69use crate::sst::index::inverted_index::creator::InvertedIndexer;
70use crate::sst::parquet::SstInfo;
71use crate::sst::parquet::flat_format::primary_key_column_index;
72use crate::sst::parquet::format::PrimaryKeyArray;
73use crate::worker::WorkerListener;
74
// Label values for the `INDEX_CREATE_MEMORY_USAGE` metric, one per index kind.
pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index";
pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index";
pub(crate) const TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
78
/// Output of the index creation.
#[derive(Debug, Clone, Default)]
pub struct IndexOutput {
    /// Size of the index file in bytes; 0 means no index file was produced.
    pub file_size: u64,
    /// Index version, used to distinguish rebuilt index files of the same SST.
    pub version: u64,
    /// Inverted index output.
    pub inverted_index: InvertedIndexOutput,
    /// Fulltext index output.
    pub fulltext_index: FulltextIndexOutput,
    /// Bloom filter output.
    pub bloom_filter: BloomFilterOutput,
}
93
94impl IndexOutput {
95    pub fn build_available_indexes(&self) -> SmallVec<[IndexType; 4]> {
96        let mut indexes = SmallVec::new();
97        if self.inverted_index.is_available() {
98            indexes.push(IndexType::InvertedIndex);
99        }
100        if self.fulltext_index.is_available() {
101            indexes.push(IndexType::FulltextIndex);
102        }
103        if self.bloom_filter.is_available() {
104            indexes.push(IndexType::BloomFilterIndex);
105        }
106        indexes
107    }
108
109    pub fn build_indexes(&self) -> Vec<ColumnIndexMetadata> {
110        let mut map: HashMap<ColumnId, IndexTypes> = HashMap::new();
111
112        if self.inverted_index.is_available() {
113            for &col in &self.inverted_index.columns {
114                map.entry(col).or_default().push(IndexType::InvertedIndex);
115            }
116        }
117        if self.fulltext_index.is_available() {
118            for &col in &self.fulltext_index.columns {
119                map.entry(col).or_default().push(IndexType::FulltextIndex);
120            }
121        }
122        if self.bloom_filter.is_available() {
123            for &col in &self.bloom_filter.columns {
124                map.entry(col)
125                    .or_default()
126                    .push(IndexType::BloomFilterIndex);
127            }
128        }
129
130        map.into_iter()
131            .map(|(column_id, created_indexes)| ColumnIndexMetadata {
132                column_id,
133                created_indexes,
134            })
135            .collect::<Vec<_>>()
136    }
137}
138
/// Base output of the index creation.
#[derive(Debug, Clone, Default)]
pub struct IndexBaseOutput {
    /// Size of the index in bytes; 0 indicates the index was not created.
    pub index_size: ByteCount,
    /// Number of rows in the index.
    pub row_count: RowCount,
    /// Ids of the columns covered by the index.
    pub columns: Vec<ColumnId>,
}
149
impl IndexBaseOutput {
    /// Returns true if the index was actually created, i.e. it has a non-zero size.
    pub fn is_available(&self) -> bool {
        self.index_size > 0
    }
}
155
156/// Output of the inverted index creation.
157pub type InvertedIndexOutput = IndexBaseOutput;
158/// Output of the fulltext index creation.
159pub type FulltextIndexOutput = IndexBaseOutput;
160/// Output of the bloom filter creation.
161pub type BloomFilterOutput = IndexBaseOutput;
162
/// The index creator that hides the error handling details.
#[derive(Default)]
pub struct Indexer {
    /// Id of the SST file being indexed.
    file_id: FileId,
    /// Id of the region the SST file belongs to.
    region_id: RegionId,
    /// Version of the index file to create.
    index_version: u64,
    /// Puffin manager for writing index files; `None` when no index is created.
    puffin_manager: Option<SstPuffinManager>,
    /// Whether the write cache is enabled.
    write_cache_enabled: bool,
    /// Inverted index creator, if enabled for this file.
    inverted_indexer: Option<InvertedIndexer>,
    /// Inverted index memory usage reported at the last metric flush (for deltas).
    last_mem_inverted_index: usize,
    /// Full-text index creator, if enabled for this file.
    fulltext_indexer: Option<FulltextIndexer>,
    /// Full-text index memory usage reported at the last metric flush (for deltas).
    last_mem_fulltext_index: usize,
    /// Bloom filter creator, if enabled for this file.
    bloom_filter_indexer: Option<BloomFilterIndexer>,
    /// Bloom filter memory usage reported at the last metric flush (for deltas).
    last_mem_bloom_filter: usize,
    /// Manager of intermediate files used during index creation.
    intermediate_manager: Option<IntermediateManager>,
}
179
180impl Indexer {
181    /// Updates the index with the given batch.
182    pub async fn update(&mut self, batch: &mut Batch) {
183        self.do_update(batch).await;
184
185        self.flush_mem_metrics();
186    }
187
188    /// Updates the index with the given flat format RecordBatch.
189    pub async fn update_flat(&mut self, batch: &RecordBatch) {
190        self.do_update_flat(batch).await;
191
192        self.flush_mem_metrics();
193    }
194
195    /// Finalizes the index creation.
196    pub async fn finish(&mut self) -> IndexOutput {
197        let output = self.do_finish().await;
198
199        self.flush_mem_metrics();
200        output
201    }
202
203    /// Aborts the index creation.
204    pub async fn abort(&mut self) {
205        self.do_abort().await;
206
207        self.flush_mem_metrics();
208    }
209
210    fn flush_mem_metrics(&mut self) {
211        let inverted_mem = self
212            .inverted_indexer
213            .as_ref()
214            .map_or(0, |creator| creator.memory_usage());
215        INDEX_CREATE_MEMORY_USAGE
216            .with_label_values(&[TYPE_INVERTED_INDEX])
217            .add(inverted_mem as i64 - self.last_mem_inverted_index as i64);
218        self.last_mem_inverted_index = inverted_mem;
219
220        let fulltext_mem = self
221            .fulltext_indexer
222            .as_ref()
223            .map_or(0, |creator| creator.memory_usage());
224        INDEX_CREATE_MEMORY_USAGE
225            .with_label_values(&[TYPE_FULLTEXT_INDEX])
226            .add(fulltext_mem as i64 - self.last_mem_fulltext_index as i64);
227        self.last_mem_fulltext_index = fulltext_mem;
228
229        let bloom_filter_mem = self
230            .bloom_filter_indexer
231            .as_ref()
232            .map_or(0, |creator| creator.memory_usage());
233        INDEX_CREATE_MEMORY_USAGE
234            .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
235            .add(bloom_filter_mem as i64 - self.last_mem_bloom_filter as i64);
236        self.last_mem_bloom_filter = bloom_filter_mem;
237    }
238}
239
/// Factory of [Indexer]s for SST files.
#[async_trait::async_trait]
pub trait IndexerBuilder {
    /// Builds an indexer for the SST file with the given id and index version.
    async fn build(&self, file_id: FileId, index_version: u64) -> Indexer;
}
/// Default [IndexerBuilder] implementation that creates [Indexer]s from
/// region metadata and the per-index configurations.
#[derive(Clone)]
pub(crate) struct IndexerBuilderImpl {
    /// Why the index is being built; gates `create_on_flush`/`create_on_compaction`.
    pub(crate) build_type: IndexBuildType,
    /// Metadata of the region the SST belongs to.
    pub(crate) metadata: RegionMetadataRef,
    /// Row group size of the SST, used to align the inverted index segment size.
    pub(crate) row_group_size: usize,
    /// Puffin manager used by the built indexer to write index files.
    pub(crate) puffin_manager: SstPuffinManager,
    /// Whether the write cache is enabled.
    pub(crate) write_cache_enabled: bool,
    /// Manager of intermediate files used during index creation.
    pub(crate) intermediate_manager: IntermediateManager,
    /// Region-level index options (e.g. ignored columns, segment row count).
    pub(crate) index_options: IndexOptions,
    /// Inverted index configuration.
    pub(crate) inverted_index_config: InvertedIndexConfig,
    /// Full-text index configuration.
    pub(crate) fulltext_index_config: FulltextIndexConfig,
    /// Bloom filter index configuration.
    pub(crate) bloom_filter_index_config: BloomFilterConfig,
}
258
259#[async_trait::async_trait]
260impl IndexerBuilder for IndexerBuilderImpl {
261    /// Sanity check for arguments and create a new [Indexer] if arguments are valid.
262    async fn build(&self, file_id: FileId, index_version: u64) -> Indexer {
263        let mut indexer = Indexer {
264            file_id,
265            region_id: self.metadata.region_id,
266            index_version,
267            write_cache_enabled: self.write_cache_enabled,
268            ..Default::default()
269        };
270
271        indexer.inverted_indexer = self.build_inverted_indexer(file_id);
272        indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await;
273        indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id);
274        indexer.intermediate_manager = Some(self.intermediate_manager.clone());
275        if indexer.inverted_indexer.is_none()
276            && indexer.fulltext_indexer.is_none()
277            && indexer.bloom_filter_indexer.is_none()
278        {
279            indexer.abort().await;
280            return Indexer::default();
281        }
282
283        indexer.puffin_manager = Some(self.puffin_manager.clone());
284        indexer
285    }
286}
287
impl IndexerBuilderImpl {
    /// Creates an [InvertedIndexer] for `file_id`, or `None` when the config,
    /// the indexed column set, or the segment/row-group sizes rule it out.
    fn build_inverted_indexer(&self, file_id: FileId) -> Option<InvertedIndexer> {
        // Flush/compaction honor their `create_on_*` settings; other build
        // types (schema change, manual) always create the index.
        let create = match self.build_type {
            IndexBuildType::Flush => self.inverted_index_config.create_on_flush.auto(),
            IndexBuildType::Compact => self.inverted_index_config.create_on_compaction.auto(),
            _ => true,
        };

        if !create {
            debug!(
                "Skip creating inverted index due to config, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        }

        // Columns explicitly ignored via index options are excluded.
        let indexed_column_ids = self.metadata.inverted_indexed_column_ids(
            self.index_options.inverted_index.ignore_column_ids.iter(),
        );
        if indexed_column_ids.is_empty() {
            debug!(
                "No columns to be indexed, skip creating inverted index, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        }

        let Some(mut segment_row_count) =
            NonZeroUsize::new(self.index_options.inverted_index.segment_row_count)
        else {
            warn!(
                "Segment row count is 0, skip creating index, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        };

        let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else {
            warn!(
                "Row group size is 0, skip creating index, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        };

        // if segment row count not aligned with row group size, adjust it to be aligned.
        if row_group_size.get() % segment_row_count.get() != 0 {
            segment_row_count = row_group_size;
        }

        let indexer = InvertedIndexer::new(
            file_id,
            &self.metadata,
            self.intermediate_manager.clone(),
            self.inverted_index_config.mem_threshold_on_create(),
            segment_row_count,
            indexed_column_ids,
        );

        Some(indexer)
    }

    /// Creates a [FulltextIndexer] for `file_id`.
    ///
    /// Returns `None` when the config disables creation, no column requires a
    /// full-text index, or creation fails (the failure is logged in release
    /// builds and panics in test builds to surface bugs early).
    async fn build_fulltext_indexer(&self, file_id: FileId) -> Option<FulltextIndexer> {
        let create = match self.build_type {
            IndexBuildType::Flush => self.fulltext_index_config.create_on_flush.auto(),
            IndexBuildType::Compact => self.fulltext_index_config.create_on_compaction.auto(),
            _ => true,
        };

        if !create {
            debug!(
                "Skip creating full-text index due to config, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        }

        let mem_limit = self.fulltext_index_config.mem_threshold_on_create();
        let creator = FulltextIndexer::new(
            &self.metadata.region_id,
            &file_id,
            &self.intermediate_manager,
            &self.metadata,
            self.fulltext_index_config.compress,
            mem_limit,
        )
        .await;

        // On success return immediately (possibly `None` when no column needs
        // indexing); only the error falls through to the handling below.
        let err = match creator {
            Ok(creator) => {
                if creator.is_none() {
                    debug!(
                        "Skip creating full-text index due to no columns require indexing, region_id: {}, file_id: {}",
                        self.metadata.region_id, file_id,
                    );
                }
                return creator;
            }
            Err(err) => err,
        };

        // Panic in tests so creation failures are caught by CI; degrade to a
        // warning (index simply skipped) in production.
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to create full-text indexer, region_id: {}, file_id: {}, err: {:?}",
                self.metadata.region_id, file_id, err
            );
        } else {
            warn!(
                err; "Failed to create full-text indexer, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
        }

        None
    }

    /// Creates a [BloomFilterIndexer] for `file_id`.
    ///
    /// Returns `None` when the config disables creation, no column requires a
    /// bloom filter, or creation fails (logged in release builds, panics in
    /// test builds).
    fn build_bloom_filter_indexer(&self, file_id: FileId) -> Option<BloomFilterIndexer> {
        let create = match self.build_type {
            IndexBuildType::Flush => self.bloom_filter_index_config.create_on_flush.auto(),
            IndexBuildType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(),
            _ => true,
        };

        if !create {
            debug!(
                "Skip creating bloom filter due to config, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        }

        let mem_limit = self.bloom_filter_index_config.mem_threshold_on_create();
        let indexer = BloomFilterIndexer::new(
            file_id,
            &self.metadata,
            self.intermediate_manager.clone(),
            mem_limit,
        );

        // Same pattern as the full-text creator: return on success, extract
        // the error otherwise.
        let err = match indexer {
            Ok(indexer) => {
                if indexer.is_none() {
                    debug!(
                        "Skip creating bloom filter due to no columns require indexing, region_id: {}, file_id: {}",
                        self.metadata.region_id, file_id,
                    );
                }
                return indexer;
            }
            Err(err) => err,
        };

        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to create bloom filter, region_id: {}, file_id: {}, err: {:?}",
                self.metadata.region_id, file_id, err
            );
        } else {
            warn!(
                err; "Failed to create bloom filter, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
        }

        None
    }
}
455
/// Type of an index build task. The variant determines scheduling priority
/// (see `priority`) and whether index creation is gated by config.
#[derive(Debug, Clone, IntoStaticStr, PartialEq)]
pub enum IndexBuildType {
    /// Build index when schema change.
    SchemaChange,
    /// Create or update index after flush.
    Flush,
    /// Create or update index after compact.
    Compact,
    /// Manually build index.
    Manual,
}
468
469impl IndexBuildType {
470    fn as_str(&self) -> &'static str {
471        self.into()
472    }
473
474    // Higher value means higher priority.
475    fn priority(&self) -> u8 {
476        match self {
477            IndexBuildType::Manual => 3,
478            IndexBuildType::SchemaChange => 2,
479            IndexBuildType::Flush => 1,
480            IndexBuildType::Compact => 0,
481        }
482    }
483}
484
485impl From<OperationType> for IndexBuildType {
486    fn from(op_type: OperationType) -> Self {
487        match op_type {
488            OperationType::Flush => IndexBuildType::Flush,
489            OperationType::Compact => IndexBuildType::Compact,
490        }
491    }
492}
493
/// Outcome of an index build task.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum IndexBuildOutcome {
    /// The task completed without being aborted.
    Finished,
    /// The task was aborted; the string describes the reason.
    Aborted(String),
}
500
501/// Mpsc output result sender.
502pub type ResultMpscSender = Sender<Result<IndexBuildOutcome>>;
503
/// A background task that builds index files for one SST file.
#[derive(Clone)]
pub struct IndexBuildTask {
    /// The file meta to build index for.
    pub file_meta: FileMeta,
    /// Why the index is being built; also determines scheduling priority.
    pub reason: IndexBuildType,
    /// Access layer used to read the SST and reach the object store.
    pub access_layer: AccessLayerRef,
    /// Listener for test/observability hooks (build begin/abort events).
    pub(crate) listener: WorkerListener,
    /// Manifest context used to commit the resulting region edit.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// Write cache to upload the built index from, if enabled.
    pub write_cache: Option<WriteCacheRef>,
    /// File purger attached to the file handle used while reading the SST.
    pub file_purger: FilePurgerRef,
    /// When write cache is enabled, the indexer builder should be built from the write cache.
    /// Otherwise, it should be built from the access layer.
    pub indexer_builder: Arc<dyn IndexerBuilder + Send + Sync>,
    /// Request sender to notify the region worker.
    pub(crate) request_sender: Sender<WorkerRequestWithTime>,
    /// Index build result sender.
    pub(crate) result_sender: ResultMpscSender,
}
522
523impl std::fmt::Debug for IndexBuildTask {
524    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
525        f.debug_struct("IndexBuildTask")
526            .field("region_id", &self.file_meta.region_id)
527            .field("file_id", &self.file_meta.file_id)
528            .field("reason", &self.reason)
529            .finish()
530    }
531}
532
533impl IndexBuildTask {
534    /// Notify the caller the job is success.
535    pub async fn on_success(&self, outcome: IndexBuildOutcome) {
536        let _ = self.result_sender.send(Ok(outcome)).await;
537    }
538
539    /// Send index build error to waiter.
540    pub async fn on_failure(&self, err: Arc<Error>) {
541        let _ = self
542            .result_sender
543            .send(Err(err.clone()).context(BuildIndexAsyncSnafu {
544                region_id: self.file_meta.region_id,
545            }))
546            .await;
547    }
548
549    fn into_index_build_job(mut self, version_control: VersionControlRef) -> Job {
550        Box::pin(async move {
551            self.do_index_build(version_control).await;
552        })
553    }
554
555    async fn do_index_build(&mut self, version_control: VersionControlRef) {
556        self.listener
557            .on_index_build_begin(RegionFileId::new(
558                self.file_meta.region_id,
559                self.file_meta.file_id,
560            ))
561            .await;
562        match self.index_build(version_control).await {
563            Ok(outcome) => self.on_success(outcome).await,
564            Err(e) => {
565                warn!(
566                    e; "Index build task failed, region: {}, file_id: {}",
567                    self.file_meta.region_id, self.file_meta.file_id,
568                );
569                self.on_failure(e.into()).await
570            }
571        }
572        let worker_request = WorkerRequest::Background {
573            region_id: self.file_meta.region_id,
574            notify: BackgroundNotify::IndexBuildStopped(IndexBuildStopped {
575                region_id: self.file_meta.region_id,
576                file_id: self.file_meta.file_id,
577            }),
578        };
579        let _ = self
580            .request_sender
581            .send(WorkerRequestWithTime::new(worker_request))
582            .await;
583    }
584
585    // Checks if the SST file still exists in object store and version to avoid conflict with compaction.
586    async fn check_sst_file_exists(&self, version_control: &VersionControlRef) -> bool {
587        let file_id = self.file_meta.file_id;
588        let level = self.file_meta.level;
589        // We should check current version instead of the version when the job is created.
590        let version = version_control.current().version;
591
592        let Some(level_files) = version.ssts.levels().get(level as usize) else {
593            warn!(
594                "File id {} not found in level {} for index build, region: {}",
595                file_id, level, self.file_meta.region_id
596            );
597            return false;
598        };
599
600        match level_files.files.get(&file_id) {
601            Some(handle) if !handle.is_deleted() && !handle.compacting() => {
602                // If the file's metadata is present in the current version, the physical SST file
603                // is guaranteed to exist on object store. The file purger removes the physical
604                // file only after its metadata is removed from the version.
605                true
606            }
607            _ => {
608                warn!(
609                    "File id {} not found in region version for index build, region: {}",
610                    file_id, self.file_meta.region_id
611                );
612                false
613            }
614        }
615    }
616
617    async fn index_build(
618        &mut self,
619        version_control: VersionControlRef,
620    ) -> Result<IndexBuildOutcome> {
621        // Determine the new index version
622        let new_index_version = if self.file_meta.index_file_size > 0 {
623            // Increment version if index file exists to avoid overwrite.
624            self.file_meta.index_version + 1
625        } else {
626            0 // Default version for new index files
627        };
628
629        // Use the same file_id but with new version for index file
630        let index_file_id = self.file_meta.file_id;
631        let mut indexer = self
632            .indexer_builder
633            .build(index_file_id, new_index_version)
634            .await;
635
636        // Check SST file existence before building index to avoid failure of parquet reader.
637        if !self.check_sst_file_exists(&version_control).await {
638            // Calls abort to clean up index files.
639            indexer.abort().await;
640            self.listener
641                .on_index_build_abort(RegionFileId::new(
642                    self.file_meta.region_id,
643                    self.file_meta.file_id,
644                ))
645                .await;
646            return Ok(IndexBuildOutcome::Aborted(format!(
647                "SST file not found during index build, region: {}, file_id: {}",
648                self.file_meta.region_id, self.file_meta.file_id
649            )));
650        }
651
652        let mut parquet_reader = self
653            .access_layer
654            .read_sst(FileHandle::new(
655                self.file_meta.clone(),
656                self.file_purger.clone(),
657            ))
658            .build()
659            .await?;
660
661        // TODO(SNC123): optimize index batch
662        loop {
663            match parquet_reader.next_batch().await {
664                Ok(Some(batch)) => {
665                    indexer.update(&mut batch.clone()).await;
666                }
667                Ok(None) => break,
668                Err(e) => {
669                    indexer.abort().await;
670                    return Err(e);
671                }
672            }
673        }
674        let index_output = indexer.finish().await;
675
676        if index_output.file_size > 0 {
677            // Check SST file existence again after building index.
678            if !self.check_sst_file_exists(&version_control).await {
679                // Calls abort to clean up index files.
680                indexer.abort().await;
681                self.listener
682                    .on_index_build_abort(RegionFileId::new(
683                        self.file_meta.region_id,
684                        self.file_meta.file_id,
685                    ))
686                    .await;
687                return Ok(IndexBuildOutcome::Aborted(format!(
688                    "SST file not found during index build, region: {}, file_id: {}",
689                    self.file_meta.region_id, self.file_meta.file_id
690                )));
691            }
692
693            // Upload index file if write cache is enabled.
694            self.maybe_upload_index_file(index_output.clone(), index_file_id, new_index_version)
695                .await?;
696
697            let worker_request = match self.update_manifest(index_output, new_index_version).await {
698                Ok(edit) => {
699                    let index_build_finished = IndexBuildFinished {
700                        region_id: self.file_meta.region_id,
701                        edit,
702                    };
703                    WorkerRequest::Background {
704                        region_id: self.file_meta.region_id,
705                        notify: BackgroundNotify::IndexBuildFinished(index_build_finished),
706                    }
707                }
708                Err(e) => {
709                    let err = Arc::new(e);
710                    WorkerRequest::Background {
711                        region_id: self.file_meta.region_id,
712                        notify: BackgroundNotify::IndexBuildFailed(IndexBuildFailed { err }),
713                    }
714                }
715            };
716
717            let _ = self
718                .request_sender
719                .send(WorkerRequestWithTime::new(worker_request))
720                .await;
721        }
722        Ok(IndexBuildOutcome::Finished)
723    }
724
725    async fn maybe_upload_index_file(
726        &self,
727        output: IndexOutput,
728        index_file_id: FileId,
729        index_version: u64,
730    ) -> Result<()> {
731        if let Some(write_cache) = &self.write_cache {
732            let file_id = self.file_meta.file_id;
733            let region_id = self.file_meta.region_id;
734            let remote_store = self.access_layer.object_store();
735            let mut upload_tracker = UploadTracker::new(region_id);
736            let mut err = None;
737            let puffin_key =
738                IndexKey::new(region_id, index_file_id, FileType::Puffin(output.version));
739            let index_id = RegionIndexId::new(RegionFileId::new(region_id, file_id), index_version);
740            let puffin_path = RegionFilePathFactory::new(
741                self.access_layer.table_dir().to_string(),
742                self.access_layer.path_type(),
743            )
744            .build_index_file_path_with_version(index_id);
745            if let Err(e) = write_cache
746                .upload(puffin_key, &puffin_path, remote_store)
747                .await
748            {
749                err = Some(e);
750            }
751            upload_tracker.push_uploaded_file(puffin_path);
752            if let Some(err) = err {
753                // Cleans index files on failure.
754                upload_tracker
755                    .clean(
756                        &smallvec![SstInfo {
757                            file_id,
758                            index_metadata: output,
759                            ..Default::default()
760                        }],
761                        &write_cache.file_cache(),
762                        remote_store,
763                    )
764                    .await;
765                return Err(err);
766            }
767        } else {
768            debug!("write cache is not available, skip uploading index file");
769        }
770        Ok(())
771    }
772
773    async fn update_manifest(
774        &mut self,
775        output: IndexOutput,
776        new_index_version: u64,
777    ) -> Result<RegionEdit> {
778        self.file_meta.available_indexes = output.build_available_indexes();
779        self.file_meta.indexes = output.build_indexes();
780        self.file_meta.index_file_size = output.file_size;
781        self.file_meta.index_version = new_index_version;
782        let edit = RegionEdit {
783            files_to_add: vec![self.file_meta.clone()],
784            files_to_remove: vec![],
785            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
786            flushed_sequence: None,
787            flushed_entry_id: None,
788            committed_sequence: None,
789            compaction_time_window: None,
790        };
791        let version = self
792            .manifest_ctx
793            .update_manifest(
794                RegionLeaderState::Writable,
795                RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())),
796                false,
797            )
798            .await?;
799        info!(
800            "Successfully update manifest version to {version}, region: {}, reason: {}",
801            self.file_meta.region_id,
802            self.reason.as_str()
803        );
804        Ok(edit)
805    }
806}
807
/// Equality compares only the scheduling priority of the build reason,
/// matching the [Ord] impl used by the pending-task [BinaryHeap].
impl PartialEq for IndexBuildTask {
    fn eq(&self, other: &Self) -> bool {
        self.reason.priority() == other.reason.priority()
    }
}
813
814impl Eq for IndexBuildTask {}
815
impl PartialOrd for IndexBuildTask {
    /// Delegates to [Ord::cmp]; the ordering is total.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
821
impl Ord for IndexBuildTask {
    /// Orders tasks by reason priority so the max-heap pops the most urgent first.
    fn cmp(&self, other: &Self) -> Ordering {
        self.reason.priority().cmp(&other.reason.priority())
    }
}
827
/// Tracks the index build status of a region scheduled by the [IndexBuildScheduler].
pub struct IndexBuildStatus {
    /// Id of the tracked region.
    pub region_id: RegionId,
    /// Files whose index build is currently running.
    pub building_files: HashSet<FileId>,
    /// Tasks waiting to run, popped in priority order (max-heap).
    pub pending_tasks: BinaryHeap<IndexBuildTask>,
}
834
835impl IndexBuildStatus {
836    pub fn new(region_id: RegionId) -> Self {
837        IndexBuildStatus {
838            region_id,
839            building_files: HashSet::new(),
840            pending_tasks: BinaryHeap::new(),
841        }
842    }
843
844    async fn on_failure(self, err: Arc<Error>) {
845        for task in self.pending_tasks {
846            task.on_failure(err.clone()).await;
847        }
848    }
849}
850
/// Schedules background index build tasks for the regions of a worker.
pub struct IndexBuildScheduler {
    /// Background job scheduler.
    scheduler: SchedulerRef,
    /// Tracks regions that need to build index.
    region_status: HashMap<RegionId, IndexBuildStatus>,
    /// Max number of files allowed to build index concurrently, counted
    /// across all regions tracked by this scheduler.
    files_limit: usize,
}
859
860/// Manager background index build tasks of a worker.
861impl IndexBuildScheduler {
862    pub fn new(scheduler: SchedulerRef, files_limit: usize) -> Self {
863        IndexBuildScheduler {
864            scheduler,
865            region_status: HashMap::new(),
866            files_limit,
867        }
868    }
869
870    pub(crate) async fn schedule_build(
871        &mut self,
872        version_control: &VersionControlRef,
873        task: IndexBuildTask,
874    ) -> Result<()> {
875        let status = self
876            .region_status
877            .entry(task.file_meta.region_id)
878            .or_insert_with(|| IndexBuildStatus::new(task.file_meta.region_id));
879
880        if status.building_files.contains(&task.file_meta.file_id) {
881            let region_file_id =
882                RegionFileId::new(task.file_meta.region_id, task.file_meta.file_id);
883            debug!(
884                "Aborting index build task since index is already being built for region file {:?}",
885                region_file_id
886            );
887            task.on_success(IndexBuildOutcome::Aborted(format!(
888                "Index is already being built for region file {:?}",
889                region_file_id
890            )))
891            .await;
892            task.listener.on_index_build_abort(region_file_id).await;
893            return Ok(());
894        }
895
896        status.pending_tasks.push(task);
897
898        self.schedule_next_build_batch(version_control);
899        Ok(())
900    }
901
902    /// Schedule tasks until reaching the files limit or no more tasks.
903    fn schedule_next_build_batch(&mut self, version_control: &VersionControlRef) {
904        let mut building_count = 0;
905        for status in self.region_status.values() {
906            building_count += status.building_files.len();
907        }
908
909        while building_count < self.files_limit {
910            if let Some(task) = self.find_next_task() {
911                let region_id = task.file_meta.region_id;
912                let file_id = task.file_meta.file_id;
913                let job = task.into_index_build_job(version_control.clone());
914                if self.scheduler.schedule(job).is_ok() {
915                    if let Some(status) = self.region_status.get_mut(&region_id) {
916                        status.building_files.insert(file_id);
917                        building_count += 1;
918                        status
919                            .pending_tasks
920                            .retain(|t| t.file_meta.file_id != file_id);
921                    } else {
922                        error!(
923                            "Region status not found when scheduling index build task, region: {}",
924                            region_id
925                        );
926                    }
927                } else {
928                    error!(
929                        "Failed to schedule index build job, region: {}, file_id: {}",
930                        region_id, file_id
931                    );
932                }
933            } else {
934                // No more tasks to schedule.
935                break;
936            }
937        }
938    }
939
940    /// Find the next task which has the highest priority to run.
941    fn find_next_task(&self) -> Option<IndexBuildTask> {
942        self.region_status
943            .iter()
944            .filter_map(|(_, status)| status.pending_tasks.peek())
945            .max()
946            .cloned()
947    }
948
949    pub(crate) fn on_task_stopped(
950        &mut self,
951        region_id: RegionId,
952        file_id: FileId,
953        version_control: &VersionControlRef,
954    ) {
955        if let Some(status) = self.region_status.get_mut(&region_id) {
956            status.building_files.remove(&file_id);
957            if status.building_files.is_empty() && status.pending_tasks.is_empty() {
958                // No more tasks for this region, remove it.
959                self.region_status.remove(&region_id);
960            }
961        }
962
963        self.schedule_next_build_batch(version_control);
964    }
965
966    pub(crate) async fn on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
967        error!(
968            err; "Index build scheduler encountered failure for region {}, removing all pending tasks.",
969            region_id
970        );
971        let Some(status) = self.region_status.remove(&region_id) else {
972            return;
973        };
974        status.on_failure(err).await;
975    }
976
977    /// Notifies the scheduler that the region is dropped.
978    pub(crate) async fn on_region_dropped(&mut self, region_id: RegionId) {
979        self.remove_region_on_failure(
980            region_id,
981            Arc::new(RegionDroppedSnafu { region_id }.build()),
982        )
983        .await;
984    }
985
986    /// Notifies the scheduler that the region is closed.
987    pub(crate) async fn on_region_closed(&mut self, region_id: RegionId) {
988        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()))
989            .await;
990    }
991
992    /// Notifies the scheduler that the region is truncated.
993    pub(crate) async fn on_region_truncated(&mut self, region_id: RegionId) {
994        self.remove_region_on_failure(
995            region_id,
996            Arc::new(RegionTruncatedSnafu { region_id }.build()),
997        )
998        .await;
999    }
1000
1001    async fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
1002        let Some(status) = self.region_status.remove(&region_id) else {
1003            return;
1004        };
1005        status.on_failure(err).await;
1006    }
1007}
1008
1009/// Decodes primary keys from a flat format RecordBatch.
1010/// Returns a list of (decoded_pk_value, count) tuples where count is the number of occurrences.
1011pub(crate) fn decode_primary_keys_with_counts(
1012    batch: &RecordBatch,
1013    codec: &IndexValuesCodec,
1014) -> Result<Vec<(CompositeValues, usize)>> {
1015    let primary_key_index = primary_key_column_index(batch.num_columns());
1016    let pk_dict_array = batch
1017        .column(primary_key_index)
1018        .as_any()
1019        .downcast_ref::<PrimaryKeyArray>()
1020        .context(InvalidRecordBatchSnafu {
1021            reason: "Primary key column is not a dictionary array",
1022        })?;
1023    let pk_values_array = pk_dict_array
1024        .values()
1025        .as_any()
1026        .downcast_ref::<BinaryArray>()
1027        .context(InvalidRecordBatchSnafu {
1028            reason: "Primary key values are not binary array",
1029        })?;
1030    let keys = pk_dict_array.keys();
1031
1032    // Decodes primary keys and count consecutive occurrences
1033    let mut result: Vec<(CompositeValues, usize)> = Vec::new();
1034    let mut prev_key: Option<u32> = None;
1035
1036    for i in 0..keys.len() {
1037        let current_key = keys.value(i);
1038
1039        // Checks if current key is the same as previous key
1040        if let Some(prev) = prev_key
1041            && prev == current_key
1042        {
1043            // Safety: We already have a key in the result vector.
1044            result.last_mut().unwrap().1 += 1;
1045            continue;
1046        }
1047
1048        // New key, decodes it.
1049        let pk_bytes = pk_values_array.value(current_key as usize);
1050        let decoded_value = codec.decoder().decode(pk_bytes).context(DecodeSnafu)?;
1051
1052        result.push((decoded_value, 1));
1053        prev_key = Some(current_key);
1054    }
1055
1056    Ok(result)
1057}
1058
1059#[cfg(test)]
1060mod tests {
1061    use std::sync::Arc;
1062
1063    use api::v1::SemanticType;
1064    use common_base::readable_size::ReadableSize;
1065    use datafusion_common::HashMap;
1066    use datatypes::data_type::ConcreteDataType;
1067    use datatypes::schema::{
1068        ColumnSchema, FulltextOptions, SkippingIndexOptions, SkippingIndexType,
1069    };
1070    use object_store::ObjectStore;
1071    use object_store::services::Memory;
1072    use puffin_manager::PuffinManagerFactory;
1073    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1074    use tokio::sync::mpsc;
1075
1076    use super::*;
1077    use crate::access_layer::{FilePathProvider, Metrics, SstWriteRequest, WriteType};
1078    use crate::cache::write_cache::WriteCache;
1079    use crate::config::{FulltextIndexConfig, IndexBuildMode, MitoConfig, Mode};
1080    use crate::memtable::time_partition::TimePartitions;
1081    use crate::region::version::{VersionBuilder, VersionControl};
1082    use crate::sst::file::RegionFileId;
1083    use crate::sst::file_purger::NoopFilePurger;
1084    use crate::sst::location;
1085    use crate::sst::parquet::WriteOptions;
1086    use crate::test_util::memtable_util::EmptyMemtableBuilder;
1087    use crate::test_util::scheduler_util::SchedulerEnv;
1088    use crate::test_util::sst_util::{new_batch_by_range, new_source, sst_region_metadata};
1089
    /// Flags controlling which index-related columns [mock_region_metadata]
    /// adds to the mocked region metadata.
    struct MetaConfig {
        with_inverted: bool,
        with_fulltext: bool,
        with_skipping_bloom: bool,
    }
1095
    /// Builds region metadata with columns `a` (i64 field), `b` (f64 field)
    /// and `c` (millisecond timestamp); optionally adds an inverted index on
    /// `a`, a fulltext string column `text` and a string column `bloom` with a
    /// bloom-filter skipping index.
    fn mock_region_metadata(
        MetaConfig {
            with_inverted,
            with_fulltext,
            with_skipping_bloom,
        }: MetaConfig,
    ) -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        let mut column_schema = ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false);
        if with_inverted {
            column_schema = column_schema.with_inverted_index(true);
        }
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema,
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "c",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 3,
            });

        if with_fulltext {
            // Fulltext-enabled column with otherwise default options.
            let column_schema =
                ColumnSchema::new("text", ConcreteDataType::string_datatype(), true)
                    .with_fulltext_options(FulltextOptions {
                        enable: true,
                        ..Default::default()
                    })
                    .unwrap();

            let column = ColumnMetadata {
                column_schema,
                semantic_type: SemanticType::Field,
                column_id: 4,
            };

            builder.push_column_metadata(column);
        }

        if with_skipping_bloom {
            // Bloom-filter skipping index options (42, 0.01 — presumably
            // granularity and false-positive rate; confirm against
            // `SkippingIndexOptions::new_unchecked`).
            let column_schema =
                ColumnSchema::new("bloom", ConcreteDataType::string_datatype(), false)
                    .with_skipping_options(SkippingIndexOptions::new_unchecked(
                        42,
                        0.01,
                        SkippingIndexType::BloomFilter,
                    ))
                    .unwrap();

            let column = ColumnMetadata {
                column_schema,
                semantic_type: SemanticType::Field,
                column_id: 5,
            };

            builder.push_column_metadata(column);
        }

        Arc::new(builder.build().unwrap())
    }
1168
1169    fn mock_object_store() -> ObjectStore {
1170        ObjectStore::new(Memory::default()).unwrap().finish()
1171    }
1172
1173    async fn mock_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
1174        IntermediateManager::init_fs(path).await.unwrap()
1175    }
1176    struct NoopPathProvider;
1177
    // Every method panics: tests that use this provider construct indexers
    // without ever resolving a file path.
    impl FilePathProvider for NoopPathProvider {
        fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
            unreachable!()
        }

        fn build_index_file_path_with_version(&self, _index_id: RegionIndexId) -> String {
            unreachable!()
        }

        fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
            unreachable!()
        }
    }
1191
    /// Writes a small SST (three batches) through the access layer and
    /// returns its [SstInfo]. `build_mode` sets the index build mode used for
    /// the write.
    async fn mock_sst_file(
        metadata: RegionMetadataRef,
        env: &SchedulerEnv,
        build_mode: IndexBuildMode,
    ) -> SstInfo {
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        let mut index_config = MitoConfig::default().index;
        index_config.build_mode = build_mode;
        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata: metadata.clone(),
            source: either::Left(source),
            storage: None,
            max_sequence: None,
            cache_manager: Default::default(),
            index_options: IndexOptions::default(),
            index_config,
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        // Only one SST is expected from this write; take its info.
        env.access_layer
            .write_sst(write_request, &WriteOptions::default(), &mut metrics)
            .await
            .unwrap()
            .remove(0)
    }
1224
1225    async fn mock_version_control(
1226        metadata: RegionMetadataRef,
1227        file_purger: FilePurgerRef,
1228        files: HashMap<FileId, FileMeta>,
1229    ) -> VersionControlRef {
1230        let mutable = Arc::new(TimePartitions::new(
1231            metadata.clone(),
1232            Arc::new(EmptyMemtableBuilder::default()),
1233            0,
1234            None,
1235        ));
1236        let version_builder = VersionBuilder::new(metadata, mutable)
1237            .add_files(file_purger, files.values().cloned())
1238            .build();
1239        Arc::new(VersionControl::new(version_builder))
1240    }
1241
    /// Builds an [IndexerBuilderImpl] backed by a fresh puffin manager and
    /// intermediate manager, with default index configs.
    ///
    /// NOTE(review): `dir` is a temp-dir guard that is dropped when this
    /// function returns, which removes the directory backing `intm_manager`
    /// while the returned builder still references it — confirm the
    /// intermediate manager recreates paths on use.
    async fn mock_indexer_builder(
        metadata: RegionMetadataRef,
        env: &SchedulerEnv,
    ) -> Arc<dyn IndexerBuilder + Send + Sync> {
        let (dir, factory) = PuffinManagerFactory::new_for_test_async("mock_indexer_builder").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
        let puffin_manager = factory.build(
            env.access_layer.object_store().clone(),
            RegionFilePathFactory::new(
                env.access_layer.table_dir().to_string(),
                env.access_layer.path_type(),
            ),
        );
        Arc::new(IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata,
            row_group_size: 1024,
            puffin_manager,
            write_cache_enabled: false,
            intermediate_manager: intm_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        })
    }
1268
    /// With default configs and metadata that enables all three index kinds,
    /// the builder creates inverted, fulltext and bloom filter indexers.
    #[tokio::test]
    async fn test_build_indexer_basic() {
        let (dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_basic_").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;

        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: true,
            with_skipping_bloom: true,
        });
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata,
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            write_cache_enabled: false,
            intermediate_manager: intm_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random(), 0)
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_some());
    }
1299
    /// Disabling index creation for a given operation (flush or compaction)
    /// suppresses only that index kind; the other two are still created.
    #[tokio::test]
    async fn test_build_indexer_disable_create() {
        let (dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_disable_create_").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;

        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: true,
            with_skipping_bloom: true,
        });
        // Case 1: inverted index disabled on flush.
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            write_cache_enabled: false,
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig {
                create_on_flush: Mode::Disable,
                ..Default::default()
            },
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random(), 0)
        .await;

        assert!(indexer.inverted_indexer.is_none());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_some());

        // Case 2: fulltext index disabled on compaction.
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Compact,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            write_cache_enabled: false,
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig {
                create_on_compaction: Mode::Disable,
                ..Default::default()
            },
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random(), 0)
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_none());
        assert!(indexer.bloom_filter_indexer.is_some());

        // Case 3: bloom filter index disabled on compaction.
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Compact,
            metadata,
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            write_cache_enabled: false,
            intermediate_manager: intm_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig {
                create_on_compaction: Mode::Disable,
                ..Default::default()
            },
        }
        .build(FileId::random(), 0)
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_none());
    }
1377
    /// When the region metadata does not declare a given index kind, the
    /// corresponding indexer is not created even though configs allow it.
    #[tokio::test]
    async fn test_build_indexer_no_required() {
        let (dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_no_required_").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;

        // Case 1: no inverted-indexed column in metadata.
        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: false,
            with_fulltext: true,
            with_skipping_bloom: true,
        });
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            write_cache_enabled: false,
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random(), 0)
        .await;

        assert!(indexer.inverted_indexer.is_none());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_some());

        // Case 2: no fulltext column in metadata.
        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: false,
            with_skipping_bloom: true,
        });
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            write_cache_enabled: false,
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random(), 0)
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_none());
        assert!(indexer.bloom_filter_indexer.is_some());

        // Case 3: no bloom-filter skipping column in metadata.
        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: true,
            with_skipping_bloom: false,
        });
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            write_cache_enabled: false,
            intermediate_manager: intm_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random(), 0)
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_none());
    }
1456
    /// A zero `row_group_size` prevents the inverted indexer from being
    /// created even when metadata and configs would enable it.
    #[tokio::test]
    async fn test_build_indexer_zero_row_group() {
        let (dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_zero_row_group_").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;

        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: true,
            with_skipping_bloom: true,
        });
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata,
            row_group_size: 0,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            write_cache_enabled: false,
            intermediate_manager: intm_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random(), 0)
        .await;

        assert!(indexer.inverted_indexer.is_none());
    }
1485
    /// A build task for a file whose SST was never written must not finish
    /// successfully: the task is expected to report a non-`Finished` outcome.
    #[tokio::test]
    async fn test_index_build_task_sst_not_exist() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
        let mut scheduler = env.mock_index_build_scheduler(4);
        let metadata = Arc::new(sst_region_metadata());
        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
        let file_purger = Arc::new(NoopFilePurger {});
        let files = HashMap::new();
        let version_control =
            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
        let region_id = metadata.region_id;
        let indexer_builder = mock_indexer_builder(metadata, &env).await;

        // Create mock task.
        let task = IndexBuildTask {
            file_meta: FileMeta {
                region_id,
                // Random id: no SST with this id exists in the access layer.
                file_id: FileId::random(),
                file_size: 100,
                ..Default::default()
            },
            reason: IndexBuildType::Flush,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            manifest_ctx,
            write_cache: None,
            file_purger,
            indexer_builder,
            request_sender: tx,
            result_sender: result_tx,
        };

        // Schedule the build task and check result.
        scheduler
            .schedule_build(&version_control, task)
            .await
            .unwrap();
        match result_rx.recv().await.unwrap() {
            Ok(outcome) => {
                // Any outcome other than `Finished` (e.g. aborted) is accepted.
                if outcome == IndexBuildOutcome::Finished {
                    panic!("Expect aborted result due to missing SST file")
                }
            }
            _ => panic!("Expect aborted result due to missing SST file"),
        }
    }
1534
    /// A build task for an existing SST should finish, then notify the worker
    /// with a manifest edit whose file meta carries the new index data.
    #[tokio::test]
    async fn test_index_build_task_sst_exist() {
        let env = SchedulerEnv::new().await;
        let mut scheduler = env.mock_index_build_scheduler(4);
        let metadata = Arc::new(sst_region_metadata());
        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
        let region_id = metadata.region_id;
        let file_purger = Arc::new(NoopFilePurger {});
        // Write a real SST so the build task has data to index.
        let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
        let file_meta = FileMeta {
            region_id,
            file_id: sst_info.file_id,
            file_size: sst_info.file_size,
            index_file_size: sst_info.index_metadata.file_size,
            num_rows: sst_info.num_rows as u64,
            num_row_groups: sst_info.num_row_groups,
            ..Default::default()
        };
        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
        let version_control =
            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
        let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;

        // Create mock task.
        let (tx, mut rx) = mpsc::channel(4);
        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
        let task = IndexBuildTask {
            file_meta: file_meta.clone(),
            reason: IndexBuildType::Flush,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            manifest_ctx,
            write_cache: None,
            file_purger,
            indexer_builder,
            request_sender: tx,
            result_sender: result_tx,
        };

        scheduler
            .schedule_build(&version_control, task)
            .await
            .unwrap();

        // The task should finish successfully.
        match result_rx.recv().await.unwrap() {
            Ok(outcome) => {
                assert_eq!(outcome, IndexBuildOutcome::Finished);
            }
            _ => panic!("Expect finished result"),
        }

        // A notification should be sent to the worker to update the manifest.
        let worker_req = rx.recv().await.unwrap().request;
        match worker_req {
            WorkerRequest::Background {
                region_id: req_region_id,
                notify: BackgroundNotify::IndexBuildFinished(finished),
            } => {
                assert_eq!(req_region_id, region_id);
                assert_eq!(finished.edit.files_to_add.len(), 1);
                let updated_meta = &finished.edit.files_to_add[0];

                // The mock indexer builder creates all index types.
                assert!(!updated_meta.available_indexes.is_empty());
                assert!(updated_meta.index_file_size > 0);
                assert_eq!(updated_meta.file_id, file_meta.file_id);
            }
            _ => panic!("Unexpected worker request: {:?}", worker_req),
        }
    }
1606
1607    async fn schedule_index_build_task_with_mode(build_mode: IndexBuildMode) {
1608        let env = SchedulerEnv::new().await;
1609        let mut scheduler = env.mock_index_build_scheduler(4);
1610        let metadata = Arc::new(sst_region_metadata());
1611        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1612        let file_purger = Arc::new(NoopFilePurger {});
1613        let region_id = metadata.region_id;
1614        let sst_info = mock_sst_file(metadata.clone(), &env, build_mode.clone()).await;
1615        let file_meta = FileMeta {
1616            region_id,
1617            file_id: sst_info.file_id,
1618            file_size: sst_info.file_size,
1619            index_file_size: sst_info.index_metadata.file_size,
1620            num_rows: sst_info.num_rows as u64,
1621            num_row_groups: sst_info.num_row_groups,
1622            ..Default::default()
1623        };
1624        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1625        let version_control =
1626            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1627        let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
1628
1629        // Create mock task.
1630        let (tx, _rx) = mpsc::channel(4);
1631        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1632        let task = IndexBuildTask {
1633            file_meta: file_meta.clone(),
1634            reason: IndexBuildType::Flush,
1635            access_layer: env.access_layer.clone(),
1636            listener: WorkerListener::default(),
1637            manifest_ctx,
1638            write_cache: None,
1639            file_purger,
1640            indexer_builder,
1641            request_sender: tx,
1642            result_sender: result_tx,
1643        };
1644
1645        scheduler
1646            .schedule_build(&version_control, task)
1647            .await
1648            .unwrap();
1649
1650        let puffin_path = location::index_file_path(
1651            env.access_layer.table_dir(),
1652            RegionIndexId::new(RegionFileId::new(region_id, file_meta.file_id), 0),
1653            env.access_layer.path_type(),
1654        );
1655
1656        if build_mode == IndexBuildMode::Async {
1657            // The index file should not exist before the task finishes.
1658            assert!(
1659                !env.access_layer
1660                    .object_store()
1661                    .exists(&puffin_path)
1662                    .await
1663                    .unwrap()
1664            );
1665        } else {
1666            // The index file should exist before the task finishes.
1667            assert!(
1668                env.access_layer
1669                    .object_store()
1670                    .exists(&puffin_path)
1671                    .await
1672                    .unwrap()
1673            );
1674        }
1675
1676        // The task should finish successfully.
1677        match result_rx.recv().await.unwrap() {
1678            Ok(outcome) => {
1679                assert_eq!(outcome, IndexBuildOutcome::Finished);
1680            }
1681            _ => panic!("Expect finished result"),
1682        }
1683
1684        // The index file should exist after the task finishes.
1685        assert!(
1686            env.access_layer
1687                .object_store()
1688                .exists(&puffin_path)
1689                .await
1690                .unwrap()
1691        );
1692    }
1693
1694    #[tokio::test]
1695    async fn test_index_build_task_build_mode() {
1696        schedule_index_build_task_with_mode(IndexBuildMode::Async).await;
1697        schedule_index_build_task_with_mode(IndexBuildMode::Sync).await;
1698    }
1699
1700    #[tokio::test]
1701    async fn test_index_build_task_no_index() {
1702        let env = SchedulerEnv::new().await;
1703        let mut scheduler = env.mock_index_build_scheduler(4);
1704        let mut metadata = sst_region_metadata();
1705        // Unset indexes in metadata to simulate no index scenario.
1706        metadata.column_metadatas.iter_mut().for_each(|col| {
1707            col.column_schema.set_inverted_index(false);
1708            let _ = col.column_schema.unset_skipping_options();
1709        });
1710        let region_id = metadata.region_id;
1711        let metadata = Arc::new(metadata);
1712        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1713        let file_purger = Arc::new(NoopFilePurger {});
1714        let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
1715        let file_meta = FileMeta {
1716            region_id,
1717            file_id: sst_info.file_id,
1718            file_size: sst_info.file_size,
1719            index_file_size: sst_info.index_metadata.file_size,
1720            num_rows: sst_info.num_rows as u64,
1721            num_row_groups: sst_info.num_row_groups,
1722            ..Default::default()
1723        };
1724        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1725        let version_control =
1726            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1727        let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
1728
1729        // Create mock task.
1730        let (tx, mut rx) = mpsc::channel(4);
1731        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1732        let task = IndexBuildTask {
1733            file_meta: file_meta.clone(),
1734            reason: IndexBuildType::Flush,
1735            access_layer: env.access_layer.clone(),
1736            listener: WorkerListener::default(),
1737            manifest_ctx,
1738            write_cache: None,
1739            file_purger,
1740            indexer_builder,
1741            request_sender: tx,
1742            result_sender: result_tx,
1743        };
1744
1745        scheduler
1746            .schedule_build(&version_control, task)
1747            .await
1748            .unwrap();
1749
1750        // The task should finish successfully.
1751        match result_rx.recv().await.unwrap() {
1752            Ok(outcome) => {
1753                assert_eq!(outcome, IndexBuildOutcome::Finished);
1754            }
1755            _ => panic!("Expect finished result"),
1756        }
1757
1758        // No index is built, so no notification should be sent to the worker.
1759        let _ = rx.recv().await.is_none();
1760    }
1761
1762    #[tokio::test]
1763    async fn test_index_build_task_with_write_cache() {
1764        let env = SchedulerEnv::new().await;
1765        let mut scheduler = env.mock_index_build_scheduler(4);
1766        let metadata = Arc::new(sst_region_metadata());
1767        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1768        let file_purger = Arc::new(NoopFilePurger {});
1769        let region_id = metadata.region_id;
1770
1771        let (dir, factory) = PuffinManagerFactory::new_for_test_async("test_write_cache").await;
1772        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1773
1774        // Create mock write cache
1775        let write_cache = Arc::new(
1776            WriteCache::new_fs(
1777                dir.path().to_str().unwrap(),
1778                ReadableSize::mb(10),
1779                None,
1780                None,
1781                factory,
1782                intm_manager,
1783                ReadableSize::mb(10),
1784            )
1785            .await
1786            .unwrap(),
1787        );
1788        // Indexer builder built from write cache.
1789        let indexer_builder = Arc::new(IndexerBuilderImpl {
1790            build_type: IndexBuildType::Flush,
1791            metadata: metadata.clone(),
1792            row_group_size: 1024,
1793            puffin_manager: write_cache.build_puffin_manager().clone(),
1794            write_cache_enabled: true,
1795            intermediate_manager: write_cache.intermediate_manager().clone(),
1796            index_options: IndexOptions::default(),
1797            inverted_index_config: InvertedIndexConfig::default(),
1798            fulltext_index_config: FulltextIndexConfig::default(),
1799            bloom_filter_index_config: BloomFilterConfig::default(),
1800        });
1801
1802        let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
1803        let file_meta = FileMeta {
1804            region_id,
1805            file_id: sst_info.file_id,
1806            file_size: sst_info.file_size,
1807            index_file_size: sst_info.index_metadata.file_size,
1808            num_rows: sst_info.num_rows as u64,
1809            num_row_groups: sst_info.num_row_groups,
1810            ..Default::default()
1811        };
1812        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1813        let version_control =
1814            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1815
1816        // Create mock task.
1817        let (tx, mut _rx) = mpsc::channel(4);
1818        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1819        let task = IndexBuildTask {
1820            file_meta: file_meta.clone(),
1821            reason: IndexBuildType::Flush,
1822            access_layer: env.access_layer.clone(),
1823            listener: WorkerListener::default(),
1824            manifest_ctx,
1825            write_cache: Some(write_cache.clone()),
1826            file_purger,
1827            indexer_builder,
1828            request_sender: tx,
1829            result_sender: result_tx,
1830        };
1831
1832        scheduler
1833            .schedule_build(&version_control, task)
1834            .await
1835            .unwrap();
1836
1837        // The task should finish successfully.
1838        match result_rx.recv().await.unwrap() {
1839            Ok(outcome) => {
1840                assert_eq!(outcome, IndexBuildOutcome::Finished);
1841            }
1842            _ => panic!("Expect finished result"),
1843        }
1844
1845        // The write cache should contain the uploaded index file.
1846        let index_key = IndexKey::new(
1847            region_id,
1848            file_meta.file_id,
1849            FileType::Puffin(sst_info.index_metadata.version),
1850        );
1851        assert!(write_cache.file_cache().contains_key(&index_key));
1852    }
1853
1854    async fn create_mock_task_for_schedule(
1855        env: &SchedulerEnv,
1856        file_id: FileId,
1857        region_id: RegionId,
1858        reason: IndexBuildType,
1859    ) -> IndexBuildTask {
1860        let metadata = Arc::new(sst_region_metadata());
1861        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1862        let file_purger = Arc::new(NoopFilePurger {});
1863        let indexer_builder = mock_indexer_builder(metadata, env).await;
1864        let (tx, _rx) = mpsc::channel(4);
1865        let (result_tx, _result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1866
1867        IndexBuildTask {
1868            file_meta: FileMeta {
1869                region_id,
1870                file_id,
1871                file_size: 100,
1872                ..Default::default()
1873            },
1874            reason,
1875            access_layer: env.access_layer.clone(),
1876            listener: WorkerListener::default(),
1877            manifest_ctx,
1878            write_cache: None,
1879            file_purger,
1880            indexer_builder,
1881            request_sender: tx,
1882            result_sender: result_tx,
1883        }
1884    }
1885
    /// End-to-end state-machine test of the index build scheduler: basic
    /// scheduling, duplicate-file dedup, the per-region concurrency limit,
    /// priority ordering of the pending queue, promotion of pending tasks on
    /// completion, region cleanup, and region drop with pending tasks.
    #[tokio::test]
    async fn test_scheduler_comprehensive() {
        let env = SchedulerEnv::new().await;
        // Limit of 2 concurrently building tasks (asserted as the cap below).
        let mut scheduler = env.mock_index_build_scheduler(2);
        let metadata = Arc::new(sst_region_metadata());
        let region_id = metadata.region_id;
        let file_purger = Arc::new(NoopFilePurger {});

        // Prepare multiple files for testing
        let file_id1 = FileId::random();
        let file_id2 = FileId::random();
        let file_id3 = FileId::random();
        let file_id4 = FileId::random();
        let file_id5 = FileId::random();

        let mut files = HashMap::new();
        for file_id in [file_id1, file_id2, file_id3, file_id4, file_id5] {
            files.insert(
                file_id,
                FileMeta {
                    region_id,
                    file_id,
                    file_size: 100,
                    ..Default::default()
                },
            );
        }

        let version_control = mock_version_control(metadata, file_purger, files).await;

        // Test 1: Basic scheduling
        let task1 =
            create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
        assert!(
            scheduler
                .schedule_build(&version_control, task1)
                .await
                .is_ok()
        );
        assert!(scheduler.region_status.contains_key(&region_id));
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 1);
        assert!(status.building_files.contains(&file_id1));

        // Test 2: Duplicate file scheduling (should be skipped)
        let task1_dup =
            create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
        scheduler
            .schedule_build(&version_control, task1_dup)
            .await
            .unwrap();
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 1); // Still only one

        // Test 3: Fill up to limit (2 building tasks)
        let task2 =
            create_mock_task_for_schedule(&env, file_id2, region_id, IndexBuildType::Flush).await;
        scheduler
            .schedule_build(&version_control, task2)
            .await
            .unwrap();
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 2); // Reached limit
        assert_eq!(status.pending_tasks.len(), 0);

        // Test 4: Add tasks with different priorities to pending queue
        // Now all new tasks will be pending since we reached the limit
        let task3 =
            create_mock_task_for_schedule(&env, file_id3, region_id, IndexBuildType::Compact).await;
        let task4 =
            create_mock_task_for_schedule(&env, file_id4, region_id, IndexBuildType::SchemaChange)
                .await;
        let task5 =
            create_mock_task_for_schedule(&env, file_id5, region_id, IndexBuildType::Manual).await;

        scheduler
            .schedule_build(&version_control, task3)
            .await
            .unwrap();
        scheduler
            .schedule_build(&version_control, task4)
            .await
            .unwrap();
        scheduler
            .schedule_build(&version_control, task5)
            .await
            .unwrap();

        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 2); // Still at limit
        assert_eq!(status.pending_tasks.len(), 3); // Three pending

        // Test 5: Task completion triggers scheduling next highest priority task (Manual)
        // The asserts in tests 5-7 establish the priority order observed here:
        // Manual, then SchemaChange, then Compact.
        scheduler.on_task_stopped(region_id, file_id1, &version_control);
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert!(!status.building_files.contains(&file_id1));
        assert_eq!(status.building_files.len(), 2); // Should schedule next task
        assert_eq!(status.pending_tasks.len(), 2); // One less pending
        // The highest priority task (Manual) should now be building
        assert!(status.building_files.contains(&file_id5));

        // Test 6: Complete another task, should schedule SchemaChange (second highest priority)
        scheduler.on_task_stopped(region_id, file_id2, &version_control);
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 2);
        assert_eq!(status.pending_tasks.len(), 1); // One less pending
        assert!(status.building_files.contains(&file_id4)); // SchemaChange should be building

        // Test 7: Complete remaining tasks and cleanup
        scheduler.on_task_stopped(region_id, file_id5, &version_control);
        scheduler.on_task_stopped(region_id, file_id4, &version_control);

        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 1); // Last task (Compact) should be building
        assert_eq!(status.pending_tasks.len(), 0);
        assert!(status.building_files.contains(&file_id3));

        scheduler.on_task_stopped(region_id, file_id3, &version_control);

        // Region should be removed when all tasks complete
        assert!(!scheduler.region_status.contains_key(&region_id));

        // Test 8: Region dropped with pending tasks
        let task6 =
            create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
        let task7 =
            create_mock_task_for_schedule(&env, file_id2, region_id, IndexBuildType::Flush).await;
        let task8 =
            create_mock_task_for_schedule(&env, file_id3, region_id, IndexBuildType::Manual).await;

        scheduler
            .schedule_build(&version_control, task6)
            .await
            .unwrap();
        scheduler
            .schedule_build(&version_control, task7)
            .await
            .unwrap();
        scheduler
            .schedule_build(&version_control, task8)
            .await
            .unwrap();

        assert!(scheduler.region_status.contains_key(&region_id));
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 2);
        assert_eq!(status.pending_tasks.len(), 1);

        // Dropping the region must discard both building and pending state.
        scheduler.on_region_dropped(region_id).await;
        assert!(!scheduler.region_status.contains_key(&region_id));
    }
2037}