// mito2/sst/index.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod bloom_filter;
16pub(crate) mod fulltext_index;
17mod indexer;
18pub mod intermediate;
19pub(crate) mod inverted_index;
20pub mod puffin_manager;
21mod statistics;
22pub(crate) mod store;
23#[cfg(feature = "vector_index")]
24pub(crate) mod vector_index;
25
26use std::cmp::Ordering;
27use std::collections::{BinaryHeap, HashMap, HashSet};
28use std::num::NonZeroUsize;
29use std::sync::Arc;
30
31use bloom_filter::creator::BloomFilterIndexer;
32use common_telemetry::{debug, error, info, warn};
33use datatypes::arrow::array::BinaryArray;
34use datatypes::arrow::record_batch::RecordBatch;
35use mito_codec::index::IndexValuesCodec;
36use mito_codec::row_converter::CompositeValues;
37use object_store::ObjectStore;
38use puffin_manager::SstPuffinManager;
39use smallvec::{SmallVec, smallvec};
40use snafu::{OptionExt, ResultExt};
41use statistics::{ByteCount, RowCount};
42use store_api::metadata::RegionMetadataRef;
43use store_api::storage::{ColumnId, FileId, RegionId};
44use strum::IntoStaticStr;
45use tokio::sync::mpsc::Sender;
46#[cfg(feature = "vector_index")]
47use vector_index::creator::VectorIndexer;
48
49use crate::access_layer::{AccessLayerRef, FilePathProvider, OperationType, RegionFilePathFactory};
50use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
51use crate::cache::write_cache::{UploadTracker, WriteCacheRef};
52#[cfg(feature = "vector_index")]
53use crate::config::VectorIndexConfig;
54use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
55use crate::error::{
56    BuildIndexAsyncSnafu, DecodeSnafu, Error, InvalidRecordBatchSnafu, RegionClosedSnafu,
57    RegionDroppedSnafu, RegionTruncatedSnafu, Result,
58};
59use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
60use crate::metrics::INDEX_CREATE_MEMORY_USAGE;
61use crate::read::Batch;
62use crate::region::options::IndexOptions;
63use crate::region::version::VersionControlRef;
64use crate::region::{ManifestContextRef, RegionLeaderState};
65use crate::request::{
66    BackgroundNotify, IndexBuildFailed, IndexBuildFinished, IndexBuildStopped, WorkerRequest,
67    WorkerRequestWithTime,
68};
69use crate::schedule::scheduler::{Job, SchedulerRef};
70use crate::sst::file::{
71    ColumnIndexMetadata, FileHandle, FileMeta, IndexType, IndexTypes, RegionFileId, RegionIndexId,
72};
73use crate::sst::file_purger::FilePurgerRef;
74use crate::sst::index::fulltext_index::creator::FulltextIndexer;
75use crate::sst::index::intermediate::IntermediateManager;
76use crate::sst::index::inverted_index::creator::InvertedIndexer;
77use crate::sst::parquet::SstInfo;
78use crate::sst::parquet::flat_format::primary_key_column_index;
79use crate::sst::parquet::format::PrimaryKeyArray;
80use crate::worker::WorkerListener;
81
/// Metric label value for the inverted index creator (used with the
/// index-creation memory usage gauge).
pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index";
/// Metric label value for the full-text index creator.
pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index";
/// Metric label value for the bloom filter index creator.
pub(crate) const TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
/// Metric label value for the vector index creator.
#[cfg(feature = "vector_index")]
pub(crate) const TYPE_VECTOR_INDEX: &str = "vector_index";
87
88/// Triggers background download of an index file to the local cache.
89pub(crate) fn trigger_index_background_download(
90    file_cache: Option<&FileCacheRef>,
91    file_id: &RegionIndexId,
92    file_size_hint: Option<u64>,
93    path_factory: &RegionFilePathFactory,
94    object_store: &ObjectStore,
95) {
96    if let (Some(file_cache), Some(file_size)) = (file_cache, file_size_hint) {
97        let index_key = IndexKey::new(
98            file_id.region_id(),
99            file_id.file_id(),
100            FileType::Puffin(file_id.version),
101        );
102        let remote_path = path_factory.build_index_file_path(file_id.file_id);
103        file_cache.maybe_download_background(
104            index_key,
105            remote_path,
106            object_store.clone(),
107            file_size,
108        );
109    }
110}
111
/// Output of the index creation.
#[derive(Debug, Clone, Default)]
pub struct IndexOutput {
    /// Size of the index file in bytes.
    pub file_size: u64,
    /// Index version the file was written at.
    pub version: u64,
    /// Inverted index output.
    pub inverted_index: InvertedIndexOutput,
    /// Fulltext index output.
    pub fulltext_index: FulltextIndexOutput,
    /// Bloom filter output.
    pub bloom_filter: BloomFilterOutput,
    /// Vector index output.
    #[cfg(feature = "vector_index")]
    pub vector_index: VectorIndexOutput,
}
129
130impl IndexOutput {
131    pub fn build_available_indexes(&self) -> SmallVec<[IndexType; 4]> {
132        let mut indexes = SmallVec::new();
133        if self.inverted_index.is_available() {
134            indexes.push(IndexType::InvertedIndex);
135        }
136        if self.fulltext_index.is_available() {
137            indexes.push(IndexType::FulltextIndex);
138        }
139        if self.bloom_filter.is_available() {
140            indexes.push(IndexType::BloomFilterIndex);
141        }
142        #[cfg(feature = "vector_index")]
143        if self.vector_index.is_available() {
144            indexes.push(IndexType::VectorIndex);
145        }
146        indexes
147    }
148
149    pub fn build_indexes(&self) -> Vec<ColumnIndexMetadata> {
150        let mut map: HashMap<ColumnId, IndexTypes> = HashMap::new();
151
152        if self.inverted_index.is_available() {
153            for &col in &self.inverted_index.columns {
154                map.entry(col).or_default().push(IndexType::InvertedIndex);
155            }
156        }
157        if self.fulltext_index.is_available() {
158            for &col in &self.fulltext_index.columns {
159                map.entry(col).or_default().push(IndexType::FulltextIndex);
160            }
161        }
162        if self.bloom_filter.is_available() {
163            for &col in &self.bloom_filter.columns {
164                map.entry(col)
165                    .or_default()
166                    .push(IndexType::BloomFilterIndex);
167            }
168        }
169        #[cfg(feature = "vector_index")]
170        if self.vector_index.is_available() {
171            for &col in &self.vector_index.columns {
172                map.entry(col).or_default().push(IndexType::VectorIndex);
173            }
174        }
175
176        map.into_iter()
177            .map(|(column_id, created_indexes)| ColumnIndexMetadata {
178                column_id,
179                created_indexes,
180            })
181            .collect::<Vec<_>>()
182    }
183}
184
/// Base output of the index creation, shared by all index kinds.
#[derive(Debug, Clone, Default)]
pub struct IndexBaseOutput {
    /// Size of the index in bytes; `0` means the index was not created.
    pub index_size: ByteCount,
    /// Number of rows in the index.
    pub row_count: RowCount,
    /// Available columns in the index.
    pub columns: Vec<ColumnId>,
}
195
impl IndexBaseOutput {
    /// Returns whether the index was actually produced, i.e. occupies bytes.
    pub fn is_available(&self) -> bool {
        self.index_size > 0
    }
}
201
/// Output of the inverted index creation.
pub type InvertedIndexOutput = IndexBaseOutput;
/// Output of the fulltext index creation.
pub type FulltextIndexOutput = IndexBaseOutput;
/// Output of the bloom filter creation.
pub type BloomFilterOutput = IndexBaseOutput;
/// Output of the vector index creation (only with the `vector_index` feature).
#[cfg(feature = "vector_index")]
pub type VectorIndexOutput = IndexBaseOutput;
211
/// The index creator that hides the error handling details.
#[derive(Default)]
pub struct Indexer {
    /// Id of the SST file the indexes are built for.
    file_id: FileId,
    /// Region that owns the SST file.
    region_id: RegionId,
    /// Version the index file is written at.
    index_version: u64,
    /// Puffin manager that writes the index file; only set when at least one
    /// per-kind indexer was created (see `IndexerBuilderImpl::build`).
    puffin_manager: Option<SstPuffinManager>,
    /// Whether the write cache is enabled for this region.
    write_cache_enabled: bool,
    /// Inverted index creator, `None` when disabled or not applicable.
    inverted_indexer: Option<InvertedIndexer>,
    /// Inverted index memory usage reported to the metrics gauge last time.
    last_mem_inverted_index: usize,
    /// Full-text index creator, `None` when disabled or not applicable.
    fulltext_indexer: Option<FulltextIndexer>,
    /// Full-text index memory usage reported to the metrics gauge last time.
    last_mem_fulltext_index: usize,
    /// Bloom filter creator, `None` when disabled or not applicable.
    bloom_filter_indexer: Option<BloomFilterIndexer>,
    /// Bloom filter memory usage reported to the metrics gauge last time.
    last_mem_bloom_filter: usize,
    /// Vector index creator, `None` when disabled or not applicable.
    #[cfg(feature = "vector_index")]
    vector_indexer: Option<VectorIndexer>,
    /// Vector index memory usage reported to the metrics gauge last time.
    #[cfg(feature = "vector_index")]
    last_mem_vector_index: usize,
    /// Manager for intermediate files produced while building indexes.
    intermediate_manager: Option<IntermediateManager>,
}
232
233impl Indexer {
234    /// Updates the index with the given batch.
235    pub async fn update(&mut self, batch: &mut Batch) {
236        self.do_update(batch).await;
237
238        self.flush_mem_metrics();
239    }
240
241    /// Updates the index with the given flat format RecordBatch.
242    pub async fn update_flat(&mut self, batch: &RecordBatch) {
243        self.do_update_flat(batch).await;
244
245        self.flush_mem_metrics();
246    }
247
248    /// Finalizes the index creation.
249    pub async fn finish(&mut self) -> IndexOutput {
250        let output = self.do_finish().await;
251
252        self.flush_mem_metrics();
253        output
254    }
255
256    /// Aborts the index creation.
257    pub async fn abort(&mut self) {
258        self.do_abort().await;
259
260        self.flush_mem_metrics();
261    }
262
263    fn flush_mem_metrics(&mut self) {
264        let inverted_mem = self
265            .inverted_indexer
266            .as_ref()
267            .map_or(0, |creator| creator.memory_usage());
268        INDEX_CREATE_MEMORY_USAGE
269            .with_label_values(&[TYPE_INVERTED_INDEX])
270            .add(inverted_mem as i64 - self.last_mem_inverted_index as i64);
271        self.last_mem_inverted_index = inverted_mem;
272
273        let fulltext_mem = self
274            .fulltext_indexer
275            .as_ref()
276            .map_or(0, |creator| creator.memory_usage());
277        INDEX_CREATE_MEMORY_USAGE
278            .with_label_values(&[TYPE_FULLTEXT_INDEX])
279            .add(fulltext_mem as i64 - self.last_mem_fulltext_index as i64);
280        self.last_mem_fulltext_index = fulltext_mem;
281
282        let bloom_filter_mem = self
283            .bloom_filter_indexer
284            .as_ref()
285            .map_or(0, |creator| creator.memory_usage());
286        INDEX_CREATE_MEMORY_USAGE
287            .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
288            .add(bloom_filter_mem as i64 - self.last_mem_bloom_filter as i64);
289        self.last_mem_bloom_filter = bloom_filter_mem;
290
291        #[cfg(feature = "vector_index")]
292        {
293            let vector_mem = self
294                .vector_indexer
295                .as_ref()
296                .map_or(0, |creator| creator.memory_usage());
297            INDEX_CREATE_MEMORY_USAGE
298                .with_label_values(&[TYPE_VECTOR_INDEX])
299                .add(vector_mem as i64 - self.last_mem_vector_index as i64);
300            self.last_mem_vector_index = vector_mem;
301        }
302    }
303}
304
#[async_trait::async_trait]
pub trait IndexerBuilder {
    /// Builds an [Indexer] for the SST file identified by `file_id`,
    /// writing the index file at `index_version`.
    async fn build(&self, file_id: FileId, index_version: u64) -> Indexer;
}
/// Default [IndexerBuilder] implementation driven by region configs and options.
#[derive(Clone)]
pub(crate) struct IndexerBuilderImpl {
    /// Why the index is being built; decides which index kinds are created.
    pub(crate) build_type: IndexBuildType,
    /// Metadata of the region the SST belongs to.
    pub(crate) metadata: RegionMetadataRef,
    /// Row group size of the SST; the inverted index segment size is aligned to it.
    pub(crate) row_group_size: usize,
    /// Puffin manager used by the created [Indexer] to write the index file.
    pub(crate) puffin_manager: SstPuffinManager,
    /// Whether the write cache is enabled (propagated to the [Indexer]).
    pub(crate) write_cache_enabled: bool,
    /// Manager for intermediate files produced while building indexes.
    pub(crate) intermediate_manager: IntermediateManager,
    /// Region-level index options (ignored columns, segment row count, ...).
    pub(crate) index_options: IndexOptions,
    /// Inverted index configuration.
    pub(crate) inverted_index_config: InvertedIndexConfig,
    /// Full-text index configuration.
    pub(crate) fulltext_index_config: FulltextIndexConfig,
    /// Bloom filter index configuration.
    pub(crate) bloom_filter_index_config: BloomFilterConfig,
    /// Vector index configuration.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_config: VectorIndexConfig,
}
325
326#[async_trait::async_trait]
327impl IndexerBuilder for IndexerBuilderImpl {
328    /// Sanity check for arguments and create a new [Indexer] if arguments are valid.
329    async fn build(&self, file_id: FileId, index_version: u64) -> Indexer {
330        let mut indexer = Indexer {
331            file_id,
332            region_id: self.metadata.region_id,
333            index_version,
334            write_cache_enabled: self.write_cache_enabled,
335            ..Default::default()
336        };
337
338        indexer.inverted_indexer = self.build_inverted_indexer(file_id);
339        indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await;
340        indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id);
341        #[cfg(feature = "vector_index")]
342        {
343            indexer.vector_indexer = self.build_vector_indexer(file_id);
344        }
345        indexer.intermediate_manager = Some(self.intermediate_manager.clone());
346
347        #[cfg(feature = "vector_index")]
348        let has_any_indexer = indexer.inverted_indexer.is_some()
349            || indexer.fulltext_indexer.is_some()
350            || indexer.bloom_filter_indexer.is_some()
351            || indexer.vector_indexer.is_some();
352        #[cfg(not(feature = "vector_index"))]
353        let has_any_indexer = indexer.inverted_indexer.is_some()
354            || indexer.fulltext_indexer.is_some()
355            || indexer.bloom_filter_indexer.is_some();
356
357        if !has_any_indexer {
358            indexer.abort().await;
359            return Indexer::default();
360        }
361
362        indexer.puffin_manager = Some(self.puffin_manager.clone());
363        indexer
364    }
365}
366
impl IndexerBuilderImpl {
    /// Creates the inverted index creator for `file_id`, or returns `None`
    /// when the config, the schema, or the options make it unnecessary.
    fn build_inverted_indexer(&self, file_id: FileId) -> Option<InvertedIndexer> {
        // Flush/compaction honor their `create_on_*` toggles; all other build
        // types (manual, schema change) always create.
        let create = match self.build_type {
            IndexBuildType::Flush => self.inverted_index_config.create_on_flush.auto(),
            IndexBuildType::Compact => self.inverted_index_config.create_on_compaction.auto(),
            _ => true,
        };

        if !create {
            debug!(
                "Skip creating inverted index due to config, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        }

        // Columns listed in `ignore_column_ids` are excluded from the index.
        let indexed_column_ids = self.metadata.inverted_indexed_column_ids(
            self.index_options.inverted_index.ignore_column_ids.iter(),
        );
        if indexed_column_ids.is_empty() {
            debug!(
                "No columns to be indexed, skip creating inverted index, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        }

        // Zero segment row count or row group size would make the index
        // degenerate; treat both as "do not create".
        let Some(mut segment_row_count) =
            NonZeroUsize::new(self.index_options.inverted_index.segment_row_count)
        else {
            warn!(
                "Segment row count is 0, skip creating index, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        };

        let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else {
            warn!(
                "Row group size is 0, skip creating index, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        };

        // if segment row count not aligned with row group size, adjust it to be aligned.
        if row_group_size.get() % segment_row_count.get() != 0 {
            segment_row_count = row_group_size;
        }

        let indexer = InvertedIndexer::new(
            file_id,
            &self.metadata,
            self.intermediate_manager.clone(),
            self.inverted_index_config.mem_threshold_on_create(),
            segment_row_count,
            indexed_column_ids,
        );

        Some(indexer)
    }

    /// Creates the full-text index creator for `file_id`, or returns `None`
    /// when disabled by config, no column needs it, or creation failed
    /// (failure panics in test builds, is logged otherwise).
    async fn build_fulltext_indexer(&self, file_id: FileId) -> Option<FulltextIndexer> {
        let create = match self.build_type {
            IndexBuildType::Flush => self.fulltext_index_config.create_on_flush.auto(),
            IndexBuildType::Compact => self.fulltext_index_config.create_on_compaction.auto(),
            _ => true,
        };

        if !create {
            debug!(
                "Skip creating full-text index due to config, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        }

        let mem_limit = self.fulltext_index_config.mem_threshold_on_create();
        let creator = FulltextIndexer::new(
            &self.metadata.region_id,
            &file_id,
            &self.intermediate_manager,
            &self.metadata,
            self.fulltext_index_config.compress,
            mem_limit,
        )
        .await;

        // On success return immediately (a `None` creator just means no column
        // requires the index); otherwise fall through with the error.
        let err = match creator {
            Ok(creator) => {
                if creator.is_none() {
                    debug!(
                        "Skip creating full-text index due to no columns require indexing, region_id: {}, file_id: {}",
                        self.metadata.region_id, file_id,
                    );
                }
                return creator;
            }
            Err(err) => err,
        };

        // In test builds a creation failure is a bug: fail loudly. In
        // production only log it and continue without the index.
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to create full-text indexer, region_id: {}, file_id: {}, err: {:?}",
                self.metadata.region_id, file_id, err
            );
        } else {
            warn!(
                err; "Failed to create full-text indexer, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
        }

        None
    }

    /// Creates the bloom filter creator for `file_id`, or returns `None` when
    /// disabled by config, no column needs it, or creation failed
    /// (failure panics in test builds, is logged otherwise).
    fn build_bloom_filter_indexer(&self, file_id: FileId) -> Option<BloomFilterIndexer> {
        let create = match self.build_type {
            IndexBuildType::Flush => self.bloom_filter_index_config.create_on_flush.auto(),
            IndexBuildType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(),
            _ => true,
        };

        if !create {
            debug!(
                "Skip creating bloom filter due to config, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        }

        let mem_limit = self.bloom_filter_index_config.mem_threshold_on_create();
        let indexer = BloomFilterIndexer::new(
            file_id,
            &self.metadata,
            self.intermediate_manager.clone(),
            mem_limit,
        );

        // On success return immediately (a `None` indexer just means no column
        // requires the index); otherwise fall through with the error.
        let err = match indexer {
            Ok(indexer) => {
                if indexer.is_none() {
                    debug!(
                        "Skip creating bloom filter due to no columns require indexing, region_id: {}, file_id: {}",
                        self.metadata.region_id, file_id,
                    );
                }
                return indexer;
            }
            Err(err) => err,
        };

        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to create bloom filter, region_id: {}, file_id: {}, err: {:?}",
                self.metadata.region_id, file_id, err
            );
        } else {
            warn!(
                err; "Failed to create bloom filter, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
        }

        None
    }

    /// Creates the vector index creator for `file_id`, or returns `None` when
    /// disabled by config, no vector column needs it, or creation failed
    /// (failure panics in test builds, is logged otherwise).
    #[cfg(feature = "vector_index")]
    fn build_vector_indexer(&self, file_id: FileId) -> Option<VectorIndexer> {
        let create = match self.build_type {
            IndexBuildType::Flush => self.vector_index_config.create_on_flush.auto(),
            IndexBuildType::Compact => self.vector_index_config.create_on_compaction.auto(),
            _ => true,
        };

        if !create {
            debug!(
                "Skip creating vector index due to config, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        }

        // Get vector index column IDs and options from metadata
        let vector_index_options = self.metadata.vector_indexed_column_ids();
        if vector_index_options.is_empty() {
            debug!(
                "No vector columns to index, skip creating vector index, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        }

        let mem_limit = self.vector_index_config.mem_threshold_on_create();
        let indexer = VectorIndexer::new(
            file_id,
            &self.metadata,
            self.intermediate_manager.clone(),
            mem_limit,
            &vector_index_options,
        );

        // On success return immediately; otherwise fall through with the error.
        let err = match indexer {
            Ok(indexer) => {
                if indexer.is_none() {
                    debug!(
                        "Skip creating vector index due to no columns require indexing, region_id: {}, file_id: {}",
                        self.metadata.region_id, file_id,
                    );
                }
                return indexer;
            }
            Err(err) => err,
        };

        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to create vector index, region_id: {}, file_id: {}, err: {:?}",
                self.metadata.region_id, file_id, err
            );
        } else {
            warn!(
                err; "Failed to create vector index, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
        }

        None
    }
}
597
/// Type of an index build task.
#[derive(Debug, Clone, IntoStaticStr, PartialEq)]
pub enum IndexBuildType {
    /// Build index when the schema changes.
    SchemaChange,
    /// Create or update index after flush.
    Flush,
    /// Create or update index after compaction.
    Compact,
    /// Manually build index.
    Manual,
}
610
611impl IndexBuildType {
612    fn as_str(&self) -> &'static str {
613        self.into()
614    }
615
616    // Higher value means higher priority.
617    fn priority(&self) -> u8 {
618        match self {
619            IndexBuildType::Manual => 3,
620            IndexBuildType::SchemaChange => 2,
621            IndexBuildType::Flush => 1,
622            IndexBuildType::Compact => 0,
623        }
624    }
625}
626
impl From<OperationType> for IndexBuildType {
    /// Maps the SST-producing operation to the matching build reason.
    fn from(op_type: OperationType) -> Self {
        match op_type {
            OperationType::Flush => IndexBuildType::Flush,
            OperationType::Compact => IndexBuildType::Compact,
        }
    }
}
635
/// Outcome of an index build task.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum IndexBuildOutcome {
    /// The build completed (possibly producing no index file).
    Finished,
    /// The build was aborted; the string describes the reason.
    Aborted(String),
}
642
/// Mpsc sender used to deliver the final result of an index build task.
pub type ResultMpscSender = Sender<Result<IndexBuildOutcome>>;
645
/// A background task that builds the index file for one SST.
#[derive(Clone)]
pub struct IndexBuildTask {
    /// The SST file handle to build index for.
    pub file: FileHandle,
    /// The file meta to build index for.
    pub file_meta: FileMeta,
    /// Why this build was scheduled (flush, compaction, manual, schema change).
    pub reason: IndexBuildType,
    /// Access layer used to read the SST from the object store.
    pub access_layer: AccessLayerRef,
    /// Listener notified when the build begins or aborts.
    pub(crate) listener: WorkerListener,
    // Manifest context of the region; presumably consumed by `update_manifest`
    // (defined outside this chunk) — TODO confirm.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// Write cache; when present, the built index file is uploaded through it.
    pub write_cache: Option<WriteCacheRef>,
    // File purger of the region; not referenced in this chunk — verify usage
    // against the rest of the impl.
    pub file_purger: FilePurgerRef,
    /// When write cache is enabled, the indexer builder should be built from the write cache.
    /// Otherwise, it should be built from the access layer.
    pub indexer_builder: Arc<dyn IndexerBuilder + Send + Sync>,
    /// Request sender to notify the region worker.
    pub(crate) request_sender: Sender<WorkerRequestWithTime>,
    /// Index build result sender.
    pub(crate) result_sender: ResultMpscSender,
}
666
impl std::fmt::Debug for IndexBuildTask {
    // Hand-written so only the identifying fields are printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IndexBuildTask")
            .field("region_id", &self.file_meta.region_id)
            .field("file_id", &self.file_meta.file_id)
            .field("reason", &self.reason)
            .finish()
    }
}
676
677impl IndexBuildTask {
    /// Notify the caller the job is success.
    pub async fn on_success(&self, outcome: IndexBuildOutcome) {
        // Send failures are deliberately ignored (e.g. the waiter is gone).
        let _ = self.result_sender.send(Ok(outcome)).await;
    }
682
    /// Send index build error to waiter.
    pub async fn on_failure(&self, err: Arc<Error>) {
        // Wrap the shared error with region context; send failures are
        // deliberately ignored (e.g. the waiter is gone).
        let _ = self
            .result_sender
            .send(Err(err.clone()).context(BuildIndexAsyncSnafu {
                region_id: self.file_meta.region_id,
            }))
            .await;
    }
692
    /// Converts the task into a boxed [Job] that runs the whole build.
    fn into_index_build_job(mut self, version_control: VersionControlRef) -> Job {
        Box::pin(async move {
            self.do_index_build(version_control).await;
        })
    }
698
    /// Runs the build end-to-end: notifies the listener, executes
    /// [Self::index_build], reports the result to the waiter, and finally
    /// tells the region worker the task stopped — regardless of outcome.
    async fn do_index_build(&mut self, version_control: VersionControlRef) {
        self.listener
            .on_index_build_begin(RegionFileId::new(
                self.file_meta.region_id,
                self.file_meta.file_id,
            ))
            .await;
        match self.index_build(version_control).await {
            Ok(outcome) => self.on_success(outcome).await,
            Err(e) => {
                warn!(
                    e; "Index build task failed, region: {}, file_id: {}",
                    self.file_meta.region_id, self.file_meta.file_id,
                );
                self.on_failure(e.into()).await
            }
        }
        // Always notify the worker that this task stopped, even on failure,
        // so it can release any bookkeeping for the file.
        let worker_request = WorkerRequest::Background {
            region_id: self.file_meta.region_id,
            notify: BackgroundNotify::IndexBuildStopped(IndexBuildStopped {
                region_id: self.file_meta.region_id,
                file_id: self.file_meta.file_id,
            }),
        };
        // Send failures are ignored: the worker may already be shut down.
        let _ = self
            .request_sender
            .send(WorkerRequestWithTime::new(worker_request))
            .await;
    }
728
729    // Checks if the SST file still exists in object store and version to avoid conflict with compaction.
730    async fn check_sst_file_exists(&self, version_control: &VersionControlRef) -> bool {
731        let file_id = self.file_meta.file_id;
732        let level = self.file_meta.level;
733        // We should check current version instead of the version when the job is created.
734        let version = version_control.current().version;
735
736        let Some(level_files) = version.ssts.levels().get(level as usize) else {
737            warn!(
738                "File id {} not found in level {} for index build, region: {}",
739                file_id, level, self.file_meta.region_id
740            );
741            return false;
742        };
743
744        match level_files.files.get(&file_id) {
745            Some(handle) if !handle.is_deleted() && !handle.compacting() => {
746                // If the file's metadata is present in the current version, the physical SST file
747                // is guaranteed to exist on object store. The file purger removes the physical
748                // file only after its metadata is removed from the version.
749                true
750            }
751            _ => {
752                warn!(
753                    "File id {} not found in region version for index build, region: {}",
754                    file_id, self.file_meta.region_id
755                );
756                false
757            }
758        }
759    }
760
    /// Builds the index for the SST: scans the file through a parquet reader,
    /// feeds every record batch to the indexers, and — when an index file was
    /// produced — uploads it (if a write cache is used) and asks the worker to
    /// update the region manifest.
    ///
    /// Returns `Aborted` when the SST disappears from the region version
    /// (e.g. removed by compaction) before or during the build.
    async fn index_build(
        &mut self,
        version_control: VersionControlRef,
    ) -> Result<IndexBuildOutcome> {
        // Determine the new index version
        let new_index_version = if self.file_meta.index_file_size > 0 {
            // Increment version if index file exists to avoid overwrite.
            self.file_meta.index_version + 1
        } else {
            0 // Default version for new index files
        };

        // Use the same file_id but with new version for index file
        let index_file_id = self.file_meta.file_id;
        let mut indexer = self
            .indexer_builder
            .build(index_file_id, new_index_version)
            .await;

        // Check SST file existence before building index to avoid failure of parquet reader.
        if !self.check_sst_file_exists(&version_control).await {
            // Calls abort to clean up index files.
            indexer.abort().await;
            self.listener
                .on_index_build_abort(RegionFileId::new(
                    self.file_meta.region_id,
                    self.file_meta.file_id,
                ))
                .await;
            return Ok(IndexBuildOutcome::Aborted(format!(
                "SST file not found during index build, region: {}, file_id: {}",
                self.file_meta.region_id, self.file_meta.file_id
            )));
        }

        let parquet_reader = self
            .access_layer
            .read_sst(self.file.clone()) // use the latest file handle instead of creating a new one
            .build()
            .await?;

        if let Some(mut parquet_reader) = parquet_reader {
            // TODO(SNC123): optimize index batch
            loop {
                match parquet_reader.next_record_batch().await {
                    Ok(Some(batch)) => {
                        indexer.update_flat(&batch).await;
                    }
                    Ok(None) => break,
                    Err(e) => {
                        // Abort so intermediate index state is cleaned up
                        // before propagating the read error.
                        indexer.abort().await;
                        return Err(e);
                    }
                }
            }
        }
        let index_output = indexer.finish().await;

        // A zero file size means no index was produced; nothing to upload or
        // record in the manifest.
        if index_output.file_size > 0 {
            // Check SST file existence again after building index.
            if !self.check_sst_file_exists(&version_control).await {
                // Calls abort to clean up index files.
                // NOTE(review): `finish` already ran at this point; confirm
                // that abort also removes the written index file.
                indexer.abort().await;
                self.listener
                    .on_index_build_abort(RegionFileId::new(
                        self.file_meta.region_id,
                        self.file_meta.file_id,
                    ))
                    .await;
                return Ok(IndexBuildOutcome::Aborted(format!(
                    "SST file not found during index build, region: {}, file_id: {}",
                    self.file_meta.region_id, self.file_meta.file_id
                )));
            }

            // Upload index file if write cache is enabled.
            self.maybe_upload_index_file(index_output.clone(), index_file_id, new_index_version)
                .await?;

            // Manifest update errors are routed to the worker instead of being
            // returned: the index file itself was built successfully.
            let worker_request = match self.update_manifest(index_output, new_index_version).await {
                Ok(edit) => {
                    let index_build_finished = IndexBuildFinished {
                        region_id: self.file_meta.region_id,
                        edit,
                    };
                    WorkerRequest::Background {
                        region_id: self.file_meta.region_id,
                        notify: BackgroundNotify::IndexBuildFinished(index_build_finished),
                    }
                }
                Err(e) => {
                    let err = Arc::new(e);
                    WorkerRequest::Background {
                        region_id: self.file_meta.region_id,
                        notify: BackgroundNotify::IndexBuildFailed(IndexBuildFailed { err }),
                    }
                }
            };

            // Send failures are ignored: the worker may already be shut down.
            let _ = self
                .request_sender
                .send(WorkerRequestWithTime::new(worker_request))
                .await;
        }
        Ok(IndexBuildOutcome::Finished)
    }
867
868    async fn maybe_upload_index_file(
869        &self,
870        output: IndexOutput,
871        index_file_id: FileId,
872        index_version: u64,
873    ) -> Result<()> {
874        if let Some(write_cache) = &self.write_cache {
875            let file_id = self.file_meta.file_id;
876            let region_id = self.file_meta.region_id;
877            let remote_store = self.access_layer.object_store();
878            let mut upload_tracker = UploadTracker::new(region_id);
879            let mut err = None;
880            let puffin_key =
881                IndexKey::new(region_id, index_file_id, FileType::Puffin(output.version));
882            let index_id = RegionIndexId::new(RegionFileId::new(region_id, file_id), index_version);
883            let puffin_path = RegionFilePathFactory::new(
884                self.access_layer.table_dir().to_string(),
885                self.access_layer.path_type(),
886            )
887            .build_index_file_path_with_version(index_id);
888            if let Err(e) = write_cache
889                .upload(puffin_key, &puffin_path, remote_store)
890                .await
891            {
892                err = Some(e);
893            }
894            upload_tracker.push_uploaded_file(puffin_path);
895            if let Some(err) = err {
896                // Cleans index files on failure.
897                upload_tracker
898                    .clean(
899                        &smallvec![SstInfo {
900                            file_id,
901                            index_metadata: output,
902                            ..Default::default()
903                        }],
904                        &write_cache.file_cache(),
905                        remote_store,
906                    )
907                    .await;
908                return Err(err);
909            }
910        } else {
911            debug!("write cache is not available, skip uploading index file");
912        }
913        Ok(())
914    }
915
916    async fn update_manifest(
917        &mut self,
918        output: IndexOutput,
919        new_index_version: u64,
920    ) -> Result<RegionEdit> {
921        self.file_meta.available_indexes = output.build_available_indexes();
922        self.file_meta.indexes = output.build_indexes();
923        self.file_meta.index_file_size = output.file_size;
924        self.file_meta.index_version = new_index_version;
925        let edit = RegionEdit {
926            files_to_add: vec![self.file_meta.clone()],
927            files_to_remove: vec![],
928            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
929            flushed_sequence: None,
930            flushed_entry_id: None,
931            committed_sequence: None,
932            compaction_time_window: None,
933        };
934        let version = self
935            .manifest_ctx
936            .update_manifest(
937                RegionLeaderState::Writable,
938                RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())),
939                false,
940            )
941            .await?;
942        info!(
943            "Successfully update manifest version to {version}, region: {}, reason: {}",
944            self.file_meta.region_id,
945            self.reason.as_str()
946        );
947        Ok(edit)
948    }
949}
950
951impl PartialEq for IndexBuildTask {
952    fn eq(&self, other: &Self) -> bool {
953        self.reason.priority() == other.reason.priority()
954    }
955}
956
// Priority comparison is reflexive and total, so `Eq` is sound.
impl Eq for IndexBuildTask {}
958
959impl PartialOrd for IndexBuildTask {
960    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
961        Some(self.cmp(other))
962    }
963}
964
965impl Ord for IndexBuildTask {
966    fn cmp(&self, other: &Self) -> Ordering {
967        self.reason.priority().cmp(&other.reason.priority())
968    }
969}
970
/// Tracks the index build status of a region scheduled by the [IndexBuildScheduler].
pub struct IndexBuildStatus {
    /// The region whose index builds are tracked.
    pub region_id: RegionId,
    /// Files whose index build jobs are currently running.
    pub building_files: HashSet<FileId>,
    /// Queued tasks, ordered by build-reason priority (max-heap).
    pub pending_tasks: BinaryHeap<IndexBuildTask>,
}
977
978impl IndexBuildStatus {
979    pub fn new(region_id: RegionId) -> Self {
980        IndexBuildStatus {
981            region_id,
982            building_files: HashSet::new(),
983            pending_tasks: BinaryHeap::new(),
984        }
985    }
986
987    async fn on_failure(self, err: Arc<Error>) {
988        for task in self.pending_tasks {
989            task.on_failure(err.clone()).await;
990        }
991    }
992}
993
/// Scheduler of background index build tasks for a worker.
pub struct IndexBuildScheduler {
    /// Background job scheduler.
    scheduler: SchedulerRef,
    /// Tracks regions need to build index.
    region_status: HashMap<RegionId, IndexBuildStatus>,
    /// Limit of files allowed to build index concurrently across all regions
    /// of this worker (the building count is summed over regions before
    /// scheduling, see `schedule_next_build_batch`).
    files_limit: usize,
}
1002
1003/// Manager background index build tasks of a worker.
1004impl IndexBuildScheduler {
1005    pub fn new(scheduler: SchedulerRef, files_limit: usize) -> Self {
1006        IndexBuildScheduler {
1007            scheduler,
1008            region_status: HashMap::new(),
1009            files_limit,
1010        }
1011    }
1012
1013    pub(crate) async fn schedule_build(
1014        &mut self,
1015        version_control: &VersionControlRef,
1016        task: IndexBuildTask,
1017    ) -> Result<()> {
1018        let status = self
1019            .region_status
1020            .entry(task.file_meta.region_id)
1021            .or_insert_with(|| IndexBuildStatus::new(task.file_meta.region_id));
1022
1023        if status.building_files.contains(&task.file_meta.file_id) {
1024            let region_file_id =
1025                RegionFileId::new(task.file_meta.region_id, task.file_meta.file_id);
1026            debug!(
1027                "Aborting index build task since index is already being built for region file {:?}",
1028                region_file_id
1029            );
1030            task.on_success(IndexBuildOutcome::Aborted(format!(
1031                "Index is already being built for region file {:?}",
1032                region_file_id
1033            )))
1034            .await;
1035            task.listener.on_index_build_abort(region_file_id).await;
1036            return Ok(());
1037        }
1038
1039        status.pending_tasks.push(task);
1040
1041        self.schedule_next_build_batch(version_control);
1042        Ok(())
1043    }
1044
1045    /// Schedule tasks until reaching the files limit or no more tasks.
1046    fn schedule_next_build_batch(&mut self, version_control: &VersionControlRef) {
1047        let mut building_count = 0;
1048        for status in self.region_status.values() {
1049            building_count += status.building_files.len();
1050        }
1051
1052        while building_count < self.files_limit {
1053            if let Some(task) = self.find_next_task() {
1054                let region_id = task.file_meta.region_id;
1055                let file_id = task.file_meta.file_id;
1056                let job = task.into_index_build_job(version_control.clone());
1057                if self.scheduler.schedule(job).is_ok() {
1058                    if let Some(status) = self.region_status.get_mut(&region_id) {
1059                        status.building_files.insert(file_id);
1060                        building_count += 1;
1061                        status
1062                            .pending_tasks
1063                            .retain(|t| t.file_meta.file_id != file_id);
1064                    } else {
1065                        error!(
1066                            "Region status not found when scheduling index build task, region: {}",
1067                            region_id
1068                        );
1069                    }
1070                } else {
1071                    error!(
1072                        "Failed to schedule index build job, region: {}, file_id: {}",
1073                        region_id, file_id
1074                    );
1075                }
1076            } else {
1077                // No more tasks to schedule.
1078                break;
1079            }
1080        }
1081    }
1082
1083    /// Find the next task which has the highest priority to run.
1084    fn find_next_task(&self) -> Option<IndexBuildTask> {
1085        self.region_status
1086            .iter()
1087            .filter_map(|(_, status)| status.pending_tasks.peek())
1088            .max()
1089            .cloned()
1090    }
1091
1092    pub(crate) fn on_task_stopped(
1093        &mut self,
1094        region_id: RegionId,
1095        file_id: FileId,
1096        version_control: &VersionControlRef,
1097    ) {
1098        if let Some(status) = self.region_status.get_mut(&region_id) {
1099            status.building_files.remove(&file_id);
1100            if status.building_files.is_empty() && status.pending_tasks.is_empty() {
1101                // No more tasks for this region, remove it.
1102                self.region_status.remove(&region_id);
1103            }
1104        }
1105
1106        self.schedule_next_build_batch(version_control);
1107    }
1108
1109    pub(crate) async fn on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
1110        error!(
1111            err; "Index build scheduler encountered failure for region {}, removing all pending tasks.",
1112            region_id
1113        );
1114        let Some(status) = self.region_status.remove(&region_id) else {
1115            return;
1116        };
1117        status.on_failure(err).await;
1118    }
1119
1120    /// Notifies the scheduler that the region is dropped.
1121    pub(crate) async fn on_region_dropped(&mut self, region_id: RegionId) {
1122        self.remove_region_on_failure(
1123            region_id,
1124            Arc::new(RegionDroppedSnafu { region_id }.build()),
1125        )
1126        .await;
1127    }
1128
1129    /// Notifies the scheduler that the region is closed.
1130    pub(crate) async fn on_region_closed(&mut self, region_id: RegionId) {
1131        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()))
1132            .await;
1133    }
1134
1135    /// Notifies the scheduler that the region is truncated.
1136    pub(crate) async fn on_region_truncated(&mut self, region_id: RegionId) {
1137        self.remove_region_on_failure(
1138            region_id,
1139            Arc::new(RegionTruncatedSnafu { region_id }.build()),
1140        )
1141        .await;
1142    }
1143
1144    async fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
1145        let Some(status) = self.region_status.remove(&region_id) else {
1146            return;
1147        };
1148        status.on_failure(err).await;
1149    }
1150}
1151
1152/// Decodes primary keys from a flat format RecordBatch.
1153/// Returns a list of (decoded_pk_value, count) tuples where count is the number of occurrences.
1154pub(crate) fn decode_primary_keys_with_counts(
1155    batch: &RecordBatch,
1156    codec: &IndexValuesCodec,
1157) -> Result<Vec<(CompositeValues, usize)>> {
1158    let primary_key_index = primary_key_column_index(batch.num_columns());
1159    let pk_dict_array = batch
1160        .column(primary_key_index)
1161        .as_any()
1162        .downcast_ref::<PrimaryKeyArray>()
1163        .context(InvalidRecordBatchSnafu {
1164            reason: "Primary key column is not a dictionary array",
1165        })?;
1166    let pk_values_array = pk_dict_array
1167        .values()
1168        .as_any()
1169        .downcast_ref::<BinaryArray>()
1170        .context(InvalidRecordBatchSnafu {
1171            reason: "Primary key values are not binary array",
1172        })?;
1173    let keys = pk_dict_array.keys();
1174
1175    // Decodes primary keys and count consecutive occurrences
1176    let mut result: Vec<(CompositeValues, usize)> = Vec::new();
1177    let mut prev_key: Option<u32> = None;
1178
1179    let pk_indices = keys.values();
1180    for &current_key in pk_indices.iter().take(keys.len()) {
1181        // Checks if current key is the same as previous key
1182        if let Some(prev) = prev_key
1183            && prev == current_key
1184        {
1185            // Safety: We already have a key in the result vector.
1186            result.last_mut().unwrap().1 += 1;
1187            continue;
1188        }
1189
1190        // New key, decodes it.
1191        let pk_bytes = pk_values_array.value(current_key as usize);
1192        let decoded_value = codec.decoder().decode(pk_bytes).context(DecodeSnafu)?;
1193
1194        result.push((decoded_value, 1));
1195        prev_key = Some(current_key);
1196    }
1197
1198    Ok(result)
1199}
1200
1201#[cfg(test)]
1202mod tests {
1203    use std::sync::Arc;
1204
1205    use api::v1::SemanticType;
1206    use common_base::readable_size::ReadableSize;
1207    use datafusion_common::HashMap;
1208    use datatypes::data_type::ConcreteDataType;
1209    use datatypes::schema::{
1210        ColumnSchema, FulltextOptions, SkippingIndexOptions, SkippingIndexType,
1211    };
1212    use object_store::ObjectStore;
1213    use object_store::services::Memory;
1214    use puffin_manager::PuffinManagerFactory;
1215    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1216    use tokio::sync::mpsc;
1217
1218    use super::*;
1219    use crate::access_layer::{FilePathProvider, Metrics, SstWriteRequest, WriteType};
1220    use crate::cache::write_cache::WriteCache;
1221    use crate::config::{FulltextIndexConfig, IndexBuildMode, MitoConfig, Mode};
1222    use crate::memtable::time_partition::TimePartitions;
1223    use crate::region::version::{VersionBuilder, VersionControl};
1224    use crate::sst::file::RegionFileId;
1225    use crate::sst::file_purger::NoopFilePurger;
1226    use crate::sst::location;
1227    use crate::sst::parquet::WriteOptions;
1228    use crate::test_util::memtable_util::EmptyMemtableBuilder;
1229    use crate::test_util::scheduler_util::SchedulerEnv;
1230    use crate::test_util::sst_util::{
1231        new_flat_source_from_record_batches, new_record_batch_by_range, sst_region_metadata,
1232    };
1233
    /// Toggles controlling which index-capable columns
    /// [mock_region_metadata] adds to the test region metadata.
    struct MetaConfig {
        /// Enables the inverted index on column "a".
        with_inverted: bool,
        /// Adds a string column "text" with fulltext options.
        with_fulltext: bool,
        /// Adds a string column "bloom" with a bloom-filter skipping index.
        with_skipping_bloom: bool,
        /// Adds a vector column "vec" with vector index options.
        #[cfg(feature = "vector_index")]
        with_vector: bool,
    }
1241
    /// Builds test region metadata for region (1, 2): field column "a" (i64),
    /// field column "b" (f64) and timestamp column "c", plus optional
    /// index-capable columns selected by the given [MetaConfig].
    fn mock_region_metadata(
        MetaConfig {
            with_inverted,
            with_fulltext,
            with_skipping_bloom,
            #[cfg(feature = "vector_index")]
            with_vector,
        }: MetaConfig,
    ) -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        let mut column_schema = ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false);
        if with_inverted {
            column_schema = column_schema.with_inverted_index(true);
        }
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema,
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "c",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 3,
            });

        // Optional fulltext-indexed string column.
        if with_fulltext {
            let column_schema =
                ColumnSchema::new("text", ConcreteDataType::string_datatype(), true)
                    .with_fulltext_options(FulltextOptions {
                        enable: true,
                        ..Default::default()
                    })
                    .unwrap();

            let column = ColumnMetadata {
                column_schema,
                semantic_type: SemanticType::Field,
                column_id: 4,
            };

            builder.push_column_metadata(column);
        }

        // Optional string column with a bloom-filter skipping index.
        if with_skipping_bloom {
            let column_schema =
                ColumnSchema::new("bloom", ConcreteDataType::string_datatype(), false)
                    .with_skipping_options(SkippingIndexOptions::new_unchecked(
                        42,
                        0.01,
                        SkippingIndexType::BloomFilter,
                    ))
                    .unwrap();

            let column = ColumnMetadata {
                column_schema,
                semantic_type: SemanticType::Field,
                column_id: 5,
            };

            builder.push_column_metadata(column);
        }

        // Optional vector-indexed column (only with the `vector_index` feature).
        #[cfg(feature = "vector_index")]
        if with_vector {
            use index::vector::VectorIndexOptions;

            let options = VectorIndexOptions::default();
            let column_schema =
                ColumnSchema::new("vec", ConcreteDataType::vector_datatype(4), true)
                    .with_vector_index_options(&options)
                    .unwrap();
            let column = ColumnMetadata {
                column_schema,
                semantic_type: SemanticType::Field,
                column_id: 6,
            };

            builder.push_column_metadata(column);
        }

        Arc::new(builder.build().unwrap())
    }
1334
1335    fn mock_object_store() -> ObjectStore {
1336        ObjectStore::new(Memory::default()).unwrap().finish()
1337    }
1338
1339    async fn mock_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
1340        IntermediateManager::init_fs(path).await.unwrap()
1341    }
    /// A [FilePathProvider] for tests that must never be asked for a path;
    /// every method panics via `unreachable!()`.
    struct NoopPathProvider;

    impl FilePathProvider for NoopPathProvider {
        fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
            unreachable!()
        }

        fn build_index_file_path_with_version(&self, _index_id: RegionIndexId) -> String {
            unreachable!()
        }

        fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
            unreachable!()
        }
    }
1357
    /// Writes an SST built from three record batches through the access layer
    /// of `env`, using the given index `build_mode`, and returns its [SstInfo].
    async fn mock_sst_file(
        metadata: RegionMetadataRef,
        env: &SchedulerEnv,
        build_mode: IndexBuildMode,
    ) -> SstInfo {
        let source = new_flat_source_from_record_batches(vec![
            new_record_batch_by_range(&["a", "d"], 0, 60),
            new_record_batch_by_range(&["b", "f"], 0, 40),
            new_record_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Only the index build mode deviates from the default config.
        let mut index_config = MitoConfig::default().index;
        index_config.build_mode = build_mode;
        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata: metadata.clone(),
            source,
            storage: None,
            max_sequence: None,
            sst_write_format: Default::default(),
            cache_manager: Default::default(),
            index_options: IndexOptions::default(),
            index_config,
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        // Exactly one SST is produced for this source; take it.
        env.access_layer
            .write_sst(write_request, &WriteOptions::default(), &mut metrics)
            .await
            .unwrap()
            .remove(0)
    }
1393
1394    async fn mock_version_control(
1395        metadata: RegionMetadataRef,
1396        file_purger: FilePurgerRef,
1397        files: HashMap<FileId, FileMeta>,
1398    ) -> VersionControlRef {
1399        let mutable = Arc::new(TimePartitions::new(
1400            metadata.clone(),
1401            Arc::new(EmptyMemtableBuilder::default()),
1402            0,
1403            None,
1404        ));
1405        let version_builder = VersionBuilder::new(metadata, mutable)
1406            .add_files(file_purger, files.values().cloned())
1407            .build();
1408        Arc::new(VersionControl::new(version_builder))
1409    }
1410
    /// Builds an [IndexerBuilderImpl] wired to the access layer of `env`,
    /// backed by a fresh puffin manager and intermediate manager in a temp dir.
    async fn mock_indexer_builder(
        metadata: RegionMetadataRef,
        env: &SchedulerEnv,
    ) -> Arc<dyn IndexerBuilder + Send + Sync> {
        let (dir, factory) = PuffinManagerFactory::new_for_test_async("mock_indexer_builder").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
        let puffin_manager = factory.build(
            env.access_layer.object_store().clone(),
            RegionFilePathFactory::new(
                env.access_layer.table_dir().to_string(),
                env.access_layer.path_type(),
            ),
        );
        Arc::new(IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata,
            row_group_size: 1024,
            puffin_manager,
            write_cache_enabled: false,
            intermediate_manager: intm_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        })
    }
1439
    /// All three indexers should be created when every index kind is enabled
    /// in the metadata and the default configs allow creation on flush.
    #[tokio::test]
    async fn test_build_indexer_basic() {
        let (dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_basic_").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;

        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: true,
            with_skipping_bloom: true,
            #[cfg(feature = "vector_index")]
            with_vector: false,
        });
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata,
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            write_cache_enabled: false,
            intermediate_manager: intm_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        }
        .build(FileId::random(), 0)
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_some());
    }
1474
    /// Disabling creation for one index kind (per build type) must skip only
    /// that kind; the other indexers are still created.
    #[tokio::test]
    async fn test_build_indexer_disable_create() {
        let (dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_disable_create_").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;

        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: true,
            with_skipping_bloom: true,
            #[cfg(feature = "vector_index")]
            with_vector: false,
        });
        // Inverted index creation disabled on flush.
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            write_cache_enabled: false,
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig {
                create_on_flush: Mode::Disable,
                ..Default::default()
            },
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        }
        .build(FileId::random(), 0)
        .await;

        assert!(indexer.inverted_indexer.is_none());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_some());

        // Fulltext index creation disabled on compaction.
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Compact,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            write_cache_enabled: false,
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig {
                create_on_compaction: Mode::Disable,
                ..Default::default()
            },
            bloom_filter_index_config: BloomFilterConfig::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        }
        .build(FileId::random(), 0)
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_none());
        assert!(indexer.bloom_filter_indexer.is_some());

        // Bloom filter index creation disabled on compaction.
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Compact,
            metadata,
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            write_cache_enabled: false,
            intermediate_manager: intm_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig {
                create_on_compaction: Mode::Disable,
                ..Default::default()
            },
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        }
        .build(FileId::random(), 0)
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_none());
    }
1560
    /// An indexer is skipped when no column in the metadata requires that
    /// index kind, even though the configs allow creation.
    #[tokio::test]
    async fn test_build_indexer_no_required() {
        let (dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_no_required_").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;

        // No inverted-indexed column.
        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: false,
            with_fulltext: true,
            with_skipping_bloom: true,
            #[cfg(feature = "vector_index")]
            with_vector: false,
        });
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            write_cache_enabled: false,
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        }
        .build(FileId::random(), 0)
        .await;

        assert!(indexer.inverted_indexer.is_none());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_some());

        // No fulltext-indexed column.
        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: false,
            with_skipping_bloom: true,
            #[cfg(feature = "vector_index")]
            with_vector: false,
        });
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            write_cache_enabled: false,
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        }
        .build(FileId::random(), 0)
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_none());
        assert!(indexer.bloom_filter_indexer.is_some());

        // No bloom-filter skipping-index column.
        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: true,
            with_skipping_bloom: false,
            #[cfg(feature = "vector_index")]
            with_vector: false,
        });
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            write_cache_enabled: false,
            intermediate_manager: intm_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
            #[cfg(feature = "vector_index")]
            vector_index_config: Default::default(),
        }
        .build(FileId::random(), 0)
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_none());
    }
1651
1652    #[tokio::test]
1653    async fn test_build_indexer_zero_row_group() {
1654        let (dir, factory) =
1655            PuffinManagerFactory::new_for_test_async("test_build_indexer_zero_row_group_").await;
1656        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1657
1658        let metadata = mock_region_metadata(MetaConfig {
1659            with_inverted: true,
1660            with_fulltext: true,
1661            with_skipping_bloom: true,
1662            #[cfg(feature = "vector_index")]
1663            with_vector: false,
1664        });
1665        let indexer = IndexerBuilderImpl {
1666            build_type: IndexBuildType::Flush,
1667            metadata,
1668            row_group_size: 0,
1669            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1670            write_cache_enabled: false,
1671            intermediate_manager: intm_manager,
1672            index_options: IndexOptions::default(),
1673            inverted_index_config: InvertedIndexConfig::default(),
1674            fulltext_index_config: FulltextIndexConfig::default(),
1675            bloom_filter_index_config: BloomFilterConfig::default(),
1676            #[cfg(feature = "vector_index")]
1677            vector_index_config: Default::default(),
1678        }
1679        .build(FileId::random(), 0)
1680        .await;
1681
1682        assert!(indexer.inverted_indexer.is_none());
1683    }
1684
1685    #[cfg(feature = "vector_index")]
1686    #[tokio::test]
1687    async fn test_update_flat_builds_vector_index() {
1688        use datatypes::arrow::array::BinaryBuilder;
1689        use datatypes::arrow::datatypes::{DataType, Field, Schema};
1690
1691        struct TestPathProvider;
1692
1693        impl FilePathProvider for TestPathProvider {
1694            fn build_index_file_path(&self, file_id: RegionFileId) -> String {
1695                format!("index/{}.puffin", file_id)
1696            }
1697
1698            fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
1699                format!("index/{}.puffin", index_id)
1700            }
1701
1702            fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
1703                format!("sst/{}.parquet", file_id)
1704            }
1705        }
1706
1707        fn f32s_to_bytes(values: &[f32]) -> Vec<u8> {
1708            let mut bytes = Vec::with_capacity(values.len() * 4);
1709            for v in values {
1710                bytes.extend_from_slice(&v.to_le_bytes());
1711            }
1712            bytes
1713        }
1714
1715        let (dir, factory) =
1716            PuffinManagerFactory::new_for_test_async("test_update_flat_builds_vector_index_").await;
1717        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1718
1719        let metadata = mock_region_metadata(MetaConfig {
1720            with_inverted: false,
1721            with_fulltext: false,
1722            with_skipping_bloom: false,
1723            with_vector: true,
1724        });
1725
1726        let mut indexer = IndexerBuilderImpl {
1727            build_type: IndexBuildType::Flush,
1728            metadata,
1729            row_group_size: 1024,
1730            puffin_manager: factory.build(mock_object_store(), TestPathProvider),
1731            write_cache_enabled: false,
1732            intermediate_manager: intm_manager,
1733            index_options: IndexOptions::default(),
1734            inverted_index_config: InvertedIndexConfig::default(),
1735            fulltext_index_config: FulltextIndexConfig::default(),
1736            bloom_filter_index_config: BloomFilterConfig::default(),
1737            vector_index_config: Default::default(),
1738        }
1739        .build(FileId::random(), 0)
1740        .await;
1741
1742        assert!(indexer.vector_indexer.is_some());
1743
1744        let vec1 = f32s_to_bytes(&[1.0, 0.0, 0.0, 0.0]);
1745        let vec2 = f32s_to_bytes(&[0.0, 1.0, 0.0, 0.0]);
1746
1747        let mut builder = BinaryBuilder::with_capacity(2, vec1.len() + vec2.len());
1748        builder.append_value(&vec1);
1749        builder.append_value(&vec2);
1750
1751        let schema = Arc::new(Schema::new(vec![Field::new("vec", DataType::Binary, true)]));
1752        let batch = RecordBatch::try_new(schema, vec![Arc::new(builder.finish())]).unwrap();
1753
1754        indexer.update_flat(&batch).await;
1755        let output = indexer.finish().await;
1756
1757        assert!(output.vector_index.is_available());
1758        assert!(output.vector_index.columns.contains(&6));
1759    }
1760
1761    #[tokio::test]
1762    async fn test_index_build_task_sst_not_exist() {
1763        let env = SchedulerEnv::new().await;
1764        let (tx, _rx) = mpsc::channel(4);
1765        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1766        let mut scheduler = env.mock_index_build_scheduler(4);
1767        let metadata = Arc::new(sst_region_metadata());
1768        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1769        let file_purger = Arc::new(NoopFilePurger {});
1770        let files = HashMap::new();
1771        let version_control =
1772            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1773        let region_id = metadata.region_id;
1774        let indexer_builder = mock_indexer_builder(metadata, &env).await;
1775
1776        let file_meta = FileMeta {
1777            region_id,
1778            file_id: FileId::random(),
1779            file_size: 100,
1780            ..Default::default()
1781        };
1782
1783        let file = FileHandle::new(file_meta.clone(), file_purger.clone());
1784
1785        // Create mock task.
1786        let task = IndexBuildTask {
1787            file,
1788            file_meta,
1789            reason: IndexBuildType::Flush,
1790            access_layer: env.access_layer.clone(),
1791            listener: WorkerListener::default(),
1792            manifest_ctx,
1793            write_cache: None,
1794            file_purger,
1795            indexer_builder,
1796            request_sender: tx,
1797            result_sender: result_tx,
1798        };
1799
1800        // Schedule the build task and check result.
1801        scheduler
1802            .schedule_build(&version_control, task)
1803            .await
1804            .unwrap();
1805        match result_rx.recv().await.unwrap() {
1806            Ok(outcome) => {
1807                if outcome == IndexBuildOutcome::Finished {
1808                    panic!("Expect aborted result due to missing SST file")
1809                }
1810            }
1811            _ => panic!("Expect aborted result due to missing SST file"),
1812        }
1813    }
1814
    #[tokio::test]
    async fn test_index_build_task_sst_exist() {
        // An index build task over an existing SST should finish successfully
        // and then notify the worker with a region edit containing the
        // updated file metadata.
        let env = SchedulerEnv::new().await;
        let mut scheduler = env.mock_index_build_scheduler(4);
        let metadata = Arc::new(sst_region_metadata());
        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
        let region_id = metadata.region_id;
        let file_purger = Arc::new(NoopFilePurger {});
        // Write a real SST file for the task to index.
        let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
        // Mirror the SST statistics into the file meta registered below.
        let file_meta = FileMeta {
            region_id,
            file_id: sst_info.file_id,
            file_size: sst_info.file_size,
            max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
            index_file_size: sst_info.index_metadata.file_size,
            num_rows: sst_info.num_rows as u64,
            num_row_groups: sst_info.num_row_groups,
            ..Default::default()
        };
        // Register the file so the version control knows about it.
        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
        let version_control =
            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
        let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;

        let file = FileHandle::new(file_meta.clone(), file_purger.clone());

        // Create mock task.
        let (tx, mut rx) = mpsc::channel(4);
        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
        let task = IndexBuildTask {
            file,
            file_meta: file_meta.clone(),
            reason: IndexBuildType::Flush,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            manifest_ctx,
            write_cache: None,
            file_purger,
            indexer_builder,
            request_sender: tx,
            result_sender: result_tx,
        };

        scheduler
            .schedule_build(&version_control, task)
            .await
            .unwrap();

        // The task should finish successfully.
        match result_rx.recv().await.unwrap() {
            Ok(outcome) => {
                assert_eq!(outcome, IndexBuildOutcome::Finished);
            }
            _ => panic!("Expect finished result"),
        }

        // A notification should be sent to the worker to update the manifest.
        let worker_req = rx.recv().await.unwrap().request;
        match worker_req {
            WorkerRequest::Background {
                region_id: req_region_id,
                notify: BackgroundNotify::IndexBuildFinished(finished),
            } => {
                assert_eq!(req_region_id, region_id);
                // The edit must add back exactly the indexed file.
                assert_eq!(finished.edit.files_to_add.len(), 1);
                let updated_meta = &finished.edit.files_to_add[0];

                // The mock indexer builder creates all index types.
                assert!(!updated_meta.available_indexes.is_empty());
                assert!(updated_meta.index_file_size > 0);
                assert_eq!(updated_meta.file_id, file_meta.file_id);
            }
            _ => panic!("Unexpected worker request: {:?}", worker_req),
        }
    }
1890
1891    async fn schedule_index_build_task_with_mode(build_mode: IndexBuildMode) {
1892        let env = SchedulerEnv::new().await;
1893        let mut scheduler = env.mock_index_build_scheduler(4);
1894        let metadata = Arc::new(sst_region_metadata());
1895        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1896        let file_purger = Arc::new(NoopFilePurger {});
1897        let region_id = metadata.region_id;
1898        let sst_info = mock_sst_file(metadata.clone(), &env, build_mode.clone()).await;
1899        let file_meta = FileMeta {
1900            region_id,
1901            file_id: sst_info.file_id,
1902            file_size: sst_info.file_size,
1903            max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
1904            index_file_size: sst_info.index_metadata.file_size,
1905            num_rows: sst_info.num_rows as u64,
1906            num_row_groups: sst_info.num_row_groups,
1907            ..Default::default()
1908        };
1909        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1910        let version_control =
1911            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1912        let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
1913
1914        let file = FileHandle::new(file_meta.clone(), file_purger.clone());
1915
1916        // Create mock task.
1917        let (tx, _rx) = mpsc::channel(4);
1918        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1919        let task = IndexBuildTask {
1920            file,
1921            file_meta: file_meta.clone(),
1922            reason: IndexBuildType::Flush,
1923            access_layer: env.access_layer.clone(),
1924            listener: WorkerListener::default(),
1925            manifest_ctx,
1926            write_cache: None,
1927            file_purger,
1928            indexer_builder,
1929            request_sender: tx,
1930            result_sender: result_tx,
1931        };
1932
1933        scheduler
1934            .schedule_build(&version_control, task)
1935            .await
1936            .unwrap();
1937
1938        let puffin_path = location::index_file_path(
1939            env.access_layer.table_dir(),
1940            RegionIndexId::new(RegionFileId::new(region_id, file_meta.file_id), 0),
1941            env.access_layer.path_type(),
1942        );
1943
1944        if build_mode == IndexBuildMode::Async {
1945            // The index file should not exist before the task finishes.
1946            assert!(
1947                !env.access_layer
1948                    .object_store()
1949                    .exists(&puffin_path)
1950                    .await
1951                    .unwrap()
1952            );
1953        } else {
1954            // The index file should exist before the task finishes.
1955            assert!(
1956                env.access_layer
1957                    .object_store()
1958                    .exists(&puffin_path)
1959                    .await
1960                    .unwrap()
1961            );
1962        }
1963
1964        // The task should finish successfully.
1965        match result_rx.recv().await.unwrap() {
1966            Ok(outcome) => {
1967                assert_eq!(outcome, IndexBuildOutcome::Finished);
1968            }
1969            _ => panic!("Expect finished result"),
1970        }
1971
1972        // The index file should exist after the task finishes.
1973        assert!(
1974            env.access_layer
1975                .object_store()
1976                .exists(&puffin_path)
1977                .await
1978                .unwrap()
1979        );
1980    }
1981
1982    #[tokio::test]
1983    async fn test_index_build_task_build_mode() {
1984        schedule_index_build_task_with_mode(IndexBuildMode::Async).await;
1985        schedule_index_build_task_with_mode(IndexBuildMode::Sync).await;
1986    }
1987
1988    #[tokio::test]
1989    async fn test_index_build_task_no_index() {
1990        let env = SchedulerEnv::new().await;
1991        let mut scheduler = env.mock_index_build_scheduler(4);
1992        let mut metadata = sst_region_metadata();
1993        // Unset indexes in metadata to simulate no index scenario.
1994        metadata.column_metadatas.iter_mut().for_each(|col| {
1995            col.column_schema.set_inverted_index(false);
1996            let _ = col.column_schema.unset_skipping_options();
1997        });
1998        let region_id = metadata.region_id;
1999        let metadata = Arc::new(metadata);
2000        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
2001        let file_purger = Arc::new(NoopFilePurger {});
2002        let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
2003        let file_meta = FileMeta {
2004            region_id,
2005            file_id: sst_info.file_id,
2006            file_size: sst_info.file_size,
2007            max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
2008            index_file_size: sst_info.index_metadata.file_size,
2009            num_rows: sst_info.num_rows as u64,
2010            num_row_groups: sst_info.num_row_groups,
2011            ..Default::default()
2012        };
2013        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
2014        let version_control =
2015            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
2016        let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
2017
2018        let file = FileHandle::new(file_meta.clone(), file_purger.clone());
2019
2020        // Create mock task.
2021        let (tx, mut rx) = mpsc::channel(4);
2022        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
2023        let task = IndexBuildTask {
2024            file,
2025            file_meta: file_meta.clone(),
2026            reason: IndexBuildType::Flush,
2027            access_layer: env.access_layer.clone(),
2028            listener: WorkerListener::default(),
2029            manifest_ctx,
2030            write_cache: None,
2031            file_purger,
2032            indexer_builder,
2033            request_sender: tx,
2034            result_sender: result_tx,
2035        };
2036
2037        scheduler
2038            .schedule_build(&version_control, task)
2039            .await
2040            .unwrap();
2041
2042        // The task should finish successfully.
2043        match result_rx.recv().await.unwrap() {
2044            Ok(outcome) => {
2045                assert_eq!(outcome, IndexBuildOutcome::Finished);
2046            }
2047            _ => panic!("Expect finished result"),
2048        }
2049
2050        // No index is built, so no notification should be sent to the worker.
2051        let _ = rx.recv().await.is_none();
2052    }
2053
2054    #[tokio::test]
2055    async fn test_index_build_task_with_write_cache() {
2056        let env = SchedulerEnv::new().await;
2057        let mut scheduler = env.mock_index_build_scheduler(4);
2058        let metadata = Arc::new(sst_region_metadata());
2059        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
2060        let file_purger = Arc::new(NoopFilePurger {});
2061        let region_id = metadata.region_id;
2062
2063        let (dir, factory) = PuffinManagerFactory::new_for_test_async("test_write_cache").await;
2064        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
2065
2066        // Create mock write cache
2067        let write_cache = Arc::new(
2068            WriteCache::new_fs(
2069                dir.path().to_str().unwrap(),
2070                ReadableSize::mb(10),
2071                None,
2072                None,
2073                true, // enable_background_worker
2074                factory,
2075                intm_manager,
2076                ReadableSize::mb(10),
2077            )
2078            .await
2079            .unwrap(),
2080        );
2081        // Indexer builder built from write cache.
2082        let indexer_builder = Arc::new(IndexerBuilderImpl {
2083            build_type: IndexBuildType::Flush,
2084            metadata: metadata.clone(),
2085            row_group_size: 1024,
2086            puffin_manager: write_cache.build_puffin_manager().clone(),
2087            write_cache_enabled: true,
2088            intermediate_manager: write_cache.intermediate_manager().clone(),
2089            index_options: IndexOptions::default(),
2090            inverted_index_config: InvertedIndexConfig::default(),
2091            fulltext_index_config: FulltextIndexConfig::default(),
2092            bloom_filter_index_config: BloomFilterConfig::default(),
2093            #[cfg(feature = "vector_index")]
2094            vector_index_config: Default::default(),
2095        });
2096
2097        let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
2098        let file_meta = FileMeta {
2099            region_id,
2100            file_id: sst_info.file_id,
2101            file_size: sst_info.file_size,
2102            index_file_size: sst_info.index_metadata.file_size,
2103            num_rows: sst_info.num_rows as u64,
2104            num_row_groups: sst_info.num_row_groups,
2105            ..Default::default()
2106        };
2107        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
2108        let version_control =
2109            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
2110
2111        let file = FileHandle::new(file_meta.clone(), file_purger.clone());
2112
2113        // Create mock task.
2114        let (tx, mut _rx) = mpsc::channel(4);
2115        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
2116        let task = IndexBuildTask {
2117            file,
2118            file_meta: file_meta.clone(),
2119            reason: IndexBuildType::Flush,
2120            access_layer: env.access_layer.clone(),
2121            listener: WorkerListener::default(),
2122            manifest_ctx,
2123            write_cache: Some(write_cache.clone()),
2124            file_purger,
2125            indexer_builder,
2126            request_sender: tx,
2127            result_sender: result_tx,
2128        };
2129
2130        scheduler
2131            .schedule_build(&version_control, task)
2132            .await
2133            .unwrap();
2134
2135        // The task should finish successfully.
2136        match result_rx.recv().await.unwrap() {
2137            Ok(outcome) => {
2138                assert_eq!(outcome, IndexBuildOutcome::Finished);
2139            }
2140            _ => panic!("Expect finished result"),
2141        }
2142
2143        // The write cache should contain the uploaded index file.
2144        let index_key = IndexKey::new(
2145            region_id,
2146            file_meta.file_id,
2147            FileType::Puffin(sst_info.index_metadata.version),
2148        );
2149        assert!(write_cache.file_cache().contains_key(&index_key));
2150    }
2151
2152    async fn create_mock_task_for_schedule(
2153        env: &SchedulerEnv,
2154        file_id: FileId,
2155        region_id: RegionId,
2156        reason: IndexBuildType,
2157    ) -> IndexBuildTask {
2158        let metadata = Arc::new(sst_region_metadata());
2159        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
2160        let file_purger = Arc::new(NoopFilePurger {});
2161        let indexer_builder = mock_indexer_builder(metadata, env).await;
2162        let (tx, _rx) = mpsc::channel(4);
2163        let (result_tx, _result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
2164
2165        let file_meta = FileMeta {
2166            region_id,
2167            file_id,
2168            file_size: 100,
2169            ..Default::default()
2170        };
2171
2172        let file = FileHandle::new(file_meta.clone(), file_purger.clone());
2173
2174        IndexBuildTask {
2175            file,
2176            file_meta,
2177            reason,
2178            access_layer: env.access_layer.clone(),
2179            listener: WorkerListener::default(),
2180            manifest_ctx,
2181            write_cache: None,
2182            file_purger,
2183            indexer_builder,
2184            request_sender: tx,
2185            result_sender: result_tx,
2186        }
2187    }
2188
    #[tokio::test]
    async fn test_scheduler_comprehensive() {
        // End-to-end coverage of the scheduler's per-region bookkeeping:
        // a concurrency limit of 2 building tasks, duplicate suppression,
        // a priority-ordered pending queue, promotion of pending tasks when
        // a building task stops, and cleanup when the region is dropped.
        let env = SchedulerEnv::new().await;
        // Limit of 2 concurrent building tasks per region.
        let mut scheduler = env.mock_index_build_scheduler(2);
        let metadata = Arc::new(sst_region_metadata());
        let region_id = metadata.region_id;
        let file_purger = Arc::new(NoopFilePurger {});

        // Prepare multiple files for testing
        let file_id1 = FileId::random();
        let file_id2 = FileId::random();
        let file_id3 = FileId::random();
        let file_id4 = FileId::random();
        let file_id5 = FileId::random();

        let mut files = HashMap::new();
        for file_id in [file_id1, file_id2, file_id3, file_id4, file_id5] {
            files.insert(
                file_id,
                FileMeta {
                    region_id,
                    file_id,
                    file_size: 100,
                    ..Default::default()
                },
            );
        }

        let version_control = mock_version_control(metadata, file_purger, files).await;

        // Test 1: Basic scheduling
        let task1 =
            create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
        assert!(
            scheduler
                .schedule_build(&version_control, task1)
                .await
                .is_ok()
        );
        assert!(scheduler.region_status.contains_key(&region_id));
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 1);
        assert!(status.building_files.contains(&file_id1));

        // Test 2: Duplicate file scheduling (should be skipped)
        let task1_dup =
            create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
        scheduler
            .schedule_build(&version_control, task1_dup)
            .await
            .unwrap();
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 1); // Still only one

        // Test 3: Fill up to limit (2 building tasks)
        let task2 =
            create_mock_task_for_schedule(&env, file_id2, region_id, IndexBuildType::Flush).await;
        scheduler
            .schedule_build(&version_control, task2)
            .await
            .unwrap();
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 2); // Reached limit
        assert_eq!(status.pending_tasks.len(), 0);

        // Test 4: Add tasks with different priorities to pending queue
        // Now all new tasks will be pending since we reached the limit
        let task3 =
            create_mock_task_for_schedule(&env, file_id3, region_id, IndexBuildType::Compact).await;
        let task4 =
            create_mock_task_for_schedule(&env, file_id4, region_id, IndexBuildType::SchemaChange)
                .await;
        let task5 =
            create_mock_task_for_schedule(&env, file_id5, region_id, IndexBuildType::Manual).await;

        scheduler
            .schedule_build(&version_control, task3)
            .await
            .unwrap();
        scheduler
            .schedule_build(&version_control, task4)
            .await
            .unwrap();
        scheduler
            .schedule_build(&version_control, task5)
            .await
            .unwrap();

        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 2); // Still at limit
        assert_eq!(status.pending_tasks.len(), 3); // Three pending

        // Test 5: Task completion triggers scheduling next highest priority task (Manual)
        scheduler.on_task_stopped(region_id, file_id1, &version_control);
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert!(!status.building_files.contains(&file_id1));
        assert_eq!(status.building_files.len(), 2); // Should schedule next task
        assert_eq!(status.pending_tasks.len(), 2); // One less pending
        // The highest priority task (Manual) should now be building
        assert!(status.building_files.contains(&file_id5));

        // Test 6: Complete another task, should schedule SchemaChange (second highest priority)
        scheduler.on_task_stopped(region_id, file_id2, &version_control);
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 2);
        assert_eq!(status.pending_tasks.len(), 1); // One less pending
        assert!(status.building_files.contains(&file_id4)); // SchemaChange should be building

        // Test 7: Complete remaining tasks and cleanup
        scheduler.on_task_stopped(region_id, file_id5, &version_control);
        scheduler.on_task_stopped(region_id, file_id4, &version_control);

        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 1); // Last task (Compact) should be building
        assert_eq!(status.pending_tasks.len(), 0);
        assert!(status.building_files.contains(&file_id3));

        scheduler.on_task_stopped(region_id, file_id3, &version_control);

        // Region should be removed when all tasks complete
        assert!(!scheduler.region_status.contains_key(&region_id));

        // Test 8: Region dropped with pending tasks
        // Re-fill the scheduler: two building (task6, task7) and one pending
        // (task8), then drop the region and expect full cleanup.
        let task6 =
            create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
        let task7 =
            create_mock_task_for_schedule(&env, file_id2, region_id, IndexBuildType::Flush).await;
        let task8 =
            create_mock_task_for_schedule(&env, file_id3, region_id, IndexBuildType::Manual).await;

        scheduler
            .schedule_build(&version_control, task6)
            .await
            .unwrap();
        scheduler
            .schedule_build(&version_control, task7)
            .await
            .unwrap();
        scheduler
            .schedule_build(&version_control, task8)
            .await
            .unwrap();

        assert!(scheduler.region_status.contains_key(&region_id));
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 2);
        assert_eq!(status.pending_tasks.len(), 1);

        scheduler.on_region_dropped(region_id).await;
        assert!(!scheduler.region_status.contains_key(&region_id));
    }
2340}