mito2/sst/
index.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod bloom_filter;
16pub(crate) mod fulltext_index;
17mod indexer;
18pub mod intermediate;
19pub(crate) mod inverted_index;
20pub mod puffin_manager;
21mod statistics;
22pub(crate) mod store;
23
24use std::num::NonZeroUsize;
25use std::sync::Arc;
26
27use bloom_filter::creator::BloomFilterIndexer;
28use common_telemetry::{debug, info, warn};
29use datatypes::arrow::array::BinaryArray;
30use datatypes::arrow::record_batch::RecordBatch;
31use mito_codec::index::IndexValuesCodec;
32use mito_codec::row_converter::CompositeValues;
33use puffin_manager::SstPuffinManager;
34use smallvec::{SmallVec, smallvec};
35use snafu::{OptionExt, ResultExt};
36use statistics::{ByteCount, RowCount};
37use store_api::metadata::RegionMetadataRef;
38use store_api::storage::{ColumnId, FileId, RegionId};
39use strum::IntoStaticStr;
40use tokio::sync::mpsc::Sender;
41
42use crate::access_layer::{AccessLayerRef, FilePathProvider, OperationType, RegionFilePathFactory};
43use crate::cache::file_cache::{FileType, IndexKey};
44use crate::cache::write_cache::{UploadTracker, WriteCacheRef};
45use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
46use crate::error::{BuildIndexAsyncSnafu, DecodeSnafu, Error, InvalidRecordBatchSnafu, Result};
47use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
48use crate::metrics::INDEX_CREATE_MEMORY_USAGE;
49use crate::read::{Batch, BatchReader};
50use crate::region::options::IndexOptions;
51use crate::region::version::VersionControlRef;
52use crate::region::{ManifestContextRef, RegionLeaderState};
53use crate::request::{
54    BackgroundNotify, IndexBuildFailed, IndexBuildFinished, WorkerRequest, WorkerRequestWithTime,
55};
56use crate::schedule::scheduler::{Job, SchedulerRef};
57use crate::sst::file::{FileHandle, FileMeta, IndexType, RegionFileId};
58use crate::sst::file_purger::FilePurgerRef;
59use crate::sst::index::fulltext_index::creator::FulltextIndexer;
60use crate::sst::index::intermediate::IntermediateManager;
61use crate::sst::index::inverted_index::creator::InvertedIndexer;
62use crate::sst::parquet::SstInfo;
63use crate::sst::parquet::flat_format::primary_key_column_index;
64use crate::sst::parquet::format::PrimaryKeyArray;
65use crate::worker::WorkerListener;
66
/// Type name of the inverted index. In this file it is the label value reported to
/// `INDEX_CREATE_MEMORY_USAGE` for the inverted indexer.
pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index";
/// Type name of the full-text index. In this file it is the label value reported to
/// `INDEX_CREATE_MEMORY_USAGE` for the full-text indexer.
pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index";
/// Type name of the bloom filter index. In this file it is the label value reported to
/// `INDEX_CREATE_MEMORY_USAGE` for the bloom filter indexer.
pub(crate) const TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
70
71/// Output of the index creation.
72#[derive(Debug, Clone, Default)]
73pub struct IndexOutput {
74    /// Size of the file.
75    pub file_size: u64,
76    /// Inverted index output.
77    pub inverted_index: InvertedIndexOutput,
78    /// Fulltext index output.
79    pub fulltext_index: FulltextIndexOutput,
80    /// Bloom filter output.
81    pub bloom_filter: BloomFilterOutput,
82}
83
84impl IndexOutput {
85    pub fn build_available_indexes(&self) -> SmallVec<[IndexType; 4]> {
86        let mut indexes = SmallVec::new();
87        if self.inverted_index.is_available() {
88            indexes.push(IndexType::InvertedIndex);
89        }
90        if self.fulltext_index.is_available() {
91            indexes.push(IndexType::FulltextIndex);
92        }
93        if self.bloom_filter.is_available() {
94            indexes.push(IndexType::BloomFilterIndex);
95        }
96        indexes
97    }
98}
99
100/// Base output of the index creation.
101#[derive(Debug, Clone, Default)]
102pub struct IndexBaseOutput {
103    /// Size of the index.
104    pub index_size: ByteCount,
105    /// Number of rows in the index.
106    pub row_count: RowCount,
107    /// Available columns in the index.
108    pub columns: Vec<ColumnId>,
109}
110
111impl IndexBaseOutput {
112    pub fn is_available(&self) -> bool {
113        self.index_size > 0
114    }
115}
116
117/// Output of the inverted index creation.
118pub type InvertedIndexOutput = IndexBaseOutput;
119/// Output of the fulltext index creation.
120pub type FulltextIndexOutput = IndexBaseOutput;
121/// Output of the bloom filter creation.
122pub type BloomFilterOutput = IndexBaseOutput;
123
124/// The index creator that hides the error handling details.
125#[derive(Default)]
126pub struct Indexer {
127    file_id: FileId,
128    region_id: RegionId,
129    puffin_manager: Option<SstPuffinManager>,
130    inverted_indexer: Option<InvertedIndexer>,
131    last_mem_inverted_index: usize,
132    fulltext_indexer: Option<FulltextIndexer>,
133    last_mem_fulltext_index: usize,
134    bloom_filter_indexer: Option<BloomFilterIndexer>,
135    last_mem_bloom_filter: usize,
136    intermediate_manager: Option<IntermediateManager>,
137}
138
139impl Indexer {
140    /// Updates the index with the given batch.
141    pub async fn update(&mut self, batch: &mut Batch) {
142        self.do_update(batch).await;
143
144        self.flush_mem_metrics();
145    }
146
147    /// Updates the index with the given flat format RecordBatch.
148    pub async fn update_flat(&mut self, batch: &RecordBatch) {
149        self.do_update_flat(batch).await;
150
151        self.flush_mem_metrics();
152    }
153
154    /// Finalizes the index creation.
155    pub async fn finish(&mut self) -> IndexOutput {
156        let output = self.do_finish().await;
157
158        self.flush_mem_metrics();
159        output
160    }
161
162    /// Aborts the index creation.
163    pub async fn abort(&mut self) {
164        self.do_abort().await;
165
166        self.flush_mem_metrics();
167    }
168
169    fn flush_mem_metrics(&mut self) {
170        let inverted_mem = self
171            .inverted_indexer
172            .as_ref()
173            .map_or(0, |creator| creator.memory_usage());
174        INDEX_CREATE_MEMORY_USAGE
175            .with_label_values(&[TYPE_INVERTED_INDEX])
176            .add(inverted_mem as i64 - self.last_mem_inverted_index as i64);
177        self.last_mem_inverted_index = inverted_mem;
178
179        let fulltext_mem = self
180            .fulltext_indexer
181            .as_ref()
182            .map_or(0, |creator| creator.memory_usage());
183        INDEX_CREATE_MEMORY_USAGE
184            .with_label_values(&[TYPE_FULLTEXT_INDEX])
185            .add(fulltext_mem as i64 - self.last_mem_fulltext_index as i64);
186        self.last_mem_fulltext_index = fulltext_mem;
187
188        let bloom_filter_mem = self
189            .bloom_filter_indexer
190            .as_ref()
191            .map_or(0, |creator| creator.memory_usage());
192        INDEX_CREATE_MEMORY_USAGE
193            .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
194            .add(bloom_filter_mem as i64 - self.last_mem_bloom_filter as i64);
195        self.last_mem_bloom_filter = bloom_filter_mem;
196    }
197}
198
199#[async_trait::async_trait]
200pub trait IndexerBuilder {
201    /// Builds indexer of given file id to [index_file_path].
202    async fn build(&self, file_id: FileId) -> Indexer;
203}
204#[derive(Clone)]
205pub(crate) struct IndexerBuilderImpl {
206    pub(crate) build_type: IndexBuildType,
207    pub(crate) metadata: RegionMetadataRef,
208    pub(crate) row_group_size: usize,
209    pub(crate) puffin_manager: SstPuffinManager,
210    pub(crate) intermediate_manager: IntermediateManager,
211    pub(crate) index_options: IndexOptions,
212    pub(crate) inverted_index_config: InvertedIndexConfig,
213    pub(crate) fulltext_index_config: FulltextIndexConfig,
214    pub(crate) bloom_filter_index_config: BloomFilterConfig,
215}
216
217#[async_trait::async_trait]
218impl IndexerBuilder for IndexerBuilderImpl {
219    /// Sanity check for arguments and create a new [Indexer] if arguments are valid.
220    async fn build(&self, file_id: FileId) -> Indexer {
221        let mut indexer = Indexer {
222            file_id,
223            region_id: self.metadata.region_id,
224            ..Default::default()
225        };
226
227        indexer.inverted_indexer = self.build_inverted_indexer(file_id);
228        indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await;
229        indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id);
230        indexer.intermediate_manager = Some(self.intermediate_manager.clone());
231        if indexer.inverted_indexer.is_none()
232            && indexer.fulltext_indexer.is_none()
233            && indexer.bloom_filter_indexer.is_none()
234        {
235            indexer.abort().await;
236            return Indexer::default();
237        }
238
239        indexer.puffin_manager = Some(self.puffin_manager.clone());
240        indexer
241    }
242}
243
244impl IndexerBuilderImpl {
245    fn build_inverted_indexer(&self, file_id: FileId) -> Option<InvertedIndexer> {
246        let create = match self.build_type {
247            IndexBuildType::Flush => self.inverted_index_config.create_on_flush.auto(),
248            IndexBuildType::Compact => self.inverted_index_config.create_on_compaction.auto(),
249            _ => true,
250        };
251
252        if !create {
253            debug!(
254                "Skip creating inverted index due to config, region_id: {}, file_id: {}",
255                self.metadata.region_id, file_id,
256            );
257            return None;
258        }
259
260        let indexed_column_ids = self.metadata.inverted_indexed_column_ids(
261            self.index_options.inverted_index.ignore_column_ids.iter(),
262        );
263        if indexed_column_ids.is_empty() {
264            debug!(
265                "No columns to be indexed, skip creating inverted index, region_id: {}, file_id: {}",
266                self.metadata.region_id, file_id,
267            );
268            return None;
269        }
270
271        let Some(mut segment_row_count) =
272            NonZeroUsize::new(self.index_options.inverted_index.segment_row_count)
273        else {
274            warn!(
275                "Segment row count is 0, skip creating index, region_id: {}, file_id: {}",
276                self.metadata.region_id, file_id,
277            );
278            return None;
279        };
280
281        let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else {
282            warn!(
283                "Row group size is 0, skip creating index, region_id: {}, file_id: {}",
284                self.metadata.region_id, file_id,
285            );
286            return None;
287        };
288
289        // if segment row count not aligned with row group size, adjust it to be aligned.
290        if row_group_size.get() % segment_row_count.get() != 0 {
291            segment_row_count = row_group_size;
292        }
293
294        let indexer = InvertedIndexer::new(
295            file_id,
296            &self.metadata,
297            self.intermediate_manager.clone(),
298            self.inverted_index_config.mem_threshold_on_create(),
299            segment_row_count,
300            indexed_column_ids,
301        );
302
303        Some(indexer)
304    }
305
306    async fn build_fulltext_indexer(&self, file_id: FileId) -> Option<FulltextIndexer> {
307        let create = match self.build_type {
308            IndexBuildType::Flush => self.fulltext_index_config.create_on_flush.auto(),
309            IndexBuildType::Compact => self.fulltext_index_config.create_on_compaction.auto(),
310            _ => true,
311        };
312
313        if !create {
314            debug!(
315                "Skip creating full-text index due to config, region_id: {}, file_id: {}",
316                self.metadata.region_id, file_id,
317            );
318            return None;
319        }
320
321        let mem_limit = self.fulltext_index_config.mem_threshold_on_create();
322        let creator = FulltextIndexer::new(
323            &self.metadata.region_id,
324            &file_id,
325            &self.intermediate_manager,
326            &self.metadata,
327            self.fulltext_index_config.compress,
328            mem_limit,
329        )
330        .await;
331
332        let err = match creator {
333            Ok(creator) => {
334                if creator.is_none() {
335                    debug!(
336                        "Skip creating full-text index due to no columns require indexing, region_id: {}, file_id: {}",
337                        self.metadata.region_id, file_id,
338                    );
339                }
340                return creator;
341            }
342            Err(err) => err,
343        };
344
345        if cfg!(any(test, feature = "test")) {
346            panic!(
347                "Failed to create full-text indexer, region_id: {}, file_id: {}, err: {:?}",
348                self.metadata.region_id, file_id, err
349            );
350        } else {
351            warn!(
352                err; "Failed to create full-text indexer, region_id: {}, file_id: {}",
353                self.metadata.region_id, file_id,
354            );
355        }
356
357        None
358    }
359
360    fn build_bloom_filter_indexer(&self, file_id: FileId) -> Option<BloomFilterIndexer> {
361        let create = match self.build_type {
362            IndexBuildType::Flush => self.bloom_filter_index_config.create_on_flush.auto(),
363            IndexBuildType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(),
364            _ => true,
365        };
366
367        if !create {
368            debug!(
369                "Skip creating bloom filter due to config, region_id: {}, file_id: {}",
370                self.metadata.region_id, file_id,
371            );
372            return None;
373        }
374
375        let mem_limit = self.bloom_filter_index_config.mem_threshold_on_create();
376        let indexer = BloomFilterIndexer::new(
377            file_id,
378            &self.metadata,
379            self.intermediate_manager.clone(),
380            mem_limit,
381        );
382
383        let err = match indexer {
384            Ok(indexer) => {
385                if indexer.is_none() {
386                    debug!(
387                        "Skip creating bloom filter due to no columns require indexing, region_id: {}, file_id: {}",
388                        self.metadata.region_id, file_id,
389                    );
390                }
391                return indexer;
392            }
393            Err(err) => err,
394        };
395
396        if cfg!(any(test, feature = "test")) {
397            panic!(
398                "Failed to create bloom filter, region_id: {}, file_id: {}, err: {:?}",
399                self.metadata.region_id, file_id, err
400            );
401        } else {
402            warn!(
403                err; "Failed to create bloom filter, region_id: {}, file_id: {}",
404                self.metadata.region_id, file_id,
405            );
406        }
407
408        None
409    }
410}
411
412/// Type of an index build task.
413#[derive(Debug, Clone, PartialEq, IntoStaticStr)]
414pub enum IndexBuildType {
415    /// Build index when schema change.
416    SchemaChange,
417    /// Create or update index after flush.
418    Flush,
419    /// Create or update index after compact.
420    Compact,
421    /// Manually build index.
422    Manual,
423}
424
425impl IndexBuildType {
426    fn as_str(&self) -> &'static str {
427        self.into()
428    }
429}
430
431impl From<OperationType> for IndexBuildType {
432    fn from(op_type: OperationType) -> Self {
433        match op_type {
434            OperationType::Flush => IndexBuildType::Flush,
435            OperationType::Compact => IndexBuildType::Compact,
436        }
437    }
438}
439
440/// Outcome of an index build task.
441#[derive(Debug, Clone, PartialEq, Eq, Hash)]
442pub enum IndexBuildOutcome {
443    Finished,
444    Aborted(String),
445}
446
447/// Mpsc output result sender.
448pub type ResultMpscSender = Sender<Result<IndexBuildOutcome>>;
449
450pub struct IndexBuildTask {
451    /// The file meta to build index for.
452    pub file_meta: FileMeta,
453    pub reason: IndexBuildType,
454    pub access_layer: AccessLayerRef,
455    pub(crate) listener: WorkerListener,
456    pub(crate) manifest_ctx: ManifestContextRef,
457    pub write_cache: Option<WriteCacheRef>,
458    pub file_purger: FilePurgerRef,
459    /// When write cache is enabled, the indexer builder should be built from the write cache.
460    /// Otherwise, it should be built from the access layer.
461    pub indexer_builder: Arc<dyn IndexerBuilder + Send + Sync>,
462    /// Request sender to notify the region worker.
463    pub(crate) request_sender: Sender<WorkerRequestWithTime>,
464    /// Index build result sender.
465    pub(crate) result_sender: ResultMpscSender,
466}
467
468impl IndexBuildTask {
469    /// Notify the caller the job is success.
470    pub async fn on_success(&mut self, outcome: IndexBuildOutcome) {
471        let _ = self.result_sender.send(Ok(outcome)).await;
472    }
473
474    /// Send index build error to waiter.
475    pub async fn on_failure(&mut self, err: Arc<Error>) {
476        let _ = self
477            .result_sender
478            .send(Err(err.clone()).context(BuildIndexAsyncSnafu {
479                region_id: self.file_meta.region_id,
480            }))
481            .await;
482    }
483
484    fn into_index_build_job(mut self, version_control: VersionControlRef) -> Job {
485        Box::pin(async move {
486            self.do_index_build(version_control).await;
487        })
488    }
489
490    async fn do_index_build(&mut self, version_control: VersionControlRef) {
491        self.listener
492            .on_index_build_begin(RegionFileId::new(
493                self.file_meta.region_id,
494                self.file_meta.file_id,
495            ))
496            .await;
497        match self.index_build(version_control).await {
498            Ok(outcome) => self.on_success(outcome).await,
499            Err(e) => {
500                warn!(
501                    e; "Index build task failed, region: {}, file_id: {}",
502                    self.file_meta.region_id, self.file_meta.file_id,
503                );
504                self.on_failure(e.into()).await
505            }
506        };
507    }
508
509    // Checks if the SST file still exists in object store and version to avoid conflict with compaction.
510    async fn check_sst_file_exists(&self, version_control: &VersionControlRef) -> bool {
511        let file_id = self.file_meta.file_id;
512        let level = self.file_meta.level;
513        // We should check current version instead of the version when the job is created.
514        let version = version_control.current().version;
515
516        let Some(level_files) = version.ssts.levels().get(level as usize) else {
517            warn!(
518                "File id {} not found in level {} for index build, region: {}",
519                file_id, level, self.file_meta.region_id
520            );
521            return false;
522        };
523
524        match level_files.files.get(&file_id) {
525            Some(handle) if !handle.is_deleted() && !handle.compacting() => {
526                // If the file's metadata is present in the current version, the physical SST file
527                // is guaranteed to exist on object store. The file purger removes the physical
528                // file only after its metadata is removed from the version.
529                true
530            }
531            _ => {
532                warn!(
533                    "File id {} not found in region version for index build, region: {}",
534                    file_id, self.file_meta.region_id
535                );
536                false
537            }
538        }
539    }
540
541    async fn index_build(
542        &mut self,
543        version_control: VersionControlRef,
544    ) -> Result<IndexBuildOutcome> {
545        let mut indexer = self.indexer_builder.build(self.file_meta.file_id).await;
546
547        // Check SST file existence before building index to avoid failure of parquet reader.
548        if !self.check_sst_file_exists(&version_control).await {
549            // Calls abort to clean up index files.
550            indexer.abort().await;
551            self.listener
552                .on_index_build_abort(RegionFileId::new(
553                    self.file_meta.region_id,
554                    self.file_meta.file_id,
555                ))
556                .await;
557            return Ok(IndexBuildOutcome::Aborted(format!(
558                "SST file not found during index build, region: {}, file_id: {}",
559                self.file_meta.region_id, self.file_meta.file_id
560            )));
561        }
562
563        let mut parquet_reader = self
564            .access_layer
565            .read_sst(FileHandle::new(
566                self.file_meta.clone(),
567                self.file_purger.clone(),
568            ))
569            .build()
570            .await?;
571
572        // TODO(SNC123): optimize index batch
573        loop {
574            match parquet_reader.next_batch().await {
575                Ok(Some(batch)) => {
576                    indexer.update(&mut batch.clone()).await;
577                }
578                Ok(None) => break,
579                Err(e) => {
580                    indexer.abort().await;
581                    return Err(e);
582                }
583            }
584        }
585        let index_output = indexer.finish().await;
586
587        if index_output.file_size > 0 {
588            // Check SST file existence again after building index.
589            if !self.check_sst_file_exists(&version_control).await {
590                // Calls abort to clean up index files.
591                indexer.abort().await;
592                self.listener
593                    .on_index_build_abort(RegionFileId::new(
594                        self.file_meta.region_id,
595                        self.file_meta.file_id,
596                    ))
597                    .await;
598                return Ok(IndexBuildOutcome::Aborted(format!(
599                    "SST file not found during index build, region: {}, file_id: {}",
600                    self.file_meta.region_id, self.file_meta.file_id
601                )));
602            }
603
604            // Upload index file if write cache is enabled.
605            self.maybe_upload_index_file(index_output.clone()).await?;
606
607            let worker_request = match self.update_manifest(index_output).await {
608                Ok(edit) => {
609                    let index_build_finished = IndexBuildFinished {
610                        region_id: self.file_meta.region_id,
611                        edit,
612                    };
613                    WorkerRequest::Background {
614                        region_id: self.file_meta.region_id,
615                        notify: BackgroundNotify::IndexBuildFinished(index_build_finished),
616                    }
617                }
618                Err(e) => {
619                    let err = Arc::new(e);
620                    WorkerRequest::Background {
621                        region_id: self.file_meta.region_id,
622                        notify: BackgroundNotify::IndexBuildFailed(IndexBuildFailed { err }),
623                    }
624                }
625            };
626
627            let _ = self
628                .request_sender
629                .send(WorkerRequestWithTime::new(worker_request))
630                .await;
631        }
632        Ok(IndexBuildOutcome::Finished)
633    }
634
635    async fn maybe_upload_index_file(&self, output: IndexOutput) -> Result<()> {
636        if let Some(write_cache) = &self.write_cache {
637            let file_id = self.file_meta.file_id;
638            let region_id = self.file_meta.region_id;
639            let remote_store = self.access_layer.object_store();
640            let mut upload_tracker = UploadTracker::new(region_id);
641            let mut err = None;
642            let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin);
643            let puffin_path = RegionFilePathFactory::new(
644                self.access_layer.table_dir().to_string(),
645                self.access_layer.path_type(),
646            )
647            .build_index_file_path(RegionFileId::new(region_id, file_id));
648            if let Err(e) = write_cache
649                .upload(puffin_key, &puffin_path, remote_store)
650                .await
651            {
652                err = Some(e);
653            }
654            upload_tracker.push_uploaded_file(puffin_path);
655            if let Some(err) = err {
656                // Cleans index files on failure.
657                upload_tracker
658                    .clean(
659                        &smallvec![SstInfo {
660                            file_id,
661                            index_metadata: output,
662                            ..Default::default()
663                        }],
664                        &write_cache.file_cache(),
665                        remote_store,
666                    )
667                    .await;
668                return Err(err);
669            }
670        } else {
671            debug!("write cache is not available, skip uploading index file");
672        }
673        Ok(())
674    }
675
676    async fn update_manifest(&mut self, output: IndexOutput) -> Result<RegionEdit> {
677        self.file_meta.available_indexes = output.build_available_indexes();
678        self.file_meta.index_file_size = output.file_size;
679        let edit = RegionEdit {
680            files_to_add: vec![self.file_meta.clone()],
681            files_to_remove: vec![],
682            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
683            flushed_sequence: None,
684            flushed_entry_id: None,
685            committed_sequence: None,
686            compaction_time_window: None,
687        };
688        let version = self
689            .manifest_ctx
690            .update_manifest(
691                RegionLeaderState::Writable,
692                RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())),
693            )
694            .await?;
695        info!(
696            "Successfully update manifest version to {version}, region: {}, reason: {}",
697            self.file_meta.region_id,
698            self.reason.as_str()
699        );
700        Ok(edit)
701    }
702}
703
704#[derive(Clone)]
705pub struct IndexBuildScheduler {
706    scheduler: SchedulerRef,
707}
708
709impl IndexBuildScheduler {
710    pub fn new(scheduler: SchedulerRef) -> Self {
711        IndexBuildScheduler { scheduler }
712    }
713
714    pub(crate) fn schedule_build(
715        &mut self,
716        version_control: &VersionControlRef,
717        task: IndexBuildTask,
718    ) -> Result<()> {
719        // We should clone version control to expand the lifetime.
720        let job = task.into_index_build_job(version_control.clone());
721        self.scheduler.schedule(job)?;
722        Ok(())
723    }
724}
725
726/// Decodes primary keys from a flat format RecordBatch.
727/// Returns a list of (decoded_pk_value, count) tuples where count is the number of occurrences.
728pub(crate) fn decode_primary_keys_with_counts(
729    batch: &RecordBatch,
730    codec: &IndexValuesCodec,
731) -> Result<Vec<(CompositeValues, usize)>> {
732    let primary_key_index = primary_key_column_index(batch.num_columns());
733    let pk_dict_array = batch
734        .column(primary_key_index)
735        .as_any()
736        .downcast_ref::<PrimaryKeyArray>()
737        .context(InvalidRecordBatchSnafu {
738            reason: "Primary key column is not a dictionary array",
739        })?;
740    let pk_values_array = pk_dict_array
741        .values()
742        .as_any()
743        .downcast_ref::<BinaryArray>()
744        .context(InvalidRecordBatchSnafu {
745            reason: "Primary key values are not binary array",
746        })?;
747    let keys = pk_dict_array.keys();
748
749    // Decodes primary keys and count consecutive occurrences
750    let mut result: Vec<(CompositeValues, usize)> = Vec::new();
751    let mut prev_key: Option<u32> = None;
752
753    for i in 0..keys.len() {
754        let current_key = keys.value(i);
755
756        // Checks if current key is the same as previous key
757        if let Some(prev) = prev_key
758            && prev == current_key
759        {
760            // Safety: We already have a key in the result vector.
761            result.last_mut().unwrap().1 += 1;
762            continue;
763        }
764
765        // New key, decodes it.
766        let pk_bytes = pk_values_array.value(current_key as usize);
767        let decoded_value = codec.decoder().decode(pk_bytes).context(DecodeSnafu)?;
768
769        result.push((decoded_value, 1));
770        prev_key = Some(current_key);
771    }
772
773    Ok(result)
774}
775
776#[cfg(test)]
777mod tests {
778    use std::sync::Arc;
779
780    use api::v1::SemanticType;
781    use common_base::readable_size::ReadableSize;
782    use datafusion_common::HashMap;
783    use datatypes::data_type::ConcreteDataType;
784    use datatypes::schema::{
785        ColumnSchema, FulltextOptions, SkippingIndexOptions, SkippingIndexType,
786    };
787    use object_store::ObjectStore;
788    use object_store::services::Memory;
789    use puffin_manager::PuffinManagerFactory;
790    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
791    use tokio::sync::mpsc;
792
793    use super::*;
794    use crate::access_layer::{FilePathProvider, SstWriteRequest, WriteType};
795    use crate::cache::write_cache::WriteCache;
796    use crate::config::{FulltextIndexConfig, IndexBuildMode, MitoConfig, Mode};
797    use crate::memtable::time_partition::TimePartitions;
798    use crate::region::version::{VersionBuilder, VersionControl};
799    use crate::sst::file::RegionFileId;
800    use crate::sst::file_purger::NoopFilePurger;
801    use crate::sst::location;
802    use crate::sst::parquet::WriteOptions;
803    use crate::test_util::memtable_util::EmptyMemtableBuilder;
804    use crate::test_util::scheduler_util::SchedulerEnv;
805    use crate::test_util::sst_util::{new_batch_by_range, new_source, sst_region_metadata};
806
807    struct MetaConfig {
808        with_inverted: bool,
809        with_fulltext: bool,
810        with_skipping_bloom: bool,
811    }
812
813    fn mock_region_metadata(
814        MetaConfig {
815            with_inverted,
816            with_fulltext,
817            with_skipping_bloom,
818        }: MetaConfig,
819    ) -> RegionMetadataRef {
820        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
821        let mut column_schema = ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false);
822        if with_inverted {
823            column_schema = column_schema.with_inverted_index(true);
824        }
825        builder
826            .push_column_metadata(ColumnMetadata {
827                column_schema,
828                semantic_type: SemanticType::Field,
829                column_id: 1,
830            })
831            .push_column_metadata(ColumnMetadata {
832                column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
833                semantic_type: SemanticType::Field,
834                column_id: 2,
835            })
836            .push_column_metadata(ColumnMetadata {
837                column_schema: ColumnSchema::new(
838                    "c",
839                    ConcreteDataType::timestamp_millisecond_datatype(),
840                    false,
841                ),
842                semantic_type: SemanticType::Timestamp,
843                column_id: 3,
844            });
845
846        if with_fulltext {
847            let column_schema =
848                ColumnSchema::new("text", ConcreteDataType::string_datatype(), true)
849                    .with_fulltext_options(FulltextOptions {
850                        enable: true,
851                        ..Default::default()
852                    })
853                    .unwrap();
854
855            let column = ColumnMetadata {
856                column_schema,
857                semantic_type: SemanticType::Field,
858                column_id: 4,
859            };
860
861            builder.push_column_metadata(column);
862        }
863
864        if with_skipping_bloom {
865            let column_schema =
866                ColumnSchema::new("bloom", ConcreteDataType::string_datatype(), false)
867                    .with_skipping_options(SkippingIndexOptions::new_unchecked(
868                        42,
869                        0.01,
870                        SkippingIndexType::BloomFilter,
871                    ))
872                    .unwrap();
873
874            let column = ColumnMetadata {
875                column_schema,
876                semantic_type: SemanticType::Field,
877                column_id: 5,
878            };
879
880            builder.push_column_metadata(column);
881        }
882
883        Arc::new(builder.build().unwrap())
884    }
885
886    fn mock_object_store() -> ObjectStore {
887        ObjectStore::new(Memory::default()).unwrap().finish()
888    }
889
890    async fn mock_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
891        IntermediateManager::init_fs(path).await.unwrap()
892    }
893    struct NoopPathProvider;
894
895    impl FilePathProvider for NoopPathProvider {
896        fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
897            unreachable!()
898        }
899
900        fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
901            unreachable!()
902        }
903    }
904
905    async fn mock_sst_file(
906        metadata: RegionMetadataRef,
907        env: &SchedulerEnv,
908        build_mode: IndexBuildMode,
909    ) -> SstInfo {
910        let source = new_source(&[
911            new_batch_by_range(&["a", "d"], 0, 60),
912            new_batch_by_range(&["b", "f"], 0, 40),
913            new_batch_by_range(&["b", "h"], 100, 200),
914        ]);
915        let mut index_config = MitoConfig::default().index;
916        index_config.build_mode = build_mode;
917        let write_request = SstWriteRequest {
918            op_type: OperationType::Flush,
919            metadata: metadata.clone(),
920            source: either::Left(source),
921            storage: None,
922            max_sequence: None,
923            cache_manager: Default::default(),
924            index_options: IndexOptions::default(),
925            index_config,
926            inverted_index_config: Default::default(),
927            fulltext_index_config: Default::default(),
928            bloom_filter_index_config: Default::default(),
929        };
930        env.access_layer
931            .write_sst(write_request, &WriteOptions::default(), WriteType::Flush)
932            .await
933            .unwrap()
934            .0
935            .remove(0)
936    }
937
938    async fn mock_version_control(
939        metadata: RegionMetadataRef,
940        file_purger: FilePurgerRef,
941        files: HashMap<FileId, FileMeta>,
942    ) -> VersionControlRef {
943        let mutable = Arc::new(TimePartitions::new(
944            metadata.clone(),
945            Arc::new(EmptyMemtableBuilder::default()),
946            0,
947            None,
948        ));
949        let version_builder = VersionBuilder::new(metadata, mutable)
950            .add_files(file_purger, files.values().cloned())
951            .build();
952        Arc::new(VersionControl::new(version_builder))
953    }
954
955    async fn mock_indexer_builder(
956        metadata: RegionMetadataRef,
957        env: &SchedulerEnv,
958    ) -> Arc<dyn IndexerBuilder + Send + Sync> {
959        let (dir, factory) = PuffinManagerFactory::new_for_test_async("mock_indexer_builder").await;
960        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
961        let puffin_manager = factory.build(
962            env.access_layer.object_store().clone(),
963            RegionFilePathFactory::new(
964                env.access_layer.table_dir().to_string(),
965                env.access_layer.path_type(),
966            ),
967        );
968        Arc::new(IndexerBuilderImpl {
969            build_type: IndexBuildType::Flush,
970            metadata,
971            row_group_size: 1024,
972            puffin_manager,
973            intermediate_manager: intm_manager,
974            index_options: IndexOptions::default(),
975            inverted_index_config: InvertedIndexConfig::default(),
976            fulltext_index_config: FulltextIndexConfig::default(),
977            bloom_filter_index_config: BloomFilterConfig::default(),
978        })
979    }
980
981    #[tokio::test]
982    async fn test_build_indexer_basic() {
983        let (dir, factory) =
984            PuffinManagerFactory::new_for_test_async("test_build_indexer_basic_").await;
985        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
986
987        let metadata = mock_region_metadata(MetaConfig {
988            with_inverted: true,
989            with_fulltext: true,
990            with_skipping_bloom: true,
991        });
992        let indexer = IndexerBuilderImpl {
993            build_type: IndexBuildType::Flush,
994            metadata,
995            row_group_size: 1024,
996            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
997            intermediate_manager: intm_manager,
998            index_options: IndexOptions::default(),
999            inverted_index_config: InvertedIndexConfig::default(),
1000            fulltext_index_config: FulltextIndexConfig::default(),
1001            bloom_filter_index_config: BloomFilterConfig::default(),
1002        }
1003        .build(FileId::random())
1004        .await;
1005
1006        assert!(indexer.inverted_indexer.is_some());
1007        assert!(indexer.fulltext_indexer.is_some());
1008        assert!(indexer.bloom_filter_indexer.is_some());
1009    }
1010
1011    #[tokio::test]
1012    async fn test_build_indexer_disable_create() {
1013        let (dir, factory) =
1014            PuffinManagerFactory::new_for_test_async("test_build_indexer_disable_create_").await;
1015        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1016
1017        let metadata = mock_region_metadata(MetaConfig {
1018            with_inverted: true,
1019            with_fulltext: true,
1020            with_skipping_bloom: true,
1021        });
1022        let indexer = IndexerBuilderImpl {
1023            build_type: IndexBuildType::Flush,
1024            metadata: metadata.clone(),
1025            row_group_size: 1024,
1026            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1027            intermediate_manager: intm_manager.clone(),
1028            index_options: IndexOptions::default(),
1029            inverted_index_config: InvertedIndexConfig {
1030                create_on_flush: Mode::Disable,
1031                ..Default::default()
1032            },
1033            fulltext_index_config: FulltextIndexConfig::default(),
1034            bloom_filter_index_config: BloomFilterConfig::default(),
1035        }
1036        .build(FileId::random())
1037        .await;
1038
1039        assert!(indexer.inverted_indexer.is_none());
1040        assert!(indexer.fulltext_indexer.is_some());
1041        assert!(indexer.bloom_filter_indexer.is_some());
1042
1043        let indexer = IndexerBuilderImpl {
1044            build_type: IndexBuildType::Compact,
1045            metadata: metadata.clone(),
1046            row_group_size: 1024,
1047            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1048            intermediate_manager: intm_manager.clone(),
1049            index_options: IndexOptions::default(),
1050            inverted_index_config: InvertedIndexConfig::default(),
1051            fulltext_index_config: FulltextIndexConfig {
1052                create_on_compaction: Mode::Disable,
1053                ..Default::default()
1054            },
1055            bloom_filter_index_config: BloomFilterConfig::default(),
1056        }
1057        .build(FileId::random())
1058        .await;
1059
1060        assert!(indexer.inverted_indexer.is_some());
1061        assert!(indexer.fulltext_indexer.is_none());
1062        assert!(indexer.bloom_filter_indexer.is_some());
1063
1064        let indexer = IndexerBuilderImpl {
1065            build_type: IndexBuildType::Compact,
1066            metadata,
1067            row_group_size: 1024,
1068            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1069            intermediate_manager: intm_manager,
1070            index_options: IndexOptions::default(),
1071            inverted_index_config: InvertedIndexConfig::default(),
1072            fulltext_index_config: FulltextIndexConfig::default(),
1073            bloom_filter_index_config: BloomFilterConfig {
1074                create_on_compaction: Mode::Disable,
1075                ..Default::default()
1076            },
1077        }
1078        .build(FileId::random())
1079        .await;
1080
1081        assert!(indexer.inverted_indexer.is_some());
1082        assert!(indexer.fulltext_indexer.is_some());
1083        assert!(indexer.bloom_filter_indexer.is_none());
1084    }
1085
1086    #[tokio::test]
1087    async fn test_build_indexer_no_required() {
1088        let (dir, factory) =
1089            PuffinManagerFactory::new_for_test_async("test_build_indexer_no_required_").await;
1090        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1091
1092        let metadata = mock_region_metadata(MetaConfig {
1093            with_inverted: false,
1094            with_fulltext: true,
1095            with_skipping_bloom: true,
1096        });
1097        let indexer = IndexerBuilderImpl {
1098            build_type: IndexBuildType::Flush,
1099            metadata: metadata.clone(),
1100            row_group_size: 1024,
1101            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1102            intermediate_manager: intm_manager.clone(),
1103            index_options: IndexOptions::default(),
1104            inverted_index_config: InvertedIndexConfig::default(),
1105            fulltext_index_config: FulltextIndexConfig::default(),
1106            bloom_filter_index_config: BloomFilterConfig::default(),
1107        }
1108        .build(FileId::random())
1109        .await;
1110
1111        assert!(indexer.inverted_indexer.is_none());
1112        assert!(indexer.fulltext_indexer.is_some());
1113        assert!(indexer.bloom_filter_indexer.is_some());
1114
1115        let metadata = mock_region_metadata(MetaConfig {
1116            with_inverted: true,
1117            with_fulltext: false,
1118            with_skipping_bloom: true,
1119        });
1120        let indexer = IndexerBuilderImpl {
1121            build_type: IndexBuildType::Flush,
1122            metadata: metadata.clone(),
1123            row_group_size: 1024,
1124            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1125            intermediate_manager: intm_manager.clone(),
1126            index_options: IndexOptions::default(),
1127            inverted_index_config: InvertedIndexConfig::default(),
1128            fulltext_index_config: FulltextIndexConfig::default(),
1129            bloom_filter_index_config: BloomFilterConfig::default(),
1130        }
1131        .build(FileId::random())
1132        .await;
1133
1134        assert!(indexer.inverted_indexer.is_some());
1135        assert!(indexer.fulltext_indexer.is_none());
1136        assert!(indexer.bloom_filter_indexer.is_some());
1137
1138        let metadata = mock_region_metadata(MetaConfig {
1139            with_inverted: true,
1140            with_fulltext: true,
1141            with_skipping_bloom: false,
1142        });
1143        let indexer = IndexerBuilderImpl {
1144            build_type: IndexBuildType::Flush,
1145            metadata: metadata.clone(),
1146            row_group_size: 1024,
1147            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1148            intermediate_manager: intm_manager,
1149            index_options: IndexOptions::default(),
1150            inverted_index_config: InvertedIndexConfig::default(),
1151            fulltext_index_config: FulltextIndexConfig::default(),
1152            bloom_filter_index_config: BloomFilterConfig::default(),
1153        }
1154        .build(FileId::random())
1155        .await;
1156
1157        assert!(indexer.inverted_indexer.is_some());
1158        assert!(indexer.fulltext_indexer.is_some());
1159        assert!(indexer.bloom_filter_indexer.is_none());
1160    }
1161
1162    #[tokio::test]
1163    async fn test_build_indexer_zero_row_group() {
1164        let (dir, factory) =
1165            PuffinManagerFactory::new_for_test_async("test_build_indexer_zero_row_group_").await;
1166        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1167
1168        let metadata = mock_region_metadata(MetaConfig {
1169            with_inverted: true,
1170            with_fulltext: true,
1171            with_skipping_bloom: true,
1172        });
1173        let indexer = IndexerBuilderImpl {
1174            build_type: IndexBuildType::Flush,
1175            metadata,
1176            row_group_size: 0,
1177            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1178            intermediate_manager: intm_manager,
1179            index_options: IndexOptions::default(),
1180            inverted_index_config: InvertedIndexConfig::default(),
1181            fulltext_index_config: FulltextIndexConfig::default(),
1182            bloom_filter_index_config: BloomFilterConfig::default(),
1183        }
1184        .build(FileId::random())
1185        .await;
1186
1187        assert!(indexer.inverted_indexer.is_none());
1188    }
1189
1190    #[tokio::test]
1191    async fn test_index_build_task_sst_not_exist() {
1192        let env = SchedulerEnv::new().await;
1193        let (tx, _rx) = mpsc::channel(4);
1194        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1195        let mut scheduler = env.mock_index_build_scheduler();
1196        let metadata = Arc::new(sst_region_metadata());
1197        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1198        let file_purger = Arc::new(NoopFilePurger {});
1199        let files = HashMap::new();
1200        let version_control =
1201            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1202        let region_id = metadata.region_id;
1203        let indexer_builder = mock_indexer_builder(metadata, &env).await;
1204
1205        // Create mock task.
1206        let task = IndexBuildTask {
1207            file_meta: FileMeta {
1208                region_id,
1209                file_id: FileId::random(),
1210                file_size: 100,
1211                ..Default::default()
1212            },
1213            reason: IndexBuildType::Flush,
1214            access_layer: env.access_layer.clone(),
1215            listener: WorkerListener::default(),
1216            manifest_ctx,
1217            write_cache: None,
1218            file_purger,
1219            indexer_builder,
1220            request_sender: tx,
1221            result_sender: result_tx,
1222        };
1223
1224        // Schedule the build task and check result.
1225        scheduler.schedule_build(&version_control, task).unwrap();
1226        match result_rx.recv().await.unwrap() {
1227            Ok(outcome) => {
1228                if outcome == IndexBuildOutcome::Finished {
1229                    panic!("Expect aborted result due to missing SST file")
1230                }
1231            }
1232            _ => panic!("Expect aborted result due to missing SST file"),
1233        }
1234    }
1235
1236    #[tokio::test]
1237    async fn test_index_build_task_sst_exist() {
1238        let env = SchedulerEnv::new().await;
1239        let mut scheduler = env.mock_index_build_scheduler();
1240        let metadata = Arc::new(sst_region_metadata());
1241        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1242        let region_id = metadata.region_id;
1243        let file_purger = Arc::new(NoopFilePurger {});
1244        let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
1245        let file_meta = FileMeta {
1246            region_id,
1247            file_id: sst_info.file_id,
1248            file_size: sst_info.file_size,
1249            index_file_size: sst_info.index_metadata.file_size,
1250            num_rows: sst_info.num_rows as u64,
1251            num_row_groups: sst_info.num_row_groups,
1252            ..Default::default()
1253        };
1254        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1255        let version_control =
1256            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1257        let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
1258
1259        // Create mock task.
1260        let (tx, mut rx) = mpsc::channel(4);
1261        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1262        let task = IndexBuildTask {
1263            file_meta: file_meta.clone(),
1264            reason: IndexBuildType::Flush,
1265            access_layer: env.access_layer.clone(),
1266            listener: WorkerListener::default(),
1267            manifest_ctx,
1268            write_cache: None,
1269            file_purger,
1270            indexer_builder,
1271            request_sender: tx,
1272            result_sender: result_tx,
1273        };
1274
1275        scheduler.schedule_build(&version_control, task).unwrap();
1276
1277        // The task should finish successfully.
1278        match result_rx.recv().await.unwrap() {
1279            Ok(outcome) => {
1280                assert_eq!(outcome, IndexBuildOutcome::Finished);
1281            }
1282            _ => panic!("Expect finished result"),
1283        }
1284
1285        // A notification should be sent to the worker to update the manifest.
1286        let worker_req = rx.recv().await.unwrap().request;
1287        match worker_req {
1288            WorkerRequest::Background {
1289                region_id: req_region_id,
1290                notify: BackgroundNotify::IndexBuildFinished(finished),
1291            } => {
1292                assert_eq!(req_region_id, region_id);
1293                assert_eq!(finished.edit.files_to_add.len(), 1);
1294                let updated_meta = &finished.edit.files_to_add[0];
1295
1296                // The mock indexer builder creates all index types.
1297                assert!(!updated_meta.available_indexes.is_empty());
1298                assert!(updated_meta.index_file_size > 0);
1299                assert_eq!(updated_meta.file_id, file_meta.file_id);
1300            }
1301            _ => panic!("Unexpected worker request: {:?}", worker_req),
1302        }
1303    }
1304
1305    async fn schedule_index_build_task_with_mode(build_mode: IndexBuildMode) {
1306        let env = SchedulerEnv::new().await;
1307        let mut scheduler = env.mock_index_build_scheduler();
1308        let metadata = Arc::new(sst_region_metadata());
1309        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1310        let file_purger = Arc::new(NoopFilePurger {});
1311        let region_id = metadata.region_id;
1312        let sst_info = mock_sst_file(metadata.clone(), &env, build_mode.clone()).await;
1313        let file_meta = FileMeta {
1314            region_id,
1315            file_id: sst_info.file_id,
1316            file_size: sst_info.file_size,
1317            index_file_size: sst_info.index_metadata.file_size,
1318            num_rows: sst_info.num_rows as u64,
1319            num_row_groups: sst_info.num_row_groups,
1320            ..Default::default()
1321        };
1322        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1323        let version_control =
1324            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1325        let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
1326
1327        // Create mock task.
1328        let (tx, _rx) = mpsc::channel(4);
1329        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1330        let task = IndexBuildTask {
1331            file_meta: file_meta.clone(),
1332            reason: IndexBuildType::Flush,
1333            access_layer: env.access_layer.clone(),
1334            listener: WorkerListener::default(),
1335            manifest_ctx,
1336            write_cache: None,
1337            file_purger,
1338            indexer_builder,
1339            request_sender: tx,
1340            result_sender: result_tx,
1341        };
1342
1343        scheduler.schedule_build(&version_control, task).unwrap();
1344
1345        let puffin_path = location::index_file_path(
1346            env.access_layer.table_dir(),
1347            RegionFileId::new(region_id, file_meta.file_id),
1348            env.access_layer.path_type(),
1349        );
1350
1351        if build_mode == IndexBuildMode::Async {
1352            // The index file should not exist before the task finishes.
1353            assert!(
1354                !env.access_layer
1355                    .object_store()
1356                    .exists(&puffin_path)
1357                    .await
1358                    .unwrap()
1359            );
1360        } else {
1361            // The index file should exist before the task finishes.
1362            assert!(
1363                env.access_layer
1364                    .object_store()
1365                    .exists(&puffin_path)
1366                    .await
1367                    .unwrap()
1368            );
1369        }
1370
1371        // The task should finish successfully.
1372        match result_rx.recv().await.unwrap() {
1373            Ok(outcome) => {
1374                assert_eq!(outcome, IndexBuildOutcome::Finished);
1375            }
1376            _ => panic!("Expect finished result"),
1377        }
1378
1379        // The index file should exist after the task finishes.
1380        assert!(
1381            env.access_layer
1382                .object_store()
1383                .exists(&puffin_path)
1384                .await
1385                .unwrap()
1386        );
1387    }
1388
1389    #[tokio::test]
1390    async fn test_index_build_task_build_mode() {
1391        schedule_index_build_task_with_mode(IndexBuildMode::Async).await;
1392        schedule_index_build_task_with_mode(IndexBuildMode::Sync).await;
1393    }
1394
1395    #[tokio::test]
1396    async fn test_index_build_task_no_index() {
1397        let env = SchedulerEnv::new().await;
1398        let mut scheduler = env.mock_index_build_scheduler();
1399        let mut metadata = sst_region_metadata();
1400        // Unset indexes in metadata to simulate no index scenario.
1401        metadata.column_metadatas.iter_mut().for_each(|col| {
1402            col.column_schema.set_inverted_index(false);
1403            let _ = col.column_schema.unset_skipping_options();
1404        });
1405        let region_id = metadata.region_id;
1406        let metadata = Arc::new(metadata);
1407        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1408        let file_purger = Arc::new(NoopFilePurger {});
1409        let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
1410        let file_meta = FileMeta {
1411            region_id,
1412            file_id: sst_info.file_id,
1413            file_size: sst_info.file_size,
1414            index_file_size: sst_info.index_metadata.file_size,
1415            num_rows: sst_info.num_rows as u64,
1416            num_row_groups: sst_info.num_row_groups,
1417            ..Default::default()
1418        };
1419        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1420        let version_control =
1421            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1422        let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
1423
1424        // Create mock task.
1425        let (tx, mut rx) = mpsc::channel(4);
1426        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1427        let task = IndexBuildTask {
1428            file_meta: file_meta.clone(),
1429            reason: IndexBuildType::Flush,
1430            access_layer: env.access_layer.clone(),
1431            listener: WorkerListener::default(),
1432            manifest_ctx,
1433            write_cache: None,
1434            file_purger,
1435            indexer_builder,
1436            request_sender: tx,
1437            result_sender: result_tx,
1438        };
1439
1440        scheduler.schedule_build(&version_control, task).unwrap();
1441
1442        // The task should finish successfully.
1443        match result_rx.recv().await.unwrap() {
1444            Ok(outcome) => {
1445                assert_eq!(outcome, IndexBuildOutcome::Finished);
1446            }
1447            _ => panic!("Expect finished result"),
1448        }
1449
1450        // No index is built, so no notification should be sent to the worker.
1451        let _ = rx.recv().await.is_none();
1452    }
1453
1454    #[tokio::test]
1455    async fn test_index_build_task_with_write_cache() {
1456        let env = SchedulerEnv::new().await;
1457        let mut scheduler = env.mock_index_build_scheduler();
1458        let metadata = Arc::new(sst_region_metadata());
1459        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1460        let file_purger = Arc::new(NoopFilePurger {});
1461        let region_id = metadata.region_id;
1462
1463        let (dir, factory) = PuffinManagerFactory::new_for_test_async("test_write_cache").await;
1464        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1465
1466        // Create mock write cache
1467        let write_cache = Arc::new(
1468            WriteCache::new_fs(
1469                dir.path().to_str().unwrap(),
1470                ReadableSize::mb(10),
1471                None,
1472                factory,
1473                intm_manager,
1474            )
1475            .await
1476            .unwrap(),
1477        );
1478        // Indexer builder built from write cache.
1479        let indexer_builder = Arc::new(IndexerBuilderImpl {
1480            build_type: IndexBuildType::Flush,
1481            metadata: metadata.clone(),
1482            row_group_size: 1024,
1483            puffin_manager: write_cache.build_puffin_manager().clone(),
1484            intermediate_manager: write_cache.intermediate_manager().clone(),
1485            index_options: IndexOptions::default(),
1486            inverted_index_config: InvertedIndexConfig::default(),
1487            fulltext_index_config: FulltextIndexConfig::default(),
1488            bloom_filter_index_config: BloomFilterConfig::default(),
1489        });
1490
1491        let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
1492        let file_meta = FileMeta {
1493            region_id,
1494            file_id: sst_info.file_id,
1495            file_size: sst_info.file_size,
1496            index_file_size: sst_info.index_metadata.file_size,
1497            num_rows: sst_info.num_rows as u64,
1498            num_row_groups: sst_info.num_row_groups,
1499            ..Default::default()
1500        };
1501        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1502        let version_control =
1503            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1504
1505        // Create mock task.
1506        let (tx, mut _rx) = mpsc::channel(4);
1507        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1508        let task = IndexBuildTask {
1509            file_meta: file_meta.clone(),
1510            reason: IndexBuildType::Flush,
1511            access_layer: env.access_layer.clone(),
1512            listener: WorkerListener::default(),
1513            manifest_ctx,
1514            write_cache: Some(write_cache.clone()),
1515            file_purger,
1516            indexer_builder,
1517            request_sender: tx,
1518            result_sender: result_tx,
1519        };
1520
1521        scheduler.schedule_build(&version_control, task).unwrap();
1522
1523        // The task should finish successfully.
1524        match result_rx.recv().await.unwrap() {
1525            Ok(outcome) => {
1526                assert_eq!(outcome, IndexBuildOutcome::Finished);
1527            }
1528            _ => panic!("Expect finished result"),
1529        }
1530
1531        // The write cache should contain the uploaded index file.
1532        let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Puffin);
1533        assert!(write_cache.file_cache().contains_key(&index_key));
1534    }
1535}