mito2/sst/
index.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod bloom_filter;
16pub(crate) mod fulltext_index;
17mod indexer;
18pub mod intermediate;
19pub(crate) mod inverted_index;
20pub mod puffin_manager;
21mod statistics;
22pub(crate) mod store;
23
24use std::cmp::Ordering;
25use std::collections::{BinaryHeap, HashMap, HashSet};
26use std::num::NonZeroUsize;
27use std::sync::Arc;
28
29use bloom_filter::creator::BloomFilterIndexer;
30use common_telemetry::{debug, error, info, warn};
31use datatypes::arrow::array::BinaryArray;
32use datatypes::arrow::record_batch::RecordBatch;
33use mito_codec::index::IndexValuesCodec;
34use mito_codec::row_converter::CompositeValues;
35use puffin_manager::SstPuffinManager;
36use smallvec::{SmallVec, smallvec};
37use snafu::{OptionExt, ResultExt};
38use statistics::{ByteCount, RowCount};
39use store_api::metadata::RegionMetadataRef;
40use store_api::storage::{ColumnId, FileId, RegionId};
41use strum::IntoStaticStr;
42use tokio::sync::mpsc::Sender;
43
44use crate::access_layer::{AccessLayerRef, FilePathProvider, OperationType, RegionFilePathFactory};
45use crate::cache::file_cache::{FileType, IndexKey};
46use crate::cache::write_cache::{UploadTracker, WriteCacheRef};
47use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
48use crate::error::{
49    BuildIndexAsyncSnafu, DecodeSnafu, Error, InvalidRecordBatchSnafu, RegionClosedSnafu,
50    RegionDroppedSnafu, RegionTruncatedSnafu, Result,
51};
52use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
53use crate::metrics::INDEX_CREATE_MEMORY_USAGE;
54use crate::read::{Batch, BatchReader};
55use crate::region::options::IndexOptions;
56use crate::region::version::VersionControlRef;
57use crate::region::{ManifestContextRef, RegionLeaderState};
58use crate::request::{
59    BackgroundNotify, IndexBuildFailed, IndexBuildFinished, IndexBuildStopped, WorkerRequest,
60    WorkerRequestWithTime,
61};
62use crate::schedule::scheduler::{Job, SchedulerRef};
63use crate::sst::file::{
64    ColumnIndexMetadata, FileHandle, FileMeta, IndexType, IndexTypes, RegionFileId,
65};
66use crate::sst::file_purger::FilePurgerRef;
67use crate::sst::index::fulltext_index::creator::FulltextIndexer;
68use crate::sst::index::intermediate::IntermediateManager;
69use crate::sst::index::inverted_index::creator::InvertedIndexer;
70use crate::sst::parquet::SstInfo;
71use crate::sst::parquet::flat_format::primary_key_column_index;
72use crate::sst::parquet::format::PrimaryKeyArray;
73use crate::worker::WorkerListener;
74
75pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index";
76pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index";
77pub(crate) const TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
78
/// Output of the index creation.
#[derive(Debug, Clone, Default)]
pub struct IndexOutput {
    /// Size of the index file in bytes; 0 means no index file was produced.
    pub file_size: u64,
    /// Inverted index output.
    pub inverted_index: InvertedIndexOutput,
    /// Fulltext index output.
    pub fulltext_index: FulltextIndexOutput,
    /// Bloom filter output.
    pub bloom_filter: BloomFilterOutput,
}
91
92impl IndexOutput {
93    pub fn build_available_indexes(&self) -> SmallVec<[IndexType; 4]> {
94        let mut indexes = SmallVec::new();
95        if self.inverted_index.is_available() {
96            indexes.push(IndexType::InvertedIndex);
97        }
98        if self.fulltext_index.is_available() {
99            indexes.push(IndexType::FulltextIndex);
100        }
101        if self.bloom_filter.is_available() {
102            indexes.push(IndexType::BloomFilterIndex);
103        }
104        indexes
105    }
106
107    pub fn build_indexes(&self) -> Vec<ColumnIndexMetadata> {
108        let mut map: HashMap<ColumnId, IndexTypes> = HashMap::new();
109
110        if self.inverted_index.is_available() {
111            for &col in &self.inverted_index.columns {
112                map.entry(col).or_default().push(IndexType::InvertedIndex);
113            }
114        }
115        if self.fulltext_index.is_available() {
116            for &col in &self.fulltext_index.columns {
117                map.entry(col).or_default().push(IndexType::FulltextIndex);
118            }
119        }
120        if self.bloom_filter.is_available() {
121            for &col in &self.bloom_filter.columns {
122                map.entry(col)
123                    .or_default()
124                    .push(IndexType::BloomFilterIndex);
125            }
126        }
127
128        map.into_iter()
129            .map(|(column_id, created_indexes)| ColumnIndexMetadata {
130                column_id,
131                created_indexes,
132            })
133            .collect::<Vec<_>>()
134    }
135}
136
/// Base output of the index creation.
#[derive(Debug, Clone, Default)]
pub struct IndexBaseOutput {
    /// Size of the index.
    pub index_size: ByteCount,
    /// Number of rows in the index.
    pub row_count: RowCount,
    /// Available columns in the index.
    pub columns: Vec<ColumnId>,
}

impl IndexBaseOutput {
    /// Returns whether the index was actually produced, i.e. its size is non-zero.
    pub fn is_available(&self) -> bool {
        self.index_size > 0
    }
}

/// Output of the inverted index creation (alias of [IndexBaseOutput]).
pub type InvertedIndexOutput = IndexBaseOutput;
/// Output of the fulltext index creation (alias of [IndexBaseOutput]).
pub type FulltextIndexOutput = IndexBaseOutput;
/// Output of the bloom filter creation (alias of [IndexBaseOutput]).
pub type BloomFilterOutput = IndexBaseOutput;
160
/// The index creator that hides the error handling details.
#[derive(Default)]
pub struct Indexer {
    /// Id of the file the indexes are built for.
    file_id: FileId,
    /// Id of the region the file belongs to.
    region_id: RegionId,
    /// Puffin manager used to persist the created indexes; only set when at
    /// least one concrete indexer is enabled (see `IndexerBuilderImpl::build`).
    puffin_manager: Option<SstPuffinManager>,
    /// Inverted index creator; `None` when disabled or not applicable.
    inverted_indexer: Option<InvertedIndexer>,
    /// Inverted indexer memory usage reported at the previous metric flush.
    last_mem_inverted_index: usize,
    /// Full-text index creator; `None` when disabled or not applicable.
    fulltext_indexer: Option<FulltextIndexer>,
    /// Full-text indexer memory usage reported at the previous metric flush.
    last_mem_fulltext_index: usize,
    /// Bloom filter index creator; `None` when disabled or not applicable.
    bloom_filter_indexer: Option<BloomFilterIndexer>,
    /// Bloom filter indexer memory usage reported at the previous metric flush.
    last_mem_bloom_filter: usize,
    /// Manager for intermediate files produced while building indexes.
    intermediate_manager: Option<IntermediateManager>,
}
175
176impl Indexer {
177    /// Updates the index with the given batch.
178    pub async fn update(&mut self, batch: &mut Batch) {
179        self.do_update(batch).await;
180
181        self.flush_mem_metrics();
182    }
183
184    /// Updates the index with the given flat format RecordBatch.
185    pub async fn update_flat(&mut self, batch: &RecordBatch) {
186        self.do_update_flat(batch).await;
187
188        self.flush_mem_metrics();
189    }
190
191    /// Finalizes the index creation.
192    pub async fn finish(&mut self) -> IndexOutput {
193        let output = self.do_finish().await;
194
195        self.flush_mem_metrics();
196        output
197    }
198
199    /// Aborts the index creation.
200    pub async fn abort(&mut self) {
201        self.do_abort().await;
202
203        self.flush_mem_metrics();
204    }
205
206    fn flush_mem_metrics(&mut self) {
207        let inverted_mem = self
208            .inverted_indexer
209            .as_ref()
210            .map_or(0, |creator| creator.memory_usage());
211        INDEX_CREATE_MEMORY_USAGE
212            .with_label_values(&[TYPE_INVERTED_INDEX])
213            .add(inverted_mem as i64 - self.last_mem_inverted_index as i64);
214        self.last_mem_inverted_index = inverted_mem;
215
216        let fulltext_mem = self
217            .fulltext_indexer
218            .as_ref()
219            .map_or(0, |creator| creator.memory_usage());
220        INDEX_CREATE_MEMORY_USAGE
221            .with_label_values(&[TYPE_FULLTEXT_INDEX])
222            .add(fulltext_mem as i64 - self.last_mem_fulltext_index as i64);
223        self.last_mem_fulltext_index = fulltext_mem;
224
225        let bloom_filter_mem = self
226            .bloom_filter_indexer
227            .as_ref()
228            .map_or(0, |creator| creator.memory_usage());
229        INDEX_CREATE_MEMORY_USAGE
230            .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
231            .add(bloom_filter_mem as i64 - self.last_mem_bloom_filter as i64);
232        self.last_mem_bloom_filter = bloom_filter_mem;
233    }
234}
235
#[async_trait::async_trait]
pub trait IndexerBuilder {
    /// Builds an [Indexer] that creates index files for the file with the given id.
    async fn build(&self, file_id: FileId) -> Indexer;
}
#[derive(Clone)]
pub(crate) struct IndexerBuilderImpl {
    /// What triggered the build; flush/compaction consult their `create_on_*` switches.
    pub(crate) build_type: IndexBuildType,
    /// Metadata of the region the indexed file belongs to.
    pub(crate) metadata: RegionMetadataRef,
    /// Row group size of the SST; the inverted index segment row count is aligned to it.
    pub(crate) row_group_size: usize,
    /// Manager used to write index blobs into puffin files.
    pub(crate) puffin_manager: SstPuffinManager,
    /// Manager for intermediate files created during the build.
    pub(crate) intermediate_manager: IntermediateManager,
    /// Region-level index options (ignored columns, segment row count, ...).
    pub(crate) index_options: IndexOptions,
    /// Inverted index configuration.
    pub(crate) inverted_index_config: InvertedIndexConfig,
    /// Full-text index configuration.
    pub(crate) fulltext_index_config: FulltextIndexConfig,
    /// Bloom filter index configuration.
    pub(crate) bloom_filter_index_config: BloomFilterConfig,
}
253
254#[async_trait::async_trait]
255impl IndexerBuilder for IndexerBuilderImpl {
256    /// Sanity check for arguments and create a new [Indexer] if arguments are valid.
257    async fn build(&self, file_id: FileId) -> Indexer {
258        let mut indexer = Indexer {
259            file_id,
260            region_id: self.metadata.region_id,
261            ..Default::default()
262        };
263
264        indexer.inverted_indexer = self.build_inverted_indexer(file_id);
265        indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await;
266        indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id);
267        indexer.intermediate_manager = Some(self.intermediate_manager.clone());
268        if indexer.inverted_indexer.is_none()
269            && indexer.fulltext_indexer.is_none()
270            && indexer.bloom_filter_indexer.is_none()
271        {
272            indexer.abort().await;
273            return Indexer::default();
274        }
275
276        indexer.puffin_manager = Some(self.puffin_manager.clone());
277        indexer
278    }
279}
280
impl IndexerBuilderImpl {
    /// Decides whether an inverted index should be created for the file and,
    /// if so, returns a configured [InvertedIndexer].
    ///
    /// Returns `None` when creation is disabled for this build type, when no
    /// column is inverted-indexed, or when a configured size is zero.
    fn build_inverted_indexer(&self, file_id: FileId) -> Option<InvertedIndexer> {
        // Flush and compaction honor their `create_on_*` switches; other build
        // types (schema change, manual) always create the index.
        let create = match self.build_type {
            IndexBuildType::Flush => self.inverted_index_config.create_on_flush.auto(),
            IndexBuildType::Compact => self.inverted_index_config.create_on_compaction.auto(),
            _ => true,
        };

        if !create {
            debug!(
                "Skip creating inverted index due to config, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        }

        let indexed_column_ids = self.metadata.inverted_indexed_column_ids(
            self.index_options.inverted_index.ignore_column_ids.iter(),
        );
        if indexed_column_ids.is_empty() {
            debug!(
                "No columns to be indexed, skip creating inverted index, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        }

        let Some(mut segment_row_count) =
            NonZeroUsize::new(self.index_options.inverted_index.segment_row_count)
        else {
            warn!(
                "Segment row count is 0, skip creating index, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        };

        let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else {
            warn!(
                "Row group size is 0, skip creating index, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        };

        // If the segment row count does not evenly divide the row group size,
        // fall back to one segment per row group so segments stay aligned.
        if row_group_size.get() % segment_row_count.get() != 0 {
            segment_row_count = row_group_size;
        }

        let indexer = InvertedIndexer::new(
            file_id,
            &self.metadata,
            self.intermediate_manager.clone(),
            self.inverted_index_config.mem_threshold_on_create(),
            segment_row_count,
            indexed_column_ids,
        );

        Some(indexer)
    }

    /// Decides whether a full-text index should be created and builds the
    /// [FulltextIndexer].
    ///
    /// Creation errors panic under test builds but are only logged in release
    /// builds (returning `None`), so an index failure never fails the caller.
    async fn build_fulltext_indexer(&self, file_id: FileId) -> Option<FulltextIndexer> {
        let create = match self.build_type {
            IndexBuildType::Flush => self.fulltext_index_config.create_on_flush.auto(),
            IndexBuildType::Compact => self.fulltext_index_config.create_on_compaction.auto(),
            _ => true,
        };

        if !create {
            debug!(
                "Skip creating full-text index due to config, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        }

        let mem_limit = self.fulltext_index_config.mem_threshold_on_create();
        let creator = FulltextIndexer::new(
            &self.metadata.region_id,
            &file_id,
            &self.intermediate_manager,
            &self.metadata,
            self.fulltext_index_config.compress,
            mem_limit,
        )
        .await;

        let err = match creator {
            Ok(creator) => {
                // `Ok(None)` means no column requires a full-text index.
                if creator.is_none() {
                    debug!(
                        "Skip creating full-text index due to no columns require indexing, region_id: {}, file_id: {}",
                        self.metadata.region_id, file_id,
                    );
                }
                return creator;
            }
            Err(err) => err,
        };

        // Fail loudly in tests; degrade to a warning in production.
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to create full-text indexer, region_id: {}, file_id: {}, err: {:?}",
                self.metadata.region_id, file_id, err
            );
        } else {
            warn!(
                err; "Failed to create full-text indexer, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
        }

        None
    }

    /// Decides whether bloom filter indexes should be created and builds the
    /// [BloomFilterIndexer].
    ///
    /// Same error policy as the full-text indexer: panic under test builds,
    /// log and skip in release builds.
    fn build_bloom_filter_indexer(&self, file_id: FileId) -> Option<BloomFilterIndexer> {
        let create = match self.build_type {
            IndexBuildType::Flush => self.bloom_filter_index_config.create_on_flush.auto(),
            IndexBuildType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(),
            _ => true,
        };

        if !create {
            debug!(
                "Skip creating bloom filter due to config, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
            return None;
        }

        let mem_limit = self.bloom_filter_index_config.mem_threshold_on_create();
        let indexer = BloomFilterIndexer::new(
            file_id,
            &self.metadata,
            self.intermediate_manager.clone(),
            mem_limit,
        );

        let err = match indexer {
            Ok(indexer) => {
                // `Ok(None)` means no column requires a bloom filter.
                if indexer.is_none() {
                    debug!(
                        "Skip creating bloom filter due to no columns require indexing, region_id: {}, file_id: {}",
                        self.metadata.region_id, file_id,
                    );
                }
                return indexer;
            }
            Err(err) => err,
        };

        // Fail loudly in tests; degrade to a warning in production.
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to create bloom filter, region_id: {}, file_id: {}, err: {:?}",
                self.metadata.region_id, file_id, err
            );
        } else {
            warn!(
                err; "Failed to create bloom filter, region_id: {}, file_id: {}",
                self.metadata.region_id, file_id,
            );
        }

        None
    }
}
448
/// Type of an index build task, i.e. the reason the task was created.
/// The reason also determines the task's scheduling priority (see `priority`).
#[derive(Debug, Clone, IntoStaticStr, PartialEq)]
pub enum IndexBuildType {
    /// Build index when schema change.
    SchemaChange,
    /// Create or update index after flush.
    Flush,
    /// Create or update index after compact.
    Compact,
    /// Manually build index.
    Manual,
}
461
462impl IndexBuildType {
463    fn as_str(&self) -> &'static str {
464        self.into()
465    }
466
467    // Higher value means higher priority.
468    fn priority(&self) -> u8 {
469        match self {
470            IndexBuildType::Manual => 3,
471            IndexBuildType::SchemaChange => 2,
472            IndexBuildType::Flush => 1,
473            IndexBuildType::Compact => 0,
474        }
475    }
476}
477
478impl From<OperationType> for IndexBuildType {
479    fn from(op_type: OperationType) -> Self {
480        match op_type {
481            OperationType::Flush => IndexBuildType::Flush,
482            OperationType::Compact => IndexBuildType::Compact,
483        }
484    }
485}
486
/// Outcome of an index build task.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum IndexBuildOutcome {
    /// The task completed successfully.
    Finished,
    /// The task was aborted; the message explains why.
    Aborted(String),
}

/// Mpsc sender used to report the result of an index build task to its waiter.
pub type ResultMpscSender = Sender<Result<IndexBuildOutcome>>;
496
/// A background task that builds index files for one SST file.
#[derive(Clone)]
pub struct IndexBuildTask {
    /// The file meta to build index for.
    pub file_meta: FileMeta,
    /// Why the task was created; also decides its scheduling priority.
    pub reason: IndexBuildType,
    /// Access layer used to read the SST and reach the remote object store.
    pub access_layer: AccessLayerRef,
    /// Listener notified about index build lifecycle events.
    pub(crate) listener: WorkerListener,
    /// Context used to persist the resulting edit to the region manifest.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// Optional write cache; when present, the index file is uploaded through it.
    pub write_cache: Option<WriteCacheRef>,
    /// Purger attached to the file handle created for reading the SST.
    pub file_purger: FilePurgerRef,
    /// When write cache is enabled, the indexer builder should be built from the write cache.
    /// Otherwise, it should be built from the access layer.
    pub indexer_builder: Arc<dyn IndexerBuilder + Send + Sync>,
    /// Request sender to notify the region worker.
    pub(crate) request_sender: Sender<WorkerRequestWithTime>,
    /// Index build result sender.
    pub(crate) result_sender: ResultMpscSender,
}
515
516impl std::fmt::Debug for IndexBuildTask {
517    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
518        f.debug_struct("IndexBuildTask")
519            .field("region_id", &self.file_meta.region_id)
520            .field("file_id", &self.file_meta.file_id)
521            .field("reason", &self.reason)
522            .finish()
523    }
524}
525
526impl IndexBuildTask {
527    /// Notify the caller the job is success.
528    pub async fn on_success(&self, outcome: IndexBuildOutcome) {
529        let _ = self.result_sender.send(Ok(outcome)).await;
530    }
531
532    /// Send index build error to waiter.
533    pub async fn on_failure(&self, err: Arc<Error>) {
534        let _ = self
535            .result_sender
536            .send(Err(err.clone()).context(BuildIndexAsyncSnafu {
537                region_id: self.file_meta.region_id,
538            }))
539            .await;
540    }
541
542    fn into_index_build_job(mut self, version_control: VersionControlRef) -> Job {
543        Box::pin(async move {
544            self.do_index_build(version_control).await;
545        })
546    }
547
548    async fn do_index_build(&mut self, version_control: VersionControlRef) {
549        self.listener
550            .on_index_build_begin(RegionFileId::new(
551                self.file_meta.region_id,
552                self.file_meta.file_id,
553            ))
554            .await;
555        match self.index_build(version_control).await {
556            Ok(outcome) => self.on_success(outcome).await,
557            Err(e) => {
558                warn!(
559                    e; "Index build task failed, region: {}, file_id: {}",
560                    self.file_meta.region_id, self.file_meta.file_id,
561                );
562                self.on_failure(e.into()).await
563            }
564        }
565        let worker_request = WorkerRequest::Background {
566            region_id: self.file_meta.region_id,
567            notify: BackgroundNotify::IndexBuildStopped(IndexBuildStopped {
568                region_id: self.file_meta.region_id,
569                file_id: self.file_meta.file_id,
570            }),
571        };
572        let _ = self
573            .request_sender
574            .send(WorkerRequestWithTime::new(worker_request))
575            .await;
576    }
577
578    // Checks if the SST file still exists in object store and version to avoid conflict with compaction.
579    async fn check_sst_file_exists(&self, version_control: &VersionControlRef) -> bool {
580        let file_id = self.file_meta.file_id;
581        let level = self.file_meta.level;
582        // We should check current version instead of the version when the job is created.
583        let version = version_control.current().version;
584
585        let Some(level_files) = version.ssts.levels().get(level as usize) else {
586            warn!(
587                "File id {} not found in level {} for index build, region: {}",
588                file_id, level, self.file_meta.region_id
589            );
590            return false;
591        };
592
593        match level_files.files.get(&file_id) {
594            Some(handle) if !handle.is_deleted() && !handle.compacting() => {
595                // If the file's metadata is present in the current version, the physical SST file
596                // is guaranteed to exist on object store. The file purger removes the physical
597                // file only after its metadata is removed from the version.
598                true
599            }
600            _ => {
601                warn!(
602                    "File id {} not found in region version for index build, region: {}",
603                    file_id, self.file_meta.region_id
604                );
605                false
606            }
607        }
608    }
609
610    async fn index_build(
611        &mut self,
612        version_control: VersionControlRef,
613    ) -> Result<IndexBuildOutcome> {
614        let index_file_id = if self.file_meta.index_file_size > 0 {
615            // Generate new file ID if index file exists to avoid overwrite.
616            FileId::random()
617        } else {
618            self.file_meta.file_id
619        };
620        let mut indexer = self.indexer_builder.build(index_file_id).await;
621
622        // Check SST file existence before building index to avoid failure of parquet reader.
623        if !self.check_sst_file_exists(&version_control).await {
624            // Calls abort to clean up index files.
625            indexer.abort().await;
626            self.listener
627                .on_index_build_abort(RegionFileId::new(
628                    self.file_meta.region_id,
629                    self.file_meta.file_id,
630                ))
631                .await;
632            return Ok(IndexBuildOutcome::Aborted(format!(
633                "SST file not found during index build, region: {}, file_id: {}",
634                self.file_meta.region_id, self.file_meta.file_id
635            )));
636        }
637
638        let mut parquet_reader = self
639            .access_layer
640            .read_sst(FileHandle::new(
641                self.file_meta.clone(),
642                self.file_purger.clone(),
643            ))
644            .build()
645            .await?;
646
647        // TODO(SNC123): optimize index batch
648        loop {
649            match parquet_reader.next_batch().await {
650                Ok(Some(batch)) => {
651                    indexer.update(&mut batch.clone()).await;
652                }
653                Ok(None) => break,
654                Err(e) => {
655                    indexer.abort().await;
656                    return Err(e);
657                }
658            }
659        }
660        let index_output = indexer.finish().await;
661
662        if index_output.file_size > 0 {
663            // Check SST file existence again after building index.
664            if !self.check_sst_file_exists(&version_control).await {
665                // Calls abort to clean up index files.
666                indexer.abort().await;
667                self.listener
668                    .on_index_build_abort(RegionFileId::new(
669                        self.file_meta.region_id,
670                        self.file_meta.file_id,
671                    ))
672                    .await;
673                return Ok(IndexBuildOutcome::Aborted(format!(
674                    "SST file not found during index build, region: {}, file_id: {}",
675                    self.file_meta.region_id, self.file_meta.file_id
676                )));
677            }
678
679            // Upload index file if write cache is enabled.
680            self.maybe_upload_index_file(index_output.clone(), index_file_id)
681                .await?;
682
683            let worker_request = match self.update_manifest(index_output, index_file_id).await {
684                Ok(edit) => {
685                    let index_build_finished = IndexBuildFinished {
686                        region_id: self.file_meta.region_id,
687                        edit,
688                    };
689                    WorkerRequest::Background {
690                        region_id: self.file_meta.region_id,
691                        notify: BackgroundNotify::IndexBuildFinished(index_build_finished),
692                    }
693                }
694                Err(e) => {
695                    let err = Arc::new(e);
696                    WorkerRequest::Background {
697                        region_id: self.file_meta.region_id,
698                        notify: BackgroundNotify::IndexBuildFailed(IndexBuildFailed { err }),
699                    }
700                }
701            };
702
703            let _ = self
704                .request_sender
705                .send(WorkerRequestWithTime::new(worker_request))
706                .await;
707        }
708        Ok(IndexBuildOutcome::Finished)
709    }
710
711    async fn maybe_upload_index_file(
712        &self,
713        output: IndexOutput,
714        index_file_id: FileId,
715    ) -> Result<()> {
716        if let Some(write_cache) = &self.write_cache {
717            let file_id = self.file_meta.file_id;
718            let region_id = self.file_meta.region_id;
719            let remote_store = self.access_layer.object_store();
720            let mut upload_tracker = UploadTracker::new(region_id);
721            let mut err = None;
722            let puffin_key = IndexKey::new(region_id, index_file_id, FileType::Puffin);
723            let puffin_path = RegionFilePathFactory::new(
724                self.access_layer.table_dir().to_string(),
725                self.access_layer.path_type(),
726            )
727            .build_index_file_path(RegionFileId::new(region_id, file_id));
728            if let Err(e) = write_cache
729                .upload(puffin_key, &puffin_path, remote_store)
730                .await
731            {
732                err = Some(e);
733            }
734            upload_tracker.push_uploaded_file(puffin_path);
735            if let Some(err) = err {
736                // Cleans index files on failure.
737                upload_tracker
738                    .clean(
739                        &smallvec![SstInfo {
740                            file_id,
741                            index_metadata: output,
742                            ..Default::default()
743                        }],
744                        &write_cache.file_cache(),
745                        remote_store,
746                    )
747                    .await;
748                return Err(err);
749            }
750        } else {
751            debug!("write cache is not available, skip uploading index file");
752        }
753        Ok(())
754    }
755
756    async fn update_manifest(
757        &mut self,
758        output: IndexOutput,
759        index_file_id: FileId,
760    ) -> Result<RegionEdit> {
761        self.file_meta.available_indexes = output.build_available_indexes();
762        self.file_meta.indexes = output.build_indexes();
763        self.file_meta.index_file_size = output.file_size;
764        self.file_meta.index_file_id = Some(index_file_id);
765        let edit = RegionEdit {
766            files_to_add: vec![self.file_meta.clone()],
767            files_to_remove: vec![],
768            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
769            flushed_sequence: None,
770            flushed_entry_id: None,
771            committed_sequence: None,
772            compaction_time_window: None,
773        };
774        let version = self
775            .manifest_ctx
776            .update_manifest(
777                RegionLeaderState::Writable,
778                RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())),
779            )
780            .await?;
781        info!(
782            "Successfully update manifest version to {version}, region: {}, reason: {}",
783            self.file_meta.region_id,
784            self.reason.as_str()
785        );
786        Ok(edit)
787    }
788}
789
// Tasks are ordered in the scheduler's `BinaryHeap` by build-reason priority
// only, so equality is intentionally defined on priority rather than identity.
impl PartialEq for IndexBuildTask {
    fn eq(&self, other: &Self) -> bool {
        self.reason.priority() == other.reason.priority()
    }
}

impl Eq for IndexBuildTask {}

impl PartialOrd for IndexBuildTask {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for IndexBuildTask {
    // Higher-priority reasons compare greater, so they pop first from the max-heap.
    fn cmp(&self, other: &Self) -> Ordering {
        self.reason.priority().cmp(&other.reason.priority())
    }
}
809
/// Tracks the index build status of a region scheduled by the [IndexBuildScheduler].
pub struct IndexBuildStatus {
    /// Id of the region being tracked.
    pub region_id: RegionId,
    /// Files of this region that currently have an index build task running.
    pub building_files: HashSet<FileId>,
    /// Tasks waiting to be scheduled, ordered by build-reason priority.
    pub pending_tasks: BinaryHeap<IndexBuildTask>,
}
816
817impl IndexBuildStatus {
818    pub fn new(region_id: RegionId) -> Self {
819        IndexBuildStatus {
820            region_id,
821            building_files: HashSet::new(),
822            pending_tasks: BinaryHeap::new(),
823        }
824    }
825
826    async fn on_failure(self, err: Arc<Error>) {
827        for task in self.pending_tasks {
828            task.on_failure(err.clone()).await;
829        }
830    }
831}
832
pub struct IndexBuildScheduler {
    /// Background job scheduler.
    scheduler: SchedulerRef,
    /// Tracks regions need to build index.
    region_status: HashMap<RegionId, IndexBuildStatus>,
    /// Max number of files allowed to build indexes concurrently, summed over
    /// all regions tracked by this scheduler.
    files_limit: usize,
}
841
/// Manages background index build tasks of a worker.
843impl IndexBuildScheduler {
844    pub fn new(scheduler: SchedulerRef, files_limit: usize) -> Self {
845        IndexBuildScheduler {
846            scheduler,
847            region_status: HashMap::new(),
848            files_limit,
849        }
850    }
851
852    pub(crate) async fn schedule_build(
853        &mut self,
854        version_control: &VersionControlRef,
855        task: IndexBuildTask,
856    ) -> Result<()> {
857        let status = self
858            .region_status
859            .entry(task.file_meta.region_id)
860            .or_insert_with(|| IndexBuildStatus::new(task.file_meta.region_id));
861
862        if status.building_files.contains(&task.file_meta.file_id) {
863            let region_file_id =
864                RegionFileId::new(task.file_meta.region_id, task.file_meta.file_id);
865            debug!(
866                "Aborting index build task since index is already being built for region file {:?}",
867                region_file_id
868            );
869            task.on_success(IndexBuildOutcome::Aborted(format!(
870                "Index is already being built for region file {:?}",
871                region_file_id
872            )))
873            .await;
874            task.listener.on_index_build_abort(region_file_id).await;
875            return Ok(());
876        }
877
878        status.pending_tasks.push(task);
879
880        self.schedule_next_build_batch(version_control);
881        Ok(())
882    }
883
884    /// Schedule tasks until reaching the files limit or no more tasks.
885    fn schedule_next_build_batch(&mut self, version_control: &VersionControlRef) {
886        let mut building_count = 0;
887        for status in self.region_status.values() {
888            building_count += status.building_files.len();
889        }
890
891        while building_count < self.files_limit {
892            if let Some(task) = self.find_next_task() {
893                let region_id = task.file_meta.region_id;
894                let file_id = task.file_meta.file_id;
895                let job = task.into_index_build_job(version_control.clone());
896                if self.scheduler.schedule(job).is_ok() {
897                    if let Some(status) = self.region_status.get_mut(&region_id) {
898                        status.building_files.insert(file_id);
899                        building_count += 1;
900                        status
901                            .pending_tasks
902                            .retain(|t| t.file_meta.file_id != file_id);
903                    } else {
904                        error!(
905                            "Region status not found when scheduling index build task, region: {}",
906                            region_id
907                        );
908                    }
909                } else {
910                    error!(
911                        "Failed to schedule index build job, region: {}, file_id: {}",
912                        region_id, file_id
913                    );
914                }
915            } else {
916                // No more tasks to schedule.
917                break;
918            }
919        }
920    }
921
922    /// Find the next task which has the highest priority to run.
923    fn find_next_task(&self) -> Option<IndexBuildTask> {
924        self.region_status
925            .iter()
926            .filter_map(|(_, status)| status.pending_tasks.peek())
927            .max()
928            .cloned()
929    }
930
931    pub(crate) fn on_task_stopped(
932        &mut self,
933        region_id: RegionId,
934        file_id: FileId,
935        version_control: &VersionControlRef,
936    ) {
937        if let Some(status) = self.region_status.get_mut(&region_id) {
938            status.building_files.remove(&file_id);
939            if status.building_files.is_empty() && status.pending_tasks.is_empty() {
940                // No more tasks for this region, remove it.
941                self.region_status.remove(&region_id);
942            }
943        }
944
945        self.schedule_next_build_batch(version_control);
946    }
947
948    pub(crate) async fn on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
949        error!(
950            err; "Index build scheduler encountered failure for region {}, removing all pending tasks.",
951            region_id
952        );
953        let Some(status) = self.region_status.remove(&region_id) else {
954            return;
955        };
956        status.on_failure(err).await;
957    }
958
959    /// Notifies the scheduler that the region is dropped.
960    pub(crate) async fn on_region_dropped(&mut self, region_id: RegionId) {
961        self.remove_region_on_failure(
962            region_id,
963            Arc::new(RegionDroppedSnafu { region_id }.build()),
964        )
965        .await;
966    }
967
968    /// Notifies the scheduler that the region is closed.
969    pub(crate) async fn on_region_closed(&mut self, region_id: RegionId) {
970        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()))
971            .await;
972    }
973
974    /// Notifies the scheduler that the region is truncated.
975    pub(crate) async fn on_region_truncated(&mut self, region_id: RegionId) {
976        self.remove_region_on_failure(
977            region_id,
978            Arc::new(RegionTruncatedSnafu { region_id }.build()),
979        )
980        .await;
981    }
982
983    async fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
984        let Some(status) = self.region_status.remove(&region_id) else {
985            return;
986        };
987        status.on_failure(err).await;
988    }
989}
990
991/// Decodes primary keys from a flat format RecordBatch.
992/// Returns a list of (decoded_pk_value, count) tuples where count is the number of occurrences.
993pub(crate) fn decode_primary_keys_with_counts(
994    batch: &RecordBatch,
995    codec: &IndexValuesCodec,
996) -> Result<Vec<(CompositeValues, usize)>> {
997    let primary_key_index = primary_key_column_index(batch.num_columns());
998    let pk_dict_array = batch
999        .column(primary_key_index)
1000        .as_any()
1001        .downcast_ref::<PrimaryKeyArray>()
1002        .context(InvalidRecordBatchSnafu {
1003            reason: "Primary key column is not a dictionary array",
1004        })?;
1005    let pk_values_array = pk_dict_array
1006        .values()
1007        .as_any()
1008        .downcast_ref::<BinaryArray>()
1009        .context(InvalidRecordBatchSnafu {
1010            reason: "Primary key values are not binary array",
1011        })?;
1012    let keys = pk_dict_array.keys();
1013
1014    // Decodes primary keys and count consecutive occurrences
1015    let mut result: Vec<(CompositeValues, usize)> = Vec::new();
1016    let mut prev_key: Option<u32> = None;
1017
1018    for i in 0..keys.len() {
1019        let current_key = keys.value(i);
1020
1021        // Checks if current key is the same as previous key
1022        if let Some(prev) = prev_key
1023            && prev == current_key
1024        {
1025            // Safety: We already have a key in the result vector.
1026            result.last_mut().unwrap().1 += 1;
1027            continue;
1028        }
1029
1030        // New key, decodes it.
1031        let pk_bytes = pk_values_array.value(current_key as usize);
1032        let decoded_value = codec.decoder().decode(pk_bytes).context(DecodeSnafu)?;
1033
1034        result.push((decoded_value, 1));
1035        prev_key = Some(current_key);
1036    }
1037
1038    Ok(result)
1039}
1040
1041#[cfg(test)]
1042mod tests {
1043    use std::sync::Arc;
1044
1045    use api::v1::SemanticType;
1046    use common_base::readable_size::ReadableSize;
1047    use datafusion_common::HashMap;
1048    use datatypes::data_type::ConcreteDataType;
1049    use datatypes::schema::{
1050        ColumnSchema, FulltextOptions, SkippingIndexOptions, SkippingIndexType,
1051    };
1052    use object_store::ObjectStore;
1053    use object_store::services::Memory;
1054    use puffin_manager::PuffinManagerFactory;
1055    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1056    use tokio::sync::mpsc;
1057
1058    use super::*;
1059    use crate::access_layer::{FilePathProvider, Metrics, SstWriteRequest, WriteType};
1060    use crate::cache::write_cache::WriteCache;
1061    use crate::config::{FulltextIndexConfig, IndexBuildMode, MitoConfig, Mode};
1062    use crate::memtable::time_partition::TimePartitions;
1063    use crate::region::version::{VersionBuilder, VersionControl};
1064    use crate::sst::file::RegionFileId;
1065    use crate::sst::file_purger::NoopFilePurger;
1066    use crate::sst::location;
1067    use crate::sst::parquet::WriteOptions;
1068    use crate::test_util::memtable_util::EmptyMemtableBuilder;
1069    use crate::test_util::scheduler_util::SchedulerEnv;
1070    use crate::test_util::sst_util::{new_batch_by_range, new_source, sst_region_metadata};
1071
    /// Flags selecting which index-capable columns `mock_region_metadata` adds.
    struct MetaConfig {
        with_inverted: bool,
        with_fulltext: bool,
        with_skipping_bloom: bool,
    }
1077
    /// Builds test region metadata with fields "a" and "b", timestamp "c", and
    /// optionally a fulltext column "text" and a bloom-skipping column "bloom",
    /// depending on the given [`MetaConfig`].
    fn mock_region_metadata(
        MetaConfig {
            with_inverted,
            with_fulltext,
            with_skipping_bloom,
        }: MetaConfig,
    ) -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
        let mut column_schema = ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false);
        if with_inverted {
            column_schema = column_schema.with_inverted_index(true);
        }
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema,
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "c",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 3,
            });

        if with_fulltext {
            let column_schema =
                ColumnSchema::new("text", ConcreteDataType::string_datatype(), true)
                    .with_fulltext_options(FulltextOptions {
                        enable: true,
                        ..Default::default()
                    })
                    .unwrap();

            let column = ColumnMetadata {
                column_schema,
                semantic_type: SemanticType::Field,
                column_id: 4,
            };

            builder.push_column_metadata(column);
        }

        if with_skipping_bloom {
            let column_schema =
                ColumnSchema::new("bloom", ConcreteDataType::string_datatype(), false)
                    .with_skipping_options(SkippingIndexOptions::new_unchecked(
                        42,
                        0.01,
                        SkippingIndexType::BloomFilter,
                    ))
                    .unwrap();

            let column = ColumnMetadata {
                column_schema,
                semantic_type: SemanticType::Field,
                column_id: 5,
            };

            builder.push_column_metadata(column);
        }

        Arc::new(builder.build().unwrap())
    }
1150
1151    fn mock_object_store() -> ObjectStore {
1152        ObjectStore::new(Memory::default()).unwrap().finish()
1153    }
1154
    /// Initializes a filesystem-backed intermediate manager rooted at `path`.
    async fn mock_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
        IntermediateManager::init_fs(path).await.unwrap()
    }
    /// Path provider for tests that never touch real file paths; any call
    /// indicates a bug in the test setup.
    struct NoopPathProvider;

    impl FilePathProvider for NoopPathProvider {
        fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
            unreachable!()
        }

        fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
            unreachable!()
        }
    }
1169
    /// Writes a small SST (three batches) through the access layer with the
    /// given index `build_mode` and returns the resulting [`SstInfo`].
    async fn mock_sst_file(
        metadata: RegionMetadataRef,
        env: &SchedulerEnv,
        build_mode: IndexBuildMode,
    ) -> SstInfo {
        let source = new_source(&[
            new_batch_by_range(&["a", "d"], 0, 60),
            new_batch_by_range(&["b", "f"], 0, 40),
            new_batch_by_range(&["b", "h"], 100, 200),
        ]);
        // Start from the default index config and only override the build mode
        // under test.
        let mut index_config = MitoConfig::default().index;
        index_config.build_mode = build_mode;
        let write_request = SstWriteRequest {
            op_type: OperationType::Flush,
            metadata: metadata.clone(),
            source: either::Left(source),
            storage: None,
            max_sequence: None,
            cache_manager: Default::default(),
            index_options: IndexOptions::default(),
            index_config,
            inverted_index_config: Default::default(),
            fulltext_index_config: Default::default(),
            bloom_filter_index_config: Default::default(),
        };
        let mut metrics = Metrics::new(WriteType::Flush);
        env.access_layer
            .write_sst(write_request, &WriteOptions::default(), &mut metrics)
            .await
            .unwrap()
            .remove(0)
    }
1202
1203    async fn mock_version_control(
1204        metadata: RegionMetadataRef,
1205        file_purger: FilePurgerRef,
1206        files: HashMap<FileId, FileMeta>,
1207    ) -> VersionControlRef {
1208        let mutable = Arc::new(TimePartitions::new(
1209            metadata.clone(),
1210            Arc::new(EmptyMemtableBuilder::default()),
1211            0,
1212            None,
1213        ));
1214        let version_builder = VersionBuilder::new(metadata, mutable)
1215            .add_files(file_purger, files.values().cloned())
1216            .build();
1217        Arc::new(VersionControl::new(version_builder))
1218    }
1219
    /// Builds an [`IndexerBuilderImpl`] with default index configs, writing
    /// puffin files through the test env's access layer.
    async fn mock_indexer_builder(
        metadata: RegionMetadataRef,
        env: &SchedulerEnv,
    ) -> Arc<dyn IndexerBuilder + Send + Sync> {
        let (dir, factory) = PuffinManagerFactory::new_for_test_async("mock_indexer_builder").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
        let puffin_manager = factory.build(
            env.access_layer.object_store().clone(),
            RegionFilePathFactory::new(
                env.access_layer.table_dir().to_string(),
                env.access_layer.path_type(),
            ),
        );
        Arc::new(IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata,
            row_group_size: 1024,
            puffin_manager,
            intermediate_manager: intm_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        })
    }
1245
    /// All three indexers are created when every index-capable column exists
    /// and all index configs are left at their defaults.
    #[tokio::test]
    async fn test_build_indexer_basic() {
        let (dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_basic_").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;

        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: true,
            with_skipping_bloom: true,
        });
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata,
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_some());
    }
1275
    /// Disabling one index type for a specific operation (flush or compaction)
    /// suppresses only that indexer; the other two are still created.
    #[tokio::test]
    async fn test_build_indexer_disable_create() {
        let (dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_disable_create_").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;

        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: true,
            with_skipping_bloom: true,
        });
        // Inverted index disabled on flush: only the inverted indexer is absent.
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig {
                create_on_flush: Mode::Disable,
                ..Default::default()
            },
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_none());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_some());

        // Fulltext index disabled on compaction.
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Compact,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig {
                create_on_compaction: Mode::Disable,
                ..Default::default()
            },
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_none());
        assert!(indexer.bloom_filter_indexer.is_some());

        // Bloom filter index disabled on compaction.
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Compact,
            metadata,
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig {
                create_on_compaction: Mode::Disable,
                ..Default::default()
            },
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_none());
    }
1350
    /// An indexer is only created when the metadata actually has a column
    /// requiring that index type, even though configs leave it enabled.
    #[tokio::test]
    async fn test_build_indexer_no_required() {
        let (dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_no_required_").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;

        // No inverted-indexed column: no inverted indexer.
        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: false,
            with_fulltext: true,
            with_skipping_bloom: true,
        });
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_none());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_some());

        // No fulltext column: no fulltext indexer.
        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: false,
            with_skipping_bloom: true,
        });
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_none());
        assert!(indexer.bloom_filter_indexer.is_some());

        // No bloom-skipping column: no bloom filter indexer.
        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: true,
            with_skipping_bloom: false,
        });
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_none());
    }
1426
    /// A zero row group size prevents the inverted indexer from being created.
    #[tokio::test]
    async fn test_build_indexer_zero_row_group() {
        let (dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_zero_row_group_").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;

        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: true,
            with_skipping_bloom: true,
        });
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata,
            // Invalid on purpose: segment row count cannot be derived from it.
            row_group_size: 0,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_none());
    }
1454
    /// Scheduling a build task for a file that has no SST on storage must not
    /// finish successfully.
    #[tokio::test]
    async fn test_index_build_task_sst_not_exist() {
        let env = SchedulerEnv::new().await;
        let (tx, _rx) = mpsc::channel(4);
        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
        let mut scheduler = env.mock_index_build_scheduler(4);
        let metadata = Arc::new(sst_region_metadata());
        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
        let file_purger = Arc::new(NoopFilePurger {});
        // Empty file set: the referenced SST is deliberately missing.
        let files = HashMap::new();
        let version_control =
            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
        let region_id = metadata.region_id;
        let indexer_builder = mock_indexer_builder(metadata, &env).await;

        // Create mock task.
        let task = IndexBuildTask {
            file_meta: FileMeta {
                region_id,
                file_id: FileId::random(),
                file_size: 100,
                ..Default::default()
            },
            reason: IndexBuildType::Flush,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            manifest_ctx,
            write_cache: None,
            file_purger,
            indexer_builder,
            request_sender: tx,
            result_sender: result_tx,
        };

        // Schedule the build task and check result.
        scheduler
            .schedule_build(&version_control, task)
            .await
            .unwrap();
        // NOTE(review): only `Ok(non-Finished)` is accepted here, so an `Err`
        // result also fails the test — presumably the task reports the missing
        // SST as an aborted outcome rather than an error; confirm if it changes.
        match result_rx.recv().await.unwrap() {
            Ok(outcome) => {
                if outcome == IndexBuildOutcome::Finished {
                    panic!("Expect aborted result due to missing SST file")
                }
            }
            _ => panic!("Expect aborted result due to missing SST file"),
        }
    }
1503
    /// A build task over an existing SST finishes and notifies the worker with
    /// a manifest edit that carries the updated index metadata.
    #[tokio::test]
    async fn test_index_build_task_sst_exist() {
        let env = SchedulerEnv::new().await;
        let mut scheduler = env.mock_index_build_scheduler(4);
        let metadata = Arc::new(sst_region_metadata());
        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
        let region_id = metadata.region_id;
        let file_purger = Arc::new(NoopFilePurger {});
        let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
        let file_meta = FileMeta {
            region_id,
            file_id: sst_info.file_id,
            file_size: sst_info.file_size,
            index_file_size: sst_info.index_metadata.file_size,
            num_rows: sst_info.num_rows as u64,
            num_row_groups: sst_info.num_row_groups,
            ..Default::default()
        };
        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
        let version_control =
            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
        let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;

        // Create mock task.
        let (tx, mut rx) = mpsc::channel(4);
        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
        let task = IndexBuildTask {
            file_meta: file_meta.clone(),
            reason: IndexBuildType::Flush,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            manifest_ctx,
            write_cache: None,
            file_purger,
            indexer_builder,
            request_sender: tx,
            result_sender: result_tx,
        };

        scheduler
            .schedule_build(&version_control, task)
            .await
            .unwrap();

        // The task should finish successfully.
        match result_rx.recv().await.unwrap() {
            Ok(outcome) => {
                assert_eq!(outcome, IndexBuildOutcome::Finished);
            }
            _ => panic!("Expect finished result"),
        }

        // A notification should be sent to the worker to update the manifest.
        let worker_req = rx.recv().await.unwrap().request;
        match worker_req {
            WorkerRequest::Background {
                region_id: req_region_id,
                notify: BackgroundNotify::IndexBuildFinished(finished),
            } => {
                assert_eq!(req_region_id, region_id);
                assert_eq!(finished.edit.files_to_add.len(), 1);
                let updated_meta = &finished.edit.files_to_add[0];

                // The mock indexer builder creates all index types.
                assert!(!updated_meta.available_indexes.is_empty());
                assert!(updated_meta.index_file_size > 0);
                assert_eq!(updated_meta.file_id, file_meta.file_id);
            }
            _ => panic!("Unexpected worker request: {:?}", worker_req),
        }
    }
1575
    /// Runs one index build through the scheduler and checks when the puffin
    /// index file appears: only after completion in async mode, already before
    /// completion in sync mode.
    async fn schedule_index_build_task_with_mode(build_mode: IndexBuildMode) {
        let env = SchedulerEnv::new().await;
        let mut scheduler = env.mock_index_build_scheduler(4);
        let metadata = Arc::new(sst_region_metadata());
        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
        let file_purger = Arc::new(NoopFilePurger {});
        let region_id = metadata.region_id;
        let sst_info = mock_sst_file(metadata.clone(), &env, build_mode.clone()).await;
        let file_meta = FileMeta {
            region_id,
            file_id: sst_info.file_id,
            file_size: sst_info.file_size,
            index_file_size: sst_info.index_metadata.file_size,
            num_rows: sst_info.num_rows as u64,
            num_row_groups: sst_info.num_row_groups,
            ..Default::default()
        };
        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
        let version_control =
            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
        let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;

        // Create mock task.
        let (tx, _rx) = mpsc::channel(4);
        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
        let task = IndexBuildTask {
            file_meta: file_meta.clone(),
            reason: IndexBuildType::Flush,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            manifest_ctx,
            write_cache: None,
            file_purger,
            indexer_builder,
            request_sender: tx,
            result_sender: result_tx,
        };

        scheduler
            .schedule_build(&version_control, task)
            .await
            .unwrap();

        let puffin_path = location::index_file_path(
            env.access_layer.table_dir(),
            RegionFileId::new(region_id, file_meta.file_id),
            env.access_layer.path_type(),
        );

        if build_mode == IndexBuildMode::Async {
            // The index file should not exist before the task finishes.
            assert!(
                !env.access_layer
                    .object_store()
                    .exists(&puffin_path)
                    .await
                    .unwrap()
            );
        } else {
            // The index file should exist before the task finishes.
            assert!(
                env.access_layer
                    .object_store()
                    .exists(&puffin_path)
                    .await
                    .unwrap()
            );
        }

        // The task should finish successfully.
        match result_rx.recv().await.unwrap() {
            Ok(outcome) => {
                assert_eq!(outcome, IndexBuildOutcome::Finished);
            }
            _ => panic!("Expect finished result"),
        }

        // The index file should exist after the task finishes.
        assert!(
            env.access_layer
                .object_store()
                .exists(&puffin_path)
                .await
                .unwrap()
        );
    }
1662
1663    #[tokio::test]
1664    async fn test_index_build_task_build_mode() {
1665        schedule_index_build_task_with_mode(IndexBuildMode::Async).await;
1666        schedule_index_build_task_with_mode(IndexBuildMode::Sync).await;
1667    }
1668
1669    #[tokio::test]
1670    async fn test_index_build_task_no_index() {
1671        let env = SchedulerEnv::new().await;
1672        let mut scheduler = env.mock_index_build_scheduler(4);
1673        let mut metadata = sst_region_metadata();
1674        // Unset indexes in metadata to simulate no index scenario.
1675        metadata.column_metadatas.iter_mut().for_each(|col| {
1676            col.column_schema.set_inverted_index(false);
1677            let _ = col.column_schema.unset_skipping_options();
1678        });
1679        let region_id = metadata.region_id;
1680        let metadata = Arc::new(metadata);
1681        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1682        let file_purger = Arc::new(NoopFilePurger {});
1683        let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
1684        let file_meta = FileMeta {
1685            region_id,
1686            file_id: sst_info.file_id,
1687            file_size: sst_info.file_size,
1688            index_file_size: sst_info.index_metadata.file_size,
1689            num_rows: sst_info.num_rows as u64,
1690            num_row_groups: sst_info.num_row_groups,
1691            ..Default::default()
1692        };
1693        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1694        let version_control =
1695            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1696        let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
1697
1698        // Create mock task.
1699        let (tx, mut rx) = mpsc::channel(4);
1700        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1701        let task = IndexBuildTask {
1702            file_meta: file_meta.clone(),
1703            reason: IndexBuildType::Flush,
1704            access_layer: env.access_layer.clone(),
1705            listener: WorkerListener::default(),
1706            manifest_ctx,
1707            write_cache: None,
1708            file_purger,
1709            indexer_builder,
1710            request_sender: tx,
1711            result_sender: result_tx,
1712        };
1713
1714        scheduler
1715            .schedule_build(&version_control, task)
1716            .await
1717            .unwrap();
1718
1719        // The task should finish successfully.
1720        match result_rx.recv().await.unwrap() {
1721            Ok(outcome) => {
1722                assert_eq!(outcome, IndexBuildOutcome::Finished);
1723            }
1724            _ => panic!("Expect finished result"),
1725        }
1726
1727        // No index is built, so no notification should be sent to the worker.
1728        let _ = rx.recv().await.is_none();
1729    }
1730
1731    #[tokio::test]
1732    async fn test_index_build_task_with_write_cache() {
1733        let env = SchedulerEnv::new().await;
1734        let mut scheduler = env.mock_index_build_scheduler(4);
1735        let metadata = Arc::new(sst_region_metadata());
1736        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1737        let file_purger = Arc::new(NoopFilePurger {});
1738        let region_id = metadata.region_id;
1739
1740        let (dir, factory) = PuffinManagerFactory::new_for_test_async("test_write_cache").await;
1741        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1742
1743        // Create mock write cache
1744        let write_cache = Arc::new(
1745            WriteCache::new_fs(
1746                dir.path().to_str().unwrap(),
1747                ReadableSize::mb(10),
1748                None,
1749                None,
1750                factory,
1751                intm_manager,
1752            )
1753            .await
1754            .unwrap(),
1755        );
1756        // Indexer builder built from write cache.
1757        let indexer_builder = Arc::new(IndexerBuilderImpl {
1758            build_type: IndexBuildType::Flush,
1759            metadata: metadata.clone(),
1760            row_group_size: 1024,
1761            puffin_manager: write_cache.build_puffin_manager().clone(),
1762            intermediate_manager: write_cache.intermediate_manager().clone(),
1763            index_options: IndexOptions::default(),
1764            inverted_index_config: InvertedIndexConfig::default(),
1765            fulltext_index_config: FulltextIndexConfig::default(),
1766            bloom_filter_index_config: BloomFilterConfig::default(),
1767        });
1768
1769        let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
1770        let file_meta = FileMeta {
1771            region_id,
1772            file_id: sst_info.file_id,
1773            file_size: sst_info.file_size,
1774            index_file_size: sst_info.index_metadata.file_size,
1775            num_rows: sst_info.num_rows as u64,
1776            num_row_groups: sst_info.num_row_groups,
1777            ..Default::default()
1778        };
1779        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1780        let version_control =
1781            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1782
1783        // Create mock task.
1784        let (tx, mut _rx) = mpsc::channel(4);
1785        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1786        let task = IndexBuildTask {
1787            file_meta: file_meta.clone(),
1788            reason: IndexBuildType::Flush,
1789            access_layer: env.access_layer.clone(),
1790            listener: WorkerListener::default(),
1791            manifest_ctx,
1792            write_cache: Some(write_cache.clone()),
1793            file_purger,
1794            indexer_builder,
1795            request_sender: tx,
1796            result_sender: result_tx,
1797        };
1798
1799        scheduler
1800            .schedule_build(&version_control, task)
1801            .await
1802            .unwrap();
1803
1804        // The task should finish successfully.
1805        match result_rx.recv().await.unwrap() {
1806            Ok(outcome) => {
1807                assert_eq!(outcome, IndexBuildOutcome::Finished);
1808            }
1809            _ => panic!("Expect finished result"),
1810        }
1811
1812        // The write cache should contain the uploaded index file.
1813        let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Puffin);
1814        assert!(write_cache.file_cache().contains_key(&index_key));
1815    }
1816
1817    async fn create_mock_task_for_schedule(
1818        env: &SchedulerEnv,
1819        file_id: FileId,
1820        region_id: RegionId,
1821        reason: IndexBuildType,
1822    ) -> IndexBuildTask {
1823        let metadata = Arc::new(sst_region_metadata());
1824        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1825        let file_purger = Arc::new(NoopFilePurger {});
1826        let indexer_builder = mock_indexer_builder(metadata, env).await;
1827        let (tx, _rx) = mpsc::channel(4);
1828        let (result_tx, _result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1829
1830        IndexBuildTask {
1831            file_meta: FileMeta {
1832                region_id,
1833                file_id,
1834                file_size: 100,
1835                ..Default::default()
1836            },
1837            reason,
1838            access_layer: env.access_layer.clone(),
1839            listener: WorkerListener::default(),
1840            manifest_ctx,
1841            write_cache: None,
1842            file_purger,
1843            indexer_builder,
1844            request_sender: tx,
1845            result_sender: result_tx,
1846        }
1847    }
1848
1849    #[tokio::test]
1850    async fn test_scheduler_comprehensive() {
1851        let env = SchedulerEnv::new().await;
1852        let mut scheduler = env.mock_index_build_scheduler(2);
1853        let metadata = Arc::new(sst_region_metadata());
1854        let region_id = metadata.region_id;
1855        let file_purger = Arc::new(NoopFilePurger {});
1856
1857        // Prepare multiple files for testing
1858        let file_id1 = FileId::random();
1859        let file_id2 = FileId::random();
1860        let file_id3 = FileId::random();
1861        let file_id4 = FileId::random();
1862        let file_id5 = FileId::random();
1863
1864        let mut files = HashMap::new();
1865        for file_id in [file_id1, file_id2, file_id3, file_id4, file_id5] {
1866            files.insert(
1867                file_id,
1868                FileMeta {
1869                    region_id,
1870                    file_id,
1871                    file_size: 100,
1872                    ..Default::default()
1873                },
1874            );
1875        }
1876
1877        let version_control = mock_version_control(metadata, file_purger, files).await;
1878
1879        // Test 1: Basic scheduling
1880        let task1 =
1881            create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
1882        assert!(
1883            scheduler
1884                .schedule_build(&version_control, task1)
1885                .await
1886                .is_ok()
1887        );
1888        assert!(scheduler.region_status.contains_key(&region_id));
1889        let status = scheduler.region_status.get(&region_id).unwrap();
1890        assert_eq!(status.building_files.len(), 1);
1891        assert!(status.building_files.contains(&file_id1));
1892
1893        // Test 2: Duplicate file scheduling (should be skipped)
1894        let task1_dup =
1895            create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
1896        scheduler
1897            .schedule_build(&version_control, task1_dup)
1898            .await
1899            .unwrap();
1900        let status = scheduler.region_status.get(&region_id).unwrap();
1901        assert_eq!(status.building_files.len(), 1); // Still only one
1902
1903        // Test 3: Fill up to limit (2 building tasks)
1904        let task2 =
1905            create_mock_task_for_schedule(&env, file_id2, region_id, IndexBuildType::Flush).await;
1906        scheduler
1907            .schedule_build(&version_control, task2)
1908            .await
1909            .unwrap();
1910        let status = scheduler.region_status.get(&region_id).unwrap();
1911        assert_eq!(status.building_files.len(), 2); // Reached limit
1912        assert_eq!(status.pending_tasks.len(), 0);
1913
1914        // Test 4: Add tasks with different priorities to pending queue
1915        // Now all new tasks will be pending since we reached the limit
1916        let task3 =
1917            create_mock_task_for_schedule(&env, file_id3, region_id, IndexBuildType::Compact).await;
1918        let task4 =
1919            create_mock_task_for_schedule(&env, file_id4, region_id, IndexBuildType::SchemaChange)
1920                .await;
1921        let task5 =
1922            create_mock_task_for_schedule(&env, file_id5, region_id, IndexBuildType::Manual).await;
1923
1924        scheduler
1925            .schedule_build(&version_control, task3)
1926            .await
1927            .unwrap();
1928        scheduler
1929            .schedule_build(&version_control, task4)
1930            .await
1931            .unwrap();
1932        scheduler
1933            .schedule_build(&version_control, task5)
1934            .await
1935            .unwrap();
1936
1937        let status = scheduler.region_status.get(&region_id).unwrap();
1938        assert_eq!(status.building_files.len(), 2); // Still at limit
1939        assert_eq!(status.pending_tasks.len(), 3); // Three pending
1940
1941        // Test 5: Task completion triggers scheduling next highest priority task (Manual)
1942        scheduler.on_task_stopped(region_id, file_id1, &version_control);
1943        let status = scheduler.region_status.get(&region_id).unwrap();
1944        assert!(!status.building_files.contains(&file_id1));
1945        assert_eq!(status.building_files.len(), 2); // Should schedule next task
1946        assert_eq!(status.pending_tasks.len(), 2); // One less pending
1947        // The highest priority task (Manual) should now be building
1948        assert!(status.building_files.contains(&file_id5));
1949
1950        // Test 6: Complete another task, should schedule SchemaChange (second highest priority)
1951        scheduler.on_task_stopped(region_id, file_id2, &version_control);
1952        let status = scheduler.region_status.get(&region_id).unwrap();
1953        assert_eq!(status.building_files.len(), 2);
1954        assert_eq!(status.pending_tasks.len(), 1); // One less pending
1955        assert!(status.building_files.contains(&file_id4)); // SchemaChange should be building
1956
1957        // Test 7: Complete remaining tasks and cleanup
1958        scheduler.on_task_stopped(region_id, file_id5, &version_control);
1959        scheduler.on_task_stopped(region_id, file_id4, &version_control);
1960
1961        let status = scheduler.region_status.get(&region_id).unwrap();
1962        assert_eq!(status.building_files.len(), 1); // Last task (Compact) should be building
1963        assert_eq!(status.pending_tasks.len(), 0);
1964        assert!(status.building_files.contains(&file_id3));
1965
1966        scheduler.on_task_stopped(region_id, file_id3, &version_control);
1967
1968        // Region should be removed when all tasks complete
1969        assert!(!scheduler.region_status.contains_key(&region_id));
1970
1971        // Test 8: Region dropped with pending tasks
1972        let task6 =
1973            create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
1974        let task7 =
1975            create_mock_task_for_schedule(&env, file_id2, region_id, IndexBuildType::Flush).await;
1976        let task8 =
1977            create_mock_task_for_schedule(&env, file_id3, region_id, IndexBuildType::Manual).await;
1978
1979        scheduler
1980            .schedule_build(&version_control, task6)
1981            .await
1982            .unwrap();
1983        scheduler
1984            .schedule_build(&version_control, task7)
1985            .await
1986            .unwrap();
1987        scheduler
1988            .schedule_build(&version_control, task8)
1989            .await
1990            .unwrap();
1991
1992        assert!(scheduler.region_status.contains_key(&region_id));
1993        let status = scheduler.region_status.get(&region_id).unwrap();
1994        assert_eq!(status.building_files.len(), 2);
1995        assert_eq!(status.pending_tasks.len(), 1);
1996
1997        scheduler.on_region_dropped(region_id).await;
1998        assert!(!scheduler.region_status.contains_key(&region_id));
1999    }
2000}