mito2/sst/index.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod bloom_filter;
16pub(crate) mod fulltext_index;
17mod indexer;
18pub mod intermediate;
19pub(crate) mod inverted_index;
20pub mod puffin_manager;
21mod statistics;
22pub(crate) mod store;
23
24use std::num::NonZeroUsize;
25use std::sync::Arc;
26
27use bloom_filter::creator::BloomFilterIndexer;
28use common_telemetry::{debug, info, warn};
29use datatypes::arrow::record_batch::RecordBatch;
30use puffin_manager::SstPuffinManager;
31use smallvec::{SmallVec, smallvec};
32use statistics::{ByteCount, RowCount};
33use store_api::metadata::RegionMetadataRef;
34use store_api::storage::{ColumnId, FileId, RegionId};
35use strum::IntoStaticStr;
36use tokio::sync::{mpsc, oneshot};
37
38use crate::access_layer::{AccessLayerRef, FilePathProvider, OperationType, RegionFilePathFactory};
39use crate::cache::file_cache::{FileType, IndexKey};
40use crate::cache::write_cache::{UploadTracker, WriteCacheRef};
41use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
42use crate::error::Result;
43use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
44use crate::metrics::INDEX_CREATE_MEMORY_USAGE;
45use crate::read::{Batch, BatchReader};
46use crate::region::options::IndexOptions;
47use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
48use crate::region::{ManifestContextRef, RegionLeaderState};
49use crate::request::{
50    BackgroundNotify, IndexBuildFailed, IndexBuildFinished, WorkerRequest, WorkerRequestWithTime,
51};
52use crate::schedule::scheduler::{Job, SchedulerRef};
53use crate::sst::file::{FileHandle, FileMeta, IndexType, RegionFileId};
54use crate::sst::file_purger::FilePurgerRef;
55use crate::sst::index::fulltext_index::creator::FulltextIndexer;
56use crate::sst::index::intermediate::IntermediateManager;
57use crate::sst::index::inverted_index::creator::InvertedIndexer;
58use crate::sst::parquet::SstInfo;
59
60pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index";
61pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index";
62pub(crate) const TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
63
64/// Output of the index creation.
65#[derive(Debug, Clone, Default)]
66pub struct IndexOutput {
67    /// Size of the file.
68    pub file_size: u64,
69    /// Inverted index output.
70    pub inverted_index: InvertedIndexOutput,
71    /// Fulltext index output.
72    pub fulltext_index: FulltextIndexOutput,
73    /// Bloom filter output.
74    pub bloom_filter: BloomFilterOutput,
75}
76
77impl IndexOutput {
78    pub fn build_available_indexes(&self) -> SmallVec<[IndexType; 4]> {
79        let mut indexes = SmallVec::new();
80        if self.inverted_index.is_available() {
81            indexes.push(IndexType::InvertedIndex);
82        }
83        if self.fulltext_index.is_available() {
84            indexes.push(IndexType::FulltextIndex);
85        }
86        if self.bloom_filter.is_available() {
87            indexes.push(IndexType::BloomFilterIndex);
88        }
89        indexes
90    }
91}
92
93/// Base output of the index creation.
94#[derive(Debug, Clone, Default)]
95pub struct IndexBaseOutput {
96    /// Size of the index.
97    pub index_size: ByteCount,
98    /// Number of rows in the index.
99    pub row_count: RowCount,
100    /// Available columns in the index.
101    pub columns: Vec<ColumnId>,
102}
103
104impl IndexBaseOutput {
105    pub fn is_available(&self) -> bool {
106        self.index_size > 0
107    }
108}
109
110/// Output of the inverted index creation.
111pub type InvertedIndexOutput = IndexBaseOutput;
112/// Output of the fulltext index creation.
113pub type FulltextIndexOutput = IndexBaseOutput;
114/// Output of the bloom filter creation.
115pub type BloomFilterOutput = IndexBaseOutput;
116
117/// The index creator that hides the error handling details.
118#[derive(Default)]
119pub struct Indexer {
120    file_id: FileId,
121    region_id: RegionId,
122    puffin_manager: Option<SstPuffinManager>,
123    inverted_indexer: Option<InvertedIndexer>,
124    last_mem_inverted_index: usize,
125    fulltext_indexer: Option<FulltextIndexer>,
126    last_mem_fulltext_index: usize,
127    bloom_filter_indexer: Option<BloomFilterIndexer>,
128    last_mem_bloom_filter: usize,
129    intermediate_manager: Option<IntermediateManager>,
130}
131
132impl Indexer {
133    /// Updates the index with the given batch.
134    pub async fn update(&mut self, batch: &mut Batch) {
135        self.do_update(batch).await;
136
137        self.flush_mem_metrics();
138    }
139
140    /// Updates the index with the given flat format RecordBatch.
141    pub async fn update_flat(&mut self, batch: &RecordBatch) {
142        self.do_update_flat(batch).await;
143
144        self.flush_mem_metrics();
145    }
146
147    /// Finalizes the index creation.
148    pub async fn finish(&mut self) -> IndexOutput {
149        let output = self.do_finish().await;
150
151        self.flush_mem_metrics();
152        output
153    }
154
155    /// Aborts the index creation.
156    pub async fn abort(&mut self) {
157        self.do_abort().await;
158
159        self.flush_mem_metrics();
160    }
161
162    fn flush_mem_metrics(&mut self) {
163        let inverted_mem = self
164            .inverted_indexer
165            .as_ref()
166            .map_or(0, |creator| creator.memory_usage());
167        INDEX_CREATE_MEMORY_USAGE
168            .with_label_values(&[TYPE_INVERTED_INDEX])
169            .add(inverted_mem as i64 - self.last_mem_inverted_index as i64);
170        self.last_mem_inverted_index = inverted_mem;
171
172        let fulltext_mem = self
173            .fulltext_indexer
174            .as_ref()
175            .map_or(0, |creator| creator.memory_usage());
176        INDEX_CREATE_MEMORY_USAGE
177            .with_label_values(&[TYPE_FULLTEXT_INDEX])
178            .add(fulltext_mem as i64 - self.last_mem_fulltext_index as i64);
179        self.last_mem_fulltext_index = fulltext_mem;
180
181        let bloom_filter_mem = self
182            .bloom_filter_indexer
183            .as_ref()
184            .map_or(0, |creator| creator.memory_usage());
185        INDEX_CREATE_MEMORY_USAGE
186            .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
187            .add(bloom_filter_mem as i64 - self.last_mem_bloom_filter as i64);
188        self.last_mem_bloom_filter = bloom_filter_mem;
189    }
190}
191
192#[async_trait::async_trait]
193pub trait IndexerBuilder {
194    /// Builds indexer of given file id to [index_file_path].
195    async fn build(&self, file_id: FileId) -> Indexer;
196}
197#[derive(Clone)]
198pub(crate) struct IndexerBuilderImpl {
199    pub(crate) build_type: IndexBuildType,
200    pub(crate) metadata: RegionMetadataRef,
201    pub(crate) row_group_size: usize,
202    pub(crate) puffin_manager: SstPuffinManager,
203    pub(crate) intermediate_manager: IntermediateManager,
204    pub(crate) index_options: IndexOptions,
205    pub(crate) inverted_index_config: InvertedIndexConfig,
206    pub(crate) fulltext_index_config: FulltextIndexConfig,
207    pub(crate) bloom_filter_index_config: BloomFilterConfig,
208}
209
210#[async_trait::async_trait]
211impl IndexerBuilder for IndexerBuilderImpl {
212    /// Sanity check for arguments and create a new [Indexer] if arguments are valid.
213    async fn build(&self, file_id: FileId) -> Indexer {
214        let mut indexer = Indexer {
215            file_id,
216            region_id: self.metadata.region_id,
217            ..Default::default()
218        };
219
220        indexer.inverted_indexer = self.build_inverted_indexer(file_id);
221        indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await;
222        indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id);
223        indexer.intermediate_manager = Some(self.intermediate_manager.clone());
224        if indexer.inverted_indexer.is_none()
225            && indexer.fulltext_indexer.is_none()
226            && indexer.bloom_filter_indexer.is_none()
227        {
228            indexer.abort().await;
229            return Indexer::default();
230        }
231
232        indexer.puffin_manager = Some(self.puffin_manager.clone());
233        indexer
234    }
235}
236
237impl IndexerBuilderImpl {
238    fn build_inverted_indexer(&self, file_id: FileId) -> Option<InvertedIndexer> {
239        let create = match self.build_type {
240            IndexBuildType::Flush => self.inverted_index_config.create_on_flush.auto(),
241            IndexBuildType::Compact => self.inverted_index_config.create_on_compaction.auto(),
242            _ => true,
243        };
244
245        if !create {
246            debug!(
247                "Skip creating inverted index due to config, region_id: {}, file_id: {}",
248                self.metadata.region_id, file_id,
249            );
250            return None;
251        }
252
253        let indexed_column_ids = self.metadata.inverted_indexed_column_ids(
254            self.index_options.inverted_index.ignore_column_ids.iter(),
255        );
256        if indexed_column_ids.is_empty() {
257            debug!(
258                "No columns to be indexed, skip creating inverted index, region_id: {}, file_id: {}",
259                self.metadata.region_id, file_id,
260            );
261            return None;
262        }
263
264        let Some(mut segment_row_count) =
265            NonZeroUsize::new(self.index_options.inverted_index.segment_row_count)
266        else {
267            warn!(
268                "Segment row count is 0, skip creating index, region_id: {}, file_id: {}",
269                self.metadata.region_id, file_id,
270            );
271            return None;
272        };
273
274        let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else {
275            warn!(
276                "Row group size is 0, skip creating index, region_id: {}, file_id: {}",
277                self.metadata.region_id, file_id,
278            );
279            return None;
280        };
281
282        // if segment row count not aligned with row group size, adjust it to be aligned.
283        if row_group_size.get() % segment_row_count.get() != 0 {
284            segment_row_count = row_group_size;
285        }
286
287        let indexer = InvertedIndexer::new(
288            file_id,
289            &self.metadata,
290            self.intermediate_manager.clone(),
291            self.inverted_index_config.mem_threshold_on_create(),
292            segment_row_count,
293            indexed_column_ids,
294        );
295
296        Some(indexer)
297    }
298
299    async fn build_fulltext_indexer(&self, file_id: FileId) -> Option<FulltextIndexer> {
300        let create = match self.build_type {
301            IndexBuildType::Flush => self.fulltext_index_config.create_on_flush.auto(),
302            IndexBuildType::Compact => self.fulltext_index_config.create_on_compaction.auto(),
303            _ => true,
304        };
305
306        if !create {
307            debug!(
308                "Skip creating full-text index due to config, region_id: {}, file_id: {}",
309                self.metadata.region_id, file_id,
310            );
311            return None;
312        }
313
314        let mem_limit = self.fulltext_index_config.mem_threshold_on_create();
315        let creator = FulltextIndexer::new(
316            &self.metadata.region_id,
317            &file_id,
318            &self.intermediate_manager,
319            &self.metadata,
320            self.fulltext_index_config.compress,
321            mem_limit,
322        )
323        .await;
324
325        let err = match creator {
326            Ok(creator) => {
327                if creator.is_none() {
328                    debug!(
329                        "Skip creating full-text index due to no columns require indexing, region_id: {}, file_id: {}",
330                        self.metadata.region_id, file_id,
331                    );
332                }
333                return creator;
334            }
335            Err(err) => err,
336        };
337
338        if cfg!(any(test, feature = "test")) {
339            panic!(
340                "Failed to create full-text indexer, region_id: {}, file_id: {}, err: {:?}",
341                self.metadata.region_id, file_id, err
342            );
343        } else {
344            warn!(
345                err; "Failed to create full-text indexer, region_id: {}, file_id: {}",
346                self.metadata.region_id, file_id,
347            );
348        }
349
350        None
351    }
352
353    fn build_bloom_filter_indexer(&self, file_id: FileId) -> Option<BloomFilterIndexer> {
354        let create = match self.build_type {
355            IndexBuildType::Flush => self.bloom_filter_index_config.create_on_flush.auto(),
356            IndexBuildType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(),
357            _ => true,
358        };
359
360        if !create {
361            debug!(
362                "Skip creating bloom filter due to config, region_id: {}, file_id: {}",
363                self.metadata.region_id, file_id,
364            );
365            return None;
366        }
367
368        let mem_limit = self.bloom_filter_index_config.mem_threshold_on_create();
369        let indexer = BloomFilterIndexer::new(
370            file_id,
371            &self.metadata,
372            self.intermediate_manager.clone(),
373            mem_limit,
374        );
375
376        let err = match indexer {
377            Ok(indexer) => {
378                if indexer.is_none() {
379                    debug!(
380                        "Skip creating bloom filter due to no columns require indexing, region_id: {}, file_id: {}",
381                        self.metadata.region_id, file_id,
382                    );
383                }
384                return indexer;
385            }
386            Err(err) => err,
387        };
388
389        if cfg!(any(test, feature = "test")) {
390            panic!(
391                "Failed to create bloom filter, region_id: {}, file_id: {}, err: {:?}",
392                self.metadata.region_id, file_id, err
393            );
394        } else {
395            warn!(
396                err; "Failed to create bloom filter, region_id: {}, file_id: {}",
397                self.metadata.region_id, file_id,
398            );
399        }
400
401        None
402    }
403}
404
405/// Type of an index build task.
406#[derive(Debug, Clone, PartialEq, IntoStaticStr)]
407pub enum IndexBuildType {
408    /// Build index when schema change.
409    SchemaChange,
410    /// Create or update index after flush.
411    Flush,
412    /// Create or update index after compact.
413    Compact,
414    /// Manually build index.
415    Manual,
416}
417
418impl IndexBuildType {
419    fn as_str(&self) -> &'static str {
420        self.into()
421    }
422}
423
424impl From<OperationType> for IndexBuildType {
425    fn from(op_type: OperationType) -> Self {
426        match op_type {
427            OperationType::Flush => IndexBuildType::Flush,
428            OperationType::Compact => IndexBuildType::Compact,
429        }
430    }
431}
432
433/// Outcome of an index build task.
434#[derive(Debug, Clone, PartialEq, Eq, Hash)]
435pub enum IndexBuildOutcome {
436    Finished,
437    Aborted(String),
438}
439
440pub struct IndexBuildTask {
441    /// The file meta to build index for.
442    pub file_meta: FileMeta,
443    pub reason: IndexBuildType,
444    pub access_layer: AccessLayerRef,
445    pub(crate) manifest_ctx: ManifestContextRef,
446    pub write_cache: Option<WriteCacheRef>,
447    pub file_purger: FilePurgerRef,
448    /// When write cache is enabled, the indexer builder should be built from the write cache.
449    /// Otherwise, it should be built from the access layer.
450    pub indexer_builder: Arc<dyn IndexerBuilder + Send + Sync>,
451    /// Request sender to notify the region worker.
452    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
453    /// Optional sender to send the result back to the caller.
454    pub result_sender: Option<oneshot::Sender<IndexBuildOutcome>>,
455}
456
457impl IndexBuildTask {
458    fn into_index_build_job(mut self, version_control: &VersionControlRef) -> Job {
459        let version_data = version_control.current();
460
461        Box::pin(async move {
462            self.do_index_build(version_data).await;
463        })
464    }
465
466    async fn do_index_build(&mut self, version_data: VersionControlData) {
467        let outcome = match self.index_build(&version_data).await {
468            Ok(outcome) => outcome,
469            Err(e) => {
470                warn!(
471                    e; "Index build task failed, region: {}, file_id: {}",
472                    self.file_meta.region_id, self.file_meta.file_id,
473                );
474                IndexBuildOutcome::Aborted(format!("Index build failed: {}", e))
475            }
476        };
477        if let Some(sender) = self.result_sender.take() {
478            let _ = sender.send(outcome);
479        }
480    }
481
482    // Checks if the SST file still exists in object store and version to avoid conflict with compaction.
483    async fn check_sst_file_exists(&self, version: &VersionRef) -> bool {
484        let region_id = self.file_meta.region_id;
485        let file_id = self.file_meta.file_id;
486
487        let found_in_version = version
488            .ssts
489            .levels()
490            .iter()
491            .flat_map(|level| level.files.iter())
492            .any(|(id, handle)| {
493                *id == self.file_meta.file_id && !handle.is_deleted() && !handle.compacting()
494            });
495        if !found_in_version {
496            warn!(
497                "File id {} not found in region version for index build, region: {}",
498                file_id, region_id
499            );
500            false
501        } else {
502            // If the file's metadata is present in the current version, the physical SST file
503            // is guaranteed to exist on object store. The file purger removes the physical
504            // file only after its metadata is removed from the version.
505            true
506        }
507    }
508
509    async fn index_build(
510        &mut self,
511        version_data: &VersionControlData,
512    ) -> Result<IndexBuildOutcome> {
513        let version = &version_data.version;
514        let mut indexer = self.indexer_builder.build(self.file_meta.file_id).await;
515        let mut parquet_reader = self
516            .access_layer
517            .read_sst(FileHandle::new(
518                self.file_meta.clone(),
519                self.file_purger.clone(),
520            ))
521            .build()
522            .await?;
523
524        // TODO(SNC123): optimize index batch
525        loop {
526            match parquet_reader.next_batch().await {
527                Ok(Some(batch)) => {
528                    indexer.update(&mut batch.clone()).await;
529                }
530                Ok(None) => break,
531                Err(e) => {
532                    indexer.abort().await;
533                    return Err(e);
534                }
535            }
536        }
537        let index_output = indexer.finish().await;
538
539        if index_output.file_size > 0 {
540            // Check SST file existence again after building index.
541            if !self.check_sst_file_exists(version).await {
542                // Calls abort to clean up index files.
543                indexer.abort().await;
544                return Ok(IndexBuildOutcome::Aborted(format!(
545                    "SST file not found during index build, region: {}, file_id: {}",
546                    self.file_meta.region_id, self.file_meta.file_id
547                )));
548            }
549
550            // Upload index file if write cache is enabled.
551            self.maybe_upload_index_file(index_output.clone()).await?;
552
553            let worker_request = match self.update_manifest(index_output).await {
554                Ok(edit) => {
555                    let index_build_finished = IndexBuildFinished {
556                        region_id: self.file_meta.region_id,
557                        edit,
558                    };
559                    WorkerRequest::Background {
560                        region_id: self.file_meta.region_id,
561                        notify: BackgroundNotify::IndexBuildFinished(index_build_finished),
562                    }
563                }
564                Err(e) => {
565                    let err = Arc::new(e);
566                    WorkerRequest::Background {
567                        region_id: self.file_meta.region_id,
568                        notify: BackgroundNotify::IndexBuildFailed(IndexBuildFailed { err }),
569                    }
570                }
571            };
572
573            let _ = self
574                .request_sender
575                .send(WorkerRequestWithTime::new(worker_request))
576                .await;
577        }
578        Ok(IndexBuildOutcome::Finished)
579    }
580
581    async fn maybe_upload_index_file(&self, output: IndexOutput) -> Result<()> {
582        if let Some(write_cache) = &self.write_cache {
583            let file_id = self.file_meta.file_id;
584            let region_id = self.file_meta.region_id;
585            let remote_store = self.access_layer.object_store();
586            let mut upload_tracker = UploadTracker::new(region_id);
587            let mut err = None;
588            let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin);
589            let puffin_path = RegionFilePathFactory::new(
590                self.access_layer.table_dir().to_string(),
591                self.access_layer.path_type(),
592            )
593            .build_index_file_path(RegionFileId::new(region_id, file_id));
594            if let Err(e) = write_cache
595                .upload(puffin_key, &puffin_path, remote_store)
596                .await
597            {
598                err = Some(e);
599            }
600            upload_tracker.push_uploaded_file(puffin_path);
601            if let Some(err) = err {
602                // Cleans index files on failure.
603                upload_tracker
604                    .clean(
605                        &smallvec![SstInfo {
606                            file_id,
607                            index_metadata: output,
608                            ..Default::default()
609                        }],
610                        &write_cache.file_cache(),
611                        remote_store,
612                    )
613                    .await;
614                return Err(err);
615            }
616        } else {
617            debug!("write cache is not available, skip uploading index file");
618        }
619        Ok(())
620    }
621
622    async fn update_manifest(&mut self, output: IndexOutput) -> Result<RegionEdit> {
623        self.file_meta.available_indexes = output.build_available_indexes();
624        self.file_meta.index_file_size = output.file_size;
625        let edit = RegionEdit {
626            files_to_add: vec![self.file_meta.clone()],
627            files_to_remove: vec![],
628            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
629            flushed_sequence: None,
630            flushed_entry_id: None,
631            committed_sequence: None,
632            compaction_time_window: None,
633        };
634        let version = self
635            .manifest_ctx
636            .update_manifest(
637                RegionLeaderState::Writable,
638                RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())),
639            )
640            .await?;
641        info!(
642            "Successfully update manifest version to {version}, region: {}, reason: {}",
643            self.file_meta.region_id,
644            self.reason.as_str()
645        );
646        Ok(edit)
647    }
648}
649
650#[derive(Clone)]
651pub struct IndexBuildScheduler {
652    scheduler: SchedulerRef,
653}
654
655impl IndexBuildScheduler {
656    pub fn new(scheduler: SchedulerRef) -> Self {
657        IndexBuildScheduler { scheduler }
658    }
659
660    pub(crate) fn schedule_build(
661        &mut self,
662        version_control: &VersionControlRef,
663        task: IndexBuildTask,
664    ) -> Result<()> {
665        let job = task.into_index_build_job(version_control);
666        self.scheduler.schedule(job)?;
667        Ok(())
668    }
669}
670
671#[cfg(test)]
672mod tests {
673    use std::sync::Arc;
674
675    use api::v1::SemanticType;
676    use common_base::readable_size::ReadableSize;
677    use datafusion_common::HashMap;
678    use datatypes::data_type::ConcreteDataType;
679    use datatypes::schema::{
680        ColumnSchema, FulltextOptions, SkippingIndexOptions, SkippingIndexType,
681    };
682    use object_store::ObjectStore;
683    use object_store::services::Memory;
684    use puffin_manager::PuffinManagerFactory;
685    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
686    use tokio::sync::{mpsc, oneshot};
687
688    use super::*;
689    use crate::access_layer::{FilePathProvider, SstWriteRequest, WriteType};
690    use crate::cache::write_cache::WriteCache;
691    use crate::config::{FulltextIndexConfig, IndexBuildMode, MitoConfig, Mode};
692    use crate::memtable::time_partition::TimePartitions;
693    use crate::region::version::{VersionBuilder, VersionControl};
694    use crate::sst::file::RegionFileId;
695    use crate::sst::file_purger::NoopFilePurger;
696    use crate::sst::location;
697    use crate::sst::parquet::WriteOptions;
698    use crate::test_util::memtable_util::EmptyMemtableBuilder;
699    use crate::test_util::scheduler_util::SchedulerEnv;
700    use crate::test_util::sst_util::{new_batch_by_range, new_source, sst_region_metadata};
701
702    struct MetaConfig {
703        with_inverted: bool,
704        with_fulltext: bool,
705        with_skipping_bloom: bool,
706    }
707
708    fn mock_region_metadata(
709        MetaConfig {
710            with_inverted,
711            with_fulltext,
712            with_skipping_bloom,
713        }: MetaConfig,
714    ) -> RegionMetadataRef {
715        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
716        let mut column_schema = ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false);
717        if with_inverted {
718            column_schema = column_schema.with_inverted_index(true);
719        }
720        builder
721            .push_column_metadata(ColumnMetadata {
722                column_schema,
723                semantic_type: SemanticType::Field,
724                column_id: 1,
725            })
726            .push_column_metadata(ColumnMetadata {
727                column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
728                semantic_type: SemanticType::Field,
729                column_id: 2,
730            })
731            .push_column_metadata(ColumnMetadata {
732                column_schema: ColumnSchema::new(
733                    "c",
734                    ConcreteDataType::timestamp_millisecond_datatype(),
735                    false,
736                ),
737                semantic_type: SemanticType::Timestamp,
738                column_id: 3,
739            });
740
741        if with_fulltext {
742            let column_schema =
743                ColumnSchema::new("text", ConcreteDataType::string_datatype(), true)
744                    .with_fulltext_options(FulltextOptions {
745                        enable: true,
746                        ..Default::default()
747                    })
748                    .unwrap();
749
750            let column = ColumnMetadata {
751                column_schema,
752                semantic_type: SemanticType::Field,
753                column_id: 4,
754            };
755
756            builder.push_column_metadata(column);
757        }
758
759        if with_skipping_bloom {
760            let column_schema =
761                ColumnSchema::new("bloom", ConcreteDataType::string_datatype(), false)
762                    .with_skipping_options(SkippingIndexOptions::new_unchecked(
763                        42,
764                        0.01,
765                        SkippingIndexType::BloomFilter,
766                    ))
767                    .unwrap();
768
769            let column = ColumnMetadata {
770                column_schema,
771                semantic_type: SemanticType::Field,
772                column_id: 5,
773            };
774
775            builder.push_column_metadata(column);
776        }
777
778        Arc::new(builder.build().unwrap())
779    }
780
781    fn mock_object_store() -> ObjectStore {
782        ObjectStore::new(Memory::default()).unwrap().finish()
783    }
784
785    async fn mock_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
786        IntermediateManager::init_fs(path).await.unwrap()
787    }
788    struct NoopPathProvider;
789
790    impl FilePathProvider for NoopPathProvider {
791        fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
792            unreachable!()
793        }
794
795        fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
796            unreachable!()
797        }
798    }
799
800    async fn mock_sst_file(
801        metadata: RegionMetadataRef,
802        env: &SchedulerEnv,
803        build_mode: IndexBuildMode,
804    ) -> SstInfo {
805        let source = new_source(&[
806            new_batch_by_range(&["a", "d"], 0, 60),
807            new_batch_by_range(&["b", "f"], 0, 40),
808            new_batch_by_range(&["b", "h"], 100, 200),
809        ]);
810        let mut index_config = MitoConfig::default().index;
811        index_config.build_mode = build_mode;
812        let write_request = SstWriteRequest {
813            op_type: OperationType::Flush,
814            metadata: metadata.clone(),
815            source: either::Left(source),
816            storage: None,
817            max_sequence: None,
818            cache_manager: Default::default(),
819            index_options: IndexOptions::default(),
820            index_config,
821            inverted_index_config: Default::default(),
822            fulltext_index_config: Default::default(),
823            bloom_filter_index_config: Default::default(),
824        };
825        env.access_layer
826            .write_sst(write_request, &WriteOptions::default(), WriteType::Flush)
827            .await
828            .unwrap()
829            .0
830            .remove(0)
831    }
832
833    async fn mock_version_control(
834        metadata: RegionMetadataRef,
835        file_purger: FilePurgerRef,
836        files: HashMap<FileId, FileMeta>,
837    ) -> VersionControlRef {
838        let mutable = Arc::new(TimePartitions::new(
839            metadata.clone(),
840            Arc::new(EmptyMemtableBuilder::default()),
841            0,
842            None,
843        ));
844        let version_builder = VersionBuilder::new(metadata, mutable)
845            .add_files(file_purger, files.values().cloned())
846            .build();
847        Arc::new(VersionControl::new(version_builder))
848    }
849
850    async fn mock_indexer_builder(
851        metadata: RegionMetadataRef,
852        env: &SchedulerEnv,
853    ) -> Arc<dyn IndexerBuilder + Send + Sync> {
854        let (dir, factory) = PuffinManagerFactory::new_for_test_async("mock_indexer_builder").await;
855        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
856        let puffin_manager = factory.build(
857            env.access_layer.object_store().clone(),
858            RegionFilePathFactory::new(
859                env.access_layer.table_dir().to_string(),
860                env.access_layer.path_type(),
861            ),
862        );
863        Arc::new(IndexerBuilderImpl {
864            build_type: IndexBuildType::Flush,
865            metadata,
866            row_group_size: 1024,
867            puffin_manager,
868            intermediate_manager: intm_manager,
869            index_options: IndexOptions::default(),
870            inverted_index_config: InvertedIndexConfig::default(),
871            fulltext_index_config: FulltextIndexConfig::default(),
872            bloom_filter_index_config: BloomFilterConfig::default(),
873        })
874    }
875
876    #[tokio::test]
877    async fn test_build_indexer_basic() {
878        let (dir, factory) =
879            PuffinManagerFactory::new_for_test_async("test_build_indexer_basic_").await;
880        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
881
882        let metadata = mock_region_metadata(MetaConfig {
883            with_inverted: true,
884            with_fulltext: true,
885            with_skipping_bloom: true,
886        });
887        let indexer = IndexerBuilderImpl {
888            build_type: IndexBuildType::Flush,
889            metadata,
890            row_group_size: 1024,
891            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
892            intermediate_manager: intm_manager,
893            index_options: IndexOptions::default(),
894            inverted_index_config: InvertedIndexConfig::default(),
895            fulltext_index_config: FulltextIndexConfig::default(),
896            bloom_filter_index_config: BloomFilterConfig::default(),
897        }
898        .build(FileId::random())
899        .await;
900
901        assert!(indexer.inverted_indexer.is_some());
902        assert!(indexer.fulltext_indexer.is_some());
903        assert!(indexer.bloom_filter_indexer.is_some());
904    }
905
906    #[tokio::test]
907    async fn test_build_indexer_disable_create() {
908        let (dir, factory) =
909            PuffinManagerFactory::new_for_test_async("test_build_indexer_disable_create_").await;
910        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
911
912        let metadata = mock_region_metadata(MetaConfig {
913            with_inverted: true,
914            with_fulltext: true,
915            with_skipping_bloom: true,
916        });
917        let indexer = IndexerBuilderImpl {
918            build_type: IndexBuildType::Flush,
919            metadata: metadata.clone(),
920            row_group_size: 1024,
921            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
922            intermediate_manager: intm_manager.clone(),
923            index_options: IndexOptions::default(),
924            inverted_index_config: InvertedIndexConfig {
925                create_on_flush: Mode::Disable,
926                ..Default::default()
927            },
928            fulltext_index_config: FulltextIndexConfig::default(),
929            bloom_filter_index_config: BloomFilterConfig::default(),
930        }
931        .build(FileId::random())
932        .await;
933
934        assert!(indexer.inverted_indexer.is_none());
935        assert!(indexer.fulltext_indexer.is_some());
936        assert!(indexer.bloom_filter_indexer.is_some());
937
938        let indexer = IndexerBuilderImpl {
939            build_type: IndexBuildType::Compact,
940            metadata: metadata.clone(),
941            row_group_size: 1024,
942            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
943            intermediate_manager: intm_manager.clone(),
944            index_options: IndexOptions::default(),
945            inverted_index_config: InvertedIndexConfig::default(),
946            fulltext_index_config: FulltextIndexConfig {
947                create_on_compaction: Mode::Disable,
948                ..Default::default()
949            },
950            bloom_filter_index_config: BloomFilterConfig::default(),
951        }
952        .build(FileId::random())
953        .await;
954
955        assert!(indexer.inverted_indexer.is_some());
956        assert!(indexer.fulltext_indexer.is_none());
957        assert!(indexer.bloom_filter_indexer.is_some());
958
959        let indexer = IndexerBuilderImpl {
960            build_type: IndexBuildType::Compact,
961            metadata,
962            row_group_size: 1024,
963            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
964            intermediate_manager: intm_manager,
965            index_options: IndexOptions::default(),
966            inverted_index_config: InvertedIndexConfig::default(),
967            fulltext_index_config: FulltextIndexConfig::default(),
968            bloom_filter_index_config: BloomFilterConfig {
969                create_on_compaction: Mode::Disable,
970                ..Default::default()
971            },
972        }
973        .build(FileId::random())
974        .await;
975
976        assert!(indexer.inverted_indexer.is_some());
977        assert!(indexer.fulltext_indexer.is_some());
978        assert!(indexer.bloom_filter_indexer.is_none());
979    }
980
981    #[tokio::test]
982    async fn test_build_indexer_no_required() {
983        let (dir, factory) =
984            PuffinManagerFactory::new_for_test_async("test_build_indexer_no_required_").await;
985        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
986
987        let metadata = mock_region_metadata(MetaConfig {
988            with_inverted: false,
989            with_fulltext: true,
990            with_skipping_bloom: true,
991        });
992        let indexer = IndexerBuilderImpl {
993            build_type: IndexBuildType::Flush,
994            metadata: metadata.clone(),
995            row_group_size: 1024,
996            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
997            intermediate_manager: intm_manager.clone(),
998            index_options: IndexOptions::default(),
999            inverted_index_config: InvertedIndexConfig::default(),
1000            fulltext_index_config: FulltextIndexConfig::default(),
1001            bloom_filter_index_config: BloomFilterConfig::default(),
1002        }
1003        .build(FileId::random())
1004        .await;
1005
1006        assert!(indexer.inverted_indexer.is_none());
1007        assert!(indexer.fulltext_indexer.is_some());
1008        assert!(indexer.bloom_filter_indexer.is_some());
1009
1010        let metadata = mock_region_metadata(MetaConfig {
1011            with_inverted: true,
1012            with_fulltext: false,
1013            with_skipping_bloom: true,
1014        });
1015        let indexer = IndexerBuilderImpl {
1016            build_type: IndexBuildType::Flush,
1017            metadata: metadata.clone(),
1018            row_group_size: 1024,
1019            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1020            intermediate_manager: intm_manager.clone(),
1021            index_options: IndexOptions::default(),
1022            inverted_index_config: InvertedIndexConfig::default(),
1023            fulltext_index_config: FulltextIndexConfig::default(),
1024            bloom_filter_index_config: BloomFilterConfig::default(),
1025        }
1026        .build(FileId::random())
1027        .await;
1028
1029        assert!(indexer.inverted_indexer.is_some());
1030        assert!(indexer.fulltext_indexer.is_none());
1031        assert!(indexer.bloom_filter_indexer.is_some());
1032
1033        let metadata = mock_region_metadata(MetaConfig {
1034            with_inverted: true,
1035            with_fulltext: true,
1036            with_skipping_bloom: false,
1037        });
1038        let indexer = IndexerBuilderImpl {
1039            build_type: IndexBuildType::Flush,
1040            metadata: metadata.clone(),
1041            row_group_size: 1024,
1042            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1043            intermediate_manager: intm_manager,
1044            index_options: IndexOptions::default(),
1045            inverted_index_config: InvertedIndexConfig::default(),
1046            fulltext_index_config: FulltextIndexConfig::default(),
1047            bloom_filter_index_config: BloomFilterConfig::default(),
1048        }
1049        .build(FileId::random())
1050        .await;
1051
1052        assert!(indexer.inverted_indexer.is_some());
1053        assert!(indexer.fulltext_indexer.is_some());
1054        assert!(indexer.bloom_filter_indexer.is_none());
1055    }
1056
1057    #[tokio::test]
1058    async fn test_build_indexer_zero_row_group() {
1059        let (dir, factory) =
1060            PuffinManagerFactory::new_for_test_async("test_build_indexer_zero_row_group_").await;
1061        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1062
1063        let metadata = mock_region_metadata(MetaConfig {
1064            with_inverted: true,
1065            with_fulltext: true,
1066            with_skipping_bloom: true,
1067        });
1068        let indexer = IndexerBuilderImpl {
1069            build_type: IndexBuildType::Flush,
1070            metadata,
1071            row_group_size: 0,
1072            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1073            intermediate_manager: intm_manager,
1074            index_options: IndexOptions::default(),
1075            inverted_index_config: InvertedIndexConfig::default(),
1076            fulltext_index_config: FulltextIndexConfig::default(),
1077            bloom_filter_index_config: BloomFilterConfig::default(),
1078        }
1079        .build(FileId::random())
1080        .await;
1081
1082        assert!(indexer.inverted_indexer.is_none());
1083    }
1084
1085    #[tokio::test]
1086    async fn test_index_build_task_sst_not_exist() {
1087        let env = SchedulerEnv::new().await;
1088        let (tx, _rx) = mpsc::channel(4);
1089        let (result_tx, result_rx) = oneshot::channel::<IndexBuildOutcome>();
1090        let mut scheduler = env.mock_index_build_scheduler();
1091        let metadata = Arc::new(sst_region_metadata());
1092        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1093        let file_purger = Arc::new(NoopFilePurger {});
1094        let files = HashMap::new();
1095        let version_control =
1096            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1097        let region_id = metadata.region_id;
1098        let indexer_builder = mock_indexer_builder(metadata, &env).await;
1099
1100        // Create mock task.
1101        let task = IndexBuildTask {
1102            file_meta: FileMeta {
1103                region_id,
1104                file_id: FileId::random(),
1105                file_size: 100,
1106                ..Default::default()
1107            },
1108            reason: IndexBuildType::Flush,
1109            access_layer: env.access_layer.clone(),
1110            manifest_ctx,
1111            write_cache: None,
1112            file_purger,
1113            indexer_builder,
1114            request_sender: tx,
1115            result_sender: Some(result_tx),
1116        };
1117
1118        // Schedule the build task and check result.
1119        scheduler.schedule_build(&version_control, task).unwrap();
1120        match result_rx.await.unwrap() {
1121            IndexBuildOutcome::Aborted(_) => {}
1122            _ => panic!("Expect aborted result due to missing SST file"),
1123        }
1124    }
1125
1126    #[tokio::test]
1127    async fn test_index_build_task_sst_exist() {
1128        let env = SchedulerEnv::new().await;
1129        let mut scheduler = env.mock_index_build_scheduler();
1130        let metadata = Arc::new(sst_region_metadata());
1131        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1132        let region_id = metadata.region_id;
1133        let file_purger = Arc::new(NoopFilePurger {});
1134        let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
1135        let file_meta = FileMeta {
1136            region_id,
1137            file_id: sst_info.file_id,
1138            file_size: sst_info.file_size,
1139            index_file_size: sst_info.index_metadata.file_size,
1140            num_rows: sst_info.num_rows as u64,
1141            num_row_groups: sst_info.num_row_groups,
1142            ..Default::default()
1143        };
1144        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1145        let version_control =
1146            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1147        let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
1148
1149        // Create mock task.
1150        let (tx, mut rx) = mpsc::channel(4);
1151        let (result_tx, result_rx) = oneshot::channel::<IndexBuildOutcome>();
1152        let task = IndexBuildTask {
1153            file_meta: file_meta.clone(),
1154            reason: IndexBuildType::Flush,
1155            access_layer: env.access_layer.clone(),
1156            manifest_ctx,
1157            write_cache: None,
1158            file_purger,
1159            indexer_builder,
1160            request_sender: tx,
1161            result_sender: Some(result_tx),
1162        };
1163
1164        scheduler.schedule_build(&version_control, task).unwrap();
1165
1166        // The task should finish successfully.
1167        assert_eq!(result_rx.await.unwrap(), IndexBuildOutcome::Finished);
1168
1169        // A notification should be sent to the worker to update the manifest.
1170        let worker_req = rx.recv().await.unwrap().request;
1171        match worker_req {
1172            WorkerRequest::Background {
1173                region_id: req_region_id,
1174                notify: BackgroundNotify::IndexBuildFinished(finished),
1175            } => {
1176                assert_eq!(req_region_id, region_id);
1177                assert_eq!(finished.edit.files_to_add.len(), 1);
1178                let updated_meta = &finished.edit.files_to_add[0];
1179
1180                // The mock indexer builder creates all index types.
1181                assert!(!updated_meta.available_indexes.is_empty());
1182                assert!(updated_meta.index_file_size > 0);
1183                assert_eq!(updated_meta.file_id, file_meta.file_id);
1184            }
1185            _ => panic!("Unexpected worker request: {:?}", worker_req),
1186        }
1187    }
1188
1189    async fn schedule_index_build_task_with_mode(build_mode: IndexBuildMode) {
1190        let env = SchedulerEnv::new().await;
1191        let mut scheduler = env.mock_index_build_scheduler();
1192        let metadata = Arc::new(sst_region_metadata());
1193        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1194        let file_purger = Arc::new(NoopFilePurger {});
1195        let region_id = metadata.region_id;
1196        let sst_info = mock_sst_file(metadata.clone(), &env, build_mode.clone()).await;
1197        let file_meta = FileMeta {
1198            region_id,
1199            file_id: sst_info.file_id,
1200            file_size: sst_info.file_size,
1201            index_file_size: sst_info.index_metadata.file_size,
1202            num_rows: sst_info.num_rows as u64,
1203            num_row_groups: sst_info.num_row_groups,
1204            ..Default::default()
1205        };
1206        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1207        let version_control =
1208            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1209        let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
1210
1211        // Create mock task.
1212        let (tx, _rx) = mpsc::channel(4);
1213        let (result_tx, result_rx) = oneshot::channel::<IndexBuildOutcome>();
1214        let task = IndexBuildTask {
1215            file_meta: file_meta.clone(),
1216            reason: IndexBuildType::Flush,
1217            access_layer: env.access_layer.clone(),
1218            manifest_ctx,
1219            write_cache: None,
1220            file_purger,
1221            indexer_builder,
1222            request_sender: tx,
1223            result_sender: Some(result_tx),
1224        };
1225
1226        scheduler.schedule_build(&version_control, task).unwrap();
1227
1228        let puffin_path = location::index_file_path(
1229            env.access_layer.table_dir(),
1230            RegionFileId::new(region_id, file_meta.file_id),
1231            env.access_layer.path_type(),
1232        );
1233
1234        if build_mode == IndexBuildMode::Async {
1235            // The index file should not exist before the task finishes.
1236            assert!(
1237                !env.access_layer
1238                    .object_store()
1239                    .exists(&puffin_path)
1240                    .await
1241                    .unwrap()
1242            );
1243        } else {
1244            // The index file should exist before the task finishes.
1245            assert!(
1246                env.access_layer
1247                    .object_store()
1248                    .exists(&puffin_path)
1249                    .await
1250                    .unwrap()
1251            );
1252        }
1253
1254        // The task should finish successfully.
1255        assert_eq!(result_rx.await.unwrap(), IndexBuildOutcome::Finished);
1256
1257        // The index file should exist after the task finishes.
1258        assert!(
1259            env.access_layer
1260                .object_store()
1261                .exists(&puffin_path)
1262                .await
1263                .unwrap()
1264        );
1265    }
1266
1267    #[tokio::test]
1268    async fn test_index_build_task_build_mode() {
1269        schedule_index_build_task_with_mode(IndexBuildMode::Async).await;
1270        schedule_index_build_task_with_mode(IndexBuildMode::Sync).await;
1271    }
1272
1273    #[tokio::test]
1274    async fn test_index_build_task_no_index() {
1275        let env = SchedulerEnv::new().await;
1276        let mut scheduler = env.mock_index_build_scheduler();
1277        let mut metadata = sst_region_metadata();
1278        // Unset indexes in metadata to simulate no index scenario.
1279        metadata.column_metadatas.iter_mut().for_each(|col| {
1280            col.column_schema.set_inverted_index(false);
1281            let _ = col.column_schema.unset_skipping_options();
1282        });
1283        let region_id = metadata.region_id;
1284        let metadata = Arc::new(metadata);
1285        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1286        let file_purger = Arc::new(NoopFilePurger {});
1287        let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
1288        let file_meta = FileMeta {
1289            region_id,
1290            file_id: sst_info.file_id,
1291            file_size: sst_info.file_size,
1292            index_file_size: sst_info.index_metadata.file_size,
1293            num_rows: sst_info.num_rows as u64,
1294            num_row_groups: sst_info.num_row_groups,
1295            ..Default::default()
1296        };
1297        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1298        let version_control =
1299            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1300        let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
1301
1302        // Create mock task.
1303        let (tx, mut rx) = mpsc::channel(4);
1304        let (result_tx, result_rx) = oneshot::channel::<IndexBuildOutcome>();
1305        let task = IndexBuildTask {
1306            file_meta: file_meta.clone(),
1307            reason: IndexBuildType::Flush,
1308            access_layer: env.access_layer.clone(),
1309            manifest_ctx,
1310            write_cache: None,
1311            file_purger,
1312            indexer_builder,
1313            request_sender: tx,
1314            result_sender: Some(result_tx),
1315        };
1316
1317        scheduler.schedule_build(&version_control, task).unwrap();
1318
1319        // The task should finish successfully.
1320        assert_eq!(result_rx.await.unwrap(), IndexBuildOutcome::Finished);
1321
1322        // No index is built, so no notification should be sent to the worker.
1323        let _ = rx.recv().await.is_none();
1324    }
1325
1326    #[tokio::test]
1327    async fn test_index_build_task_with_write_cache() {
1328        let env = SchedulerEnv::new().await;
1329        let mut scheduler = env.mock_index_build_scheduler();
1330        let metadata = Arc::new(sst_region_metadata());
1331        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1332        let file_purger = Arc::new(NoopFilePurger {});
1333        let region_id = metadata.region_id;
1334
1335        let (dir, factory) = PuffinManagerFactory::new_for_test_async("test_write_cache").await;
1336        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1337
1338        // Create mock write cache
1339        let write_cache = Arc::new(
1340            WriteCache::new_fs(
1341                dir.path().to_str().unwrap(),
1342                ReadableSize::mb(10),
1343                None,
1344                factory,
1345                intm_manager,
1346            )
1347            .await
1348            .unwrap(),
1349        );
1350        // Indexer builder built from write cache.
1351        let indexer_builder = Arc::new(IndexerBuilderImpl {
1352            build_type: IndexBuildType::Flush,
1353            metadata: metadata.clone(),
1354            row_group_size: 1024,
1355            puffin_manager: write_cache.build_puffin_manager().clone(),
1356            intermediate_manager: write_cache.intermediate_manager().clone(),
1357            index_options: IndexOptions::default(),
1358            inverted_index_config: InvertedIndexConfig::default(),
1359            fulltext_index_config: FulltextIndexConfig::default(),
1360            bloom_filter_index_config: BloomFilterConfig::default(),
1361        });
1362
1363        let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
1364        let file_meta = FileMeta {
1365            region_id,
1366            file_id: sst_info.file_id,
1367            file_size: sst_info.file_size,
1368            index_file_size: sst_info.index_metadata.file_size,
1369            num_rows: sst_info.num_rows as u64,
1370            num_row_groups: sst_info.num_row_groups,
1371            ..Default::default()
1372        };
1373        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1374        let version_control =
1375            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1376
1377        // Create mock task.
1378        let (tx, mut _rx) = mpsc::channel(4);
1379        let (result_tx, result_rx) = oneshot::channel::<IndexBuildOutcome>();
1380        let task = IndexBuildTask {
1381            file_meta: file_meta.clone(),
1382            reason: IndexBuildType::Flush,
1383            access_layer: env.access_layer.clone(),
1384            manifest_ctx,
1385            write_cache: Some(write_cache.clone()),
1386            file_purger,
1387            indexer_builder,
1388            request_sender: tx,
1389            result_sender: Some(result_tx),
1390        };
1391
1392        scheduler.schedule_build(&version_control, task).unwrap();
1393
1394        // The task should finish successfully.
1395        assert_eq!(result_rx.await.unwrap(), IndexBuildOutcome::Finished);
1396
1397        // The write cache should contain the uploaded index file.
1398        let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Puffin);
1399        assert!(write_cache.file_cache().contains_key(&index_key));
1400    }
1401}