mito2/sst/
index.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod bloom_filter;
16pub(crate) mod fulltext_index;
17mod indexer;
18pub mod intermediate;
19pub(crate) mod inverted_index;
20pub mod puffin_manager;
21mod statistics;
22pub(crate) mod store;
23
24use std::cmp::Ordering;
25use std::collections::{BinaryHeap, HashMap, HashSet};
26use std::num::NonZeroUsize;
27use std::sync::Arc;
28
29use bloom_filter::creator::BloomFilterIndexer;
30use common_telemetry::{debug, error, info, warn};
31use datatypes::arrow::array::BinaryArray;
32use datatypes::arrow::record_batch::RecordBatch;
33use mito_codec::index::IndexValuesCodec;
34use mito_codec::row_converter::CompositeValues;
35use puffin_manager::SstPuffinManager;
36use smallvec::{SmallVec, smallvec};
37use snafu::{OptionExt, ResultExt};
38use statistics::{ByteCount, RowCount};
39use store_api::metadata::RegionMetadataRef;
40use store_api::storage::{ColumnId, FileId, RegionId};
41use strum::IntoStaticStr;
42use tokio::sync::mpsc::Sender;
43
44use crate::access_layer::{AccessLayerRef, FilePathProvider, OperationType, RegionFilePathFactory};
45use crate::cache::file_cache::{FileType, IndexKey};
46use crate::cache::write_cache::{UploadTracker, WriteCacheRef};
47use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
48use crate::error::{
49    BuildIndexAsyncSnafu, DecodeSnafu, Error, InvalidRecordBatchSnafu, RegionClosedSnafu,
50    RegionDroppedSnafu, RegionTruncatedSnafu, Result,
51};
52use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
53use crate::metrics::INDEX_CREATE_MEMORY_USAGE;
54use crate::read::{Batch, BatchReader};
55use crate::region::options::IndexOptions;
56use crate::region::version::VersionControlRef;
57use crate::region::{ManifestContextRef, RegionLeaderState};
58use crate::request::{
59    BackgroundNotify, IndexBuildFailed, IndexBuildFinished, IndexBuildStopped, WorkerRequest,
60    WorkerRequestWithTime,
61};
62use crate::schedule::scheduler::{Job, SchedulerRef};
63use crate::sst::file::{FileHandle, FileMeta, IndexType, RegionFileId};
64use crate::sst::file_purger::FilePurgerRef;
65use crate::sst::index::fulltext_index::creator::FulltextIndexer;
66use crate::sst::index::intermediate::IntermediateManager;
67use crate::sst::index::inverted_index::creator::InvertedIndexer;
68use crate::sst::parquet::SstInfo;
69use crate::sst::parquet::flat_format::primary_key_column_index;
70use crate::sst::parquet::format::PrimaryKeyArray;
71use crate::worker::WorkerListener;
72
/// Metric label value identifying the inverted index type.
pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index";
/// Metric label value identifying the full-text index type.
pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index";
/// Metric label value identifying the bloom filter index type.
pub(crate) const TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
76
77/// Output of the index creation.
78#[derive(Debug, Clone, Default)]
79pub struct IndexOutput {
80    /// Size of the file.
81    pub file_size: u64,
82    /// Inverted index output.
83    pub inverted_index: InvertedIndexOutput,
84    /// Fulltext index output.
85    pub fulltext_index: FulltextIndexOutput,
86    /// Bloom filter output.
87    pub bloom_filter: BloomFilterOutput,
88}
89
90impl IndexOutput {
91    pub fn build_available_indexes(&self) -> SmallVec<[IndexType; 4]> {
92        let mut indexes = SmallVec::new();
93        if self.inverted_index.is_available() {
94            indexes.push(IndexType::InvertedIndex);
95        }
96        if self.fulltext_index.is_available() {
97            indexes.push(IndexType::FulltextIndex);
98        }
99        if self.bloom_filter.is_available() {
100            indexes.push(IndexType::BloomFilterIndex);
101        }
102        indexes
103    }
104}
105
/// Base output of the index creation.
#[derive(Debug, Clone, Default)]
pub struct IndexBaseOutput {
    /// Size of the index in bytes; 0 means the index was not created.
    pub index_size: ByteCount,
    /// Number of rows in the index.
    pub row_count: RowCount,
    /// Available columns in the index.
    pub columns: Vec<ColumnId>,
}
116
117impl IndexBaseOutput {
118    pub fn is_available(&self) -> bool {
119        self.index_size > 0
120    }
121}
122
/// Output of the inverted index creation.
pub type InvertedIndexOutput = IndexBaseOutput;
/// Output of the fulltext index creation.
pub type FulltextIndexOutput = IndexBaseOutput;
/// Output of the bloom filter creation.
pub type BloomFilterOutput = IndexBaseOutput;
129
/// The index creator that hides the error handling details.
#[derive(Default)]
pub struct Indexer {
    // Id of the index file being created.
    file_id: FileId,
    // Region the indexed SST belongs to; used for logging and cleanup.
    region_id: RegionId,
    // Puffin manager used to write the index blobs; `None` disables indexing.
    puffin_manager: Option<SstPuffinManager>,
    // Per-index creators; `None` means that index type is skipped.
    inverted_indexer: Option<InvertedIndexer>,
    // Last reported memory usage of the inverted indexer (metric baseline).
    last_mem_inverted_index: usize,
    fulltext_indexer: Option<FulltextIndexer>,
    // Last reported memory usage of the full-text indexer (metric baseline).
    last_mem_fulltext_index: usize,
    bloom_filter_indexer: Option<BloomFilterIndexer>,
    // Last reported memory usage of the bloom filter indexer (metric baseline).
    last_mem_bloom_filter: usize,
    // Manager for intermediate files produced while building indexes.
    intermediate_manager: Option<IntermediateManager>,
}
144
145impl Indexer {
146    /// Updates the index with the given batch.
147    pub async fn update(&mut self, batch: &mut Batch) {
148        self.do_update(batch).await;
149
150        self.flush_mem_metrics();
151    }
152
153    /// Updates the index with the given flat format RecordBatch.
154    pub async fn update_flat(&mut self, batch: &RecordBatch) {
155        self.do_update_flat(batch).await;
156
157        self.flush_mem_metrics();
158    }
159
160    /// Finalizes the index creation.
161    pub async fn finish(&mut self) -> IndexOutput {
162        let output = self.do_finish().await;
163
164        self.flush_mem_metrics();
165        output
166    }
167
168    /// Aborts the index creation.
169    pub async fn abort(&mut self) {
170        self.do_abort().await;
171
172        self.flush_mem_metrics();
173    }
174
175    fn flush_mem_metrics(&mut self) {
176        let inverted_mem = self
177            .inverted_indexer
178            .as_ref()
179            .map_or(0, |creator| creator.memory_usage());
180        INDEX_CREATE_MEMORY_USAGE
181            .with_label_values(&[TYPE_INVERTED_INDEX])
182            .add(inverted_mem as i64 - self.last_mem_inverted_index as i64);
183        self.last_mem_inverted_index = inverted_mem;
184
185        let fulltext_mem = self
186            .fulltext_indexer
187            .as_ref()
188            .map_or(0, |creator| creator.memory_usage());
189        INDEX_CREATE_MEMORY_USAGE
190            .with_label_values(&[TYPE_FULLTEXT_INDEX])
191            .add(fulltext_mem as i64 - self.last_mem_fulltext_index as i64);
192        self.last_mem_fulltext_index = fulltext_mem;
193
194        let bloom_filter_mem = self
195            .bloom_filter_indexer
196            .as_ref()
197            .map_or(0, |creator| creator.memory_usage());
198        INDEX_CREATE_MEMORY_USAGE
199            .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
200            .add(bloom_filter_mem as i64 - self.last_mem_bloom_filter as i64);
201        self.last_mem_bloom_filter = bloom_filter_mem;
202    }
203}
204
#[async_trait::async_trait]
pub trait IndexerBuilder {
    /// Builds an [Indexer] that creates index data for the file with `file_id`.
    async fn build(&self, file_id: FileId) -> Indexer;
}
#[derive(Clone)]
pub(crate) struct IndexerBuilderImpl {
    // Why the index is being built; gates creation per index-type config.
    pub(crate) build_type: IndexBuildType,
    // Metadata of the region owning the SST.
    pub(crate) metadata: RegionMetadataRef,
    // Parquet row group size; used to align inverted index segments.
    pub(crate) row_group_size: usize,
    // Puffin manager that writes the resulting index file.
    pub(crate) puffin_manager: SstPuffinManager,
    // Manager for intermediate files used during index creation.
    pub(crate) intermediate_manager: IntermediateManager,
    // Per-region index options (e.g. ignored columns, segment row count).
    pub(crate) index_options: IndexOptions,
    // Global configs controlling when/how each index type is created.
    pub(crate) inverted_index_config: InvertedIndexConfig,
    pub(crate) fulltext_index_config: FulltextIndexConfig,
    pub(crate) bloom_filter_index_config: BloomFilterConfig,
}
222
223#[async_trait::async_trait]
224impl IndexerBuilder for IndexerBuilderImpl {
225    /// Sanity check for arguments and create a new [Indexer] if arguments are valid.
226    async fn build(&self, file_id: FileId) -> Indexer {
227        let mut indexer = Indexer {
228            file_id,
229            region_id: self.metadata.region_id,
230            ..Default::default()
231        };
232
233        indexer.inverted_indexer = self.build_inverted_indexer(file_id);
234        indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await;
235        indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id);
236        indexer.intermediate_manager = Some(self.intermediate_manager.clone());
237        if indexer.inverted_indexer.is_none()
238            && indexer.fulltext_indexer.is_none()
239            && indexer.bloom_filter_indexer.is_none()
240        {
241            indexer.abort().await;
242            return Indexer::default();
243        }
244
245        indexer.puffin_manager = Some(self.puffin_manager.clone());
246        indexer
247    }
248}
249
250impl IndexerBuilderImpl {
251    fn build_inverted_indexer(&self, file_id: FileId) -> Option<InvertedIndexer> {
252        let create = match self.build_type {
253            IndexBuildType::Flush => self.inverted_index_config.create_on_flush.auto(),
254            IndexBuildType::Compact => self.inverted_index_config.create_on_compaction.auto(),
255            _ => true,
256        };
257
258        if !create {
259            debug!(
260                "Skip creating inverted index due to config, region_id: {}, file_id: {}",
261                self.metadata.region_id, file_id,
262            );
263            return None;
264        }
265
266        let indexed_column_ids = self.metadata.inverted_indexed_column_ids(
267            self.index_options.inverted_index.ignore_column_ids.iter(),
268        );
269        if indexed_column_ids.is_empty() {
270            debug!(
271                "No columns to be indexed, skip creating inverted index, region_id: {}, file_id: {}",
272                self.metadata.region_id, file_id,
273            );
274            return None;
275        }
276
277        let Some(mut segment_row_count) =
278            NonZeroUsize::new(self.index_options.inverted_index.segment_row_count)
279        else {
280            warn!(
281                "Segment row count is 0, skip creating index, region_id: {}, file_id: {}",
282                self.metadata.region_id, file_id,
283            );
284            return None;
285        };
286
287        let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else {
288            warn!(
289                "Row group size is 0, skip creating index, region_id: {}, file_id: {}",
290                self.metadata.region_id, file_id,
291            );
292            return None;
293        };
294
295        // if segment row count not aligned with row group size, adjust it to be aligned.
296        if row_group_size.get() % segment_row_count.get() != 0 {
297            segment_row_count = row_group_size;
298        }
299
300        let indexer = InvertedIndexer::new(
301            file_id,
302            &self.metadata,
303            self.intermediate_manager.clone(),
304            self.inverted_index_config.mem_threshold_on_create(),
305            segment_row_count,
306            indexed_column_ids,
307        );
308
309        Some(indexer)
310    }
311
312    async fn build_fulltext_indexer(&self, file_id: FileId) -> Option<FulltextIndexer> {
313        let create = match self.build_type {
314            IndexBuildType::Flush => self.fulltext_index_config.create_on_flush.auto(),
315            IndexBuildType::Compact => self.fulltext_index_config.create_on_compaction.auto(),
316            _ => true,
317        };
318
319        if !create {
320            debug!(
321                "Skip creating full-text index due to config, region_id: {}, file_id: {}",
322                self.metadata.region_id, file_id,
323            );
324            return None;
325        }
326
327        let mem_limit = self.fulltext_index_config.mem_threshold_on_create();
328        let creator = FulltextIndexer::new(
329            &self.metadata.region_id,
330            &file_id,
331            &self.intermediate_manager,
332            &self.metadata,
333            self.fulltext_index_config.compress,
334            mem_limit,
335        )
336        .await;
337
338        let err = match creator {
339            Ok(creator) => {
340                if creator.is_none() {
341                    debug!(
342                        "Skip creating full-text index due to no columns require indexing, region_id: {}, file_id: {}",
343                        self.metadata.region_id, file_id,
344                    );
345                }
346                return creator;
347            }
348            Err(err) => err,
349        };
350
351        if cfg!(any(test, feature = "test")) {
352            panic!(
353                "Failed to create full-text indexer, region_id: {}, file_id: {}, err: {:?}",
354                self.metadata.region_id, file_id, err
355            );
356        } else {
357            warn!(
358                err; "Failed to create full-text indexer, region_id: {}, file_id: {}",
359                self.metadata.region_id, file_id,
360            );
361        }
362
363        None
364    }
365
366    fn build_bloom_filter_indexer(&self, file_id: FileId) -> Option<BloomFilterIndexer> {
367        let create = match self.build_type {
368            IndexBuildType::Flush => self.bloom_filter_index_config.create_on_flush.auto(),
369            IndexBuildType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(),
370            _ => true,
371        };
372
373        if !create {
374            debug!(
375                "Skip creating bloom filter due to config, region_id: {}, file_id: {}",
376                self.metadata.region_id, file_id,
377            );
378            return None;
379        }
380
381        let mem_limit = self.bloom_filter_index_config.mem_threshold_on_create();
382        let indexer = BloomFilterIndexer::new(
383            file_id,
384            &self.metadata,
385            self.intermediate_manager.clone(),
386            mem_limit,
387        );
388
389        let err = match indexer {
390            Ok(indexer) => {
391                if indexer.is_none() {
392                    debug!(
393                        "Skip creating bloom filter due to no columns require indexing, region_id: {}, file_id: {}",
394                        self.metadata.region_id, file_id,
395                    );
396                }
397                return indexer;
398            }
399            Err(err) => err,
400        };
401
402        if cfg!(any(test, feature = "test")) {
403            panic!(
404                "Failed to create bloom filter, region_id: {}, file_id: {}, err: {:?}",
405                self.metadata.region_id, file_id, err
406            );
407        } else {
408            warn!(
409                err; "Failed to create bloom filter, region_id: {}, file_id: {}",
410                self.metadata.region_id, file_id,
411            );
412        }
413
414        None
415    }
416}
417
/// Type of an index build task.
#[derive(Debug, Clone, IntoStaticStr)]
pub enum IndexBuildType {
    /// Build index when schema change.
    SchemaChange,
    /// Create or update index after flush.
    Flush,
    /// Create or update index after compact.
    Compact,
    /// Manually build index.
    Manual,
}
430
431impl IndexBuildType {
432    fn as_str(&self) -> &'static str {
433        self.into()
434    }
435
436    // Higher value means higher priority.
437    fn priority(&self) -> u8 {
438        match self {
439            IndexBuildType::Manual => 3,
440            IndexBuildType::SchemaChange => 2,
441            IndexBuildType::Flush => 1,
442            IndexBuildType::Compact => 0,
443        }
444    }
445}
446
impl From<OperationType> for IndexBuildType {
    /// Maps the operation that produced the SST to the matching build reason.
    fn from(op_type: OperationType) -> Self {
        match op_type {
            OperationType::Flush => IndexBuildType::Flush,
            OperationType::Compact => IndexBuildType::Compact,
        }
    }
}
455
/// Outcome of an index build task.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum IndexBuildOutcome {
    /// The build completed.
    Finished,
    /// The build was abandoned; the string carries a human-readable reason.
    Aborted(String),
}
462
/// Mpsc sender used to deliver the final result of an index build task.
pub type ResultMpscSender = Sender<Result<IndexBuildOutcome>>;
465
#[derive(Clone)]
pub struct IndexBuildTask {
    /// The file meta to build index for.
    pub file_meta: FileMeta,
    /// Why the index is being built; also determines scheduling priority.
    pub reason: IndexBuildType,
    /// Access layer used to read the SST back for indexing.
    pub access_layer: AccessLayerRef,
    /// Listener notified at build begin/abort for observability and tests.
    pub(crate) listener: WorkerListener,
    /// Manifest context used to persist the resulting region edit.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// Optional write cache; when present, the index file is uploaded from it.
    pub write_cache: Option<WriteCacheRef>,
    /// Purger passed to the file handle created for reading the SST.
    pub file_purger: FilePurgerRef,
    /// When write cache is enabled, the indexer builder should be built from the write cache.
    /// Otherwise, it should be built from the access layer.
    pub indexer_builder: Arc<dyn IndexerBuilder + Send + Sync>,
    /// Request sender to notify the region worker.
    pub(crate) request_sender: Sender<WorkerRequestWithTime>,
    /// Index build result sender.
    pub(crate) result_sender: ResultMpscSender,
}
484
// Manual Debug impl: most fields (access layer, senders, purger) do not
// implement Debug, so only the identifying fields are shown.
impl std::fmt::Debug for IndexBuildTask {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("IndexBuildTask")
            .field("region_id", &self.file_meta.region_id)
            .field("file_id", &self.file_meta.file_id)
            .field("reason", &self.reason)
            .finish()
    }
}
494
impl IndexBuildTask {
    /// Notify the caller the job is success.
    pub async fn on_success(&self, outcome: IndexBuildOutcome) {
        // Send errors are ignored: the waiter may have dropped its receiver.
        let _ = self.result_sender.send(Ok(outcome)).await;
    }

    /// Send index build error to waiter.
    pub async fn on_failure(&self, err: Arc<Error>) {
        let _ = self
            .result_sender
            .send(Err(err.clone()).context(BuildIndexAsyncSnafu {
                region_id: self.file_meta.region_id,
            }))
            .await;
    }

    /// Wraps the task into a boxed background [Job] that drives the whole build.
    fn into_index_build_job(mut self, version_control: VersionControlRef) -> Job {
        Box::pin(async move {
            self.do_index_build(version_control).await;
        })
    }

    /// Runs the build, reports the result to the waiter, and always notifies the
    /// region worker that the build stopped (success or failure) so the
    /// scheduler can release this file's slot.
    async fn do_index_build(&mut self, version_control: VersionControlRef) {
        self.listener
            .on_index_build_begin(RegionFileId::new(
                self.file_meta.region_id,
                self.file_meta.file_id,
            ))
            .await;
        match self.index_build(version_control).await {
            Ok(outcome) => self.on_success(outcome).await,
            Err(e) => {
                warn!(
                    e; "Index build task failed, region: {}, file_id: {}",
                    self.file_meta.region_id, self.file_meta.file_id,
                );
                self.on_failure(e.into()).await
            }
        }
        // Unconditional stop notification so the worker can schedule pending tasks.
        let worker_request = WorkerRequest::Background {
            region_id: self.file_meta.region_id,
            notify: BackgroundNotify::IndexBuildStopped(IndexBuildStopped {
                region_id: self.file_meta.region_id,
                file_id: self.file_meta.file_id,
            }),
        };
        let _ = self
            .request_sender
            .send(WorkerRequestWithTime::new(worker_request))
            .await;
    }

    // Checks if the SST file still exists in object store and version to avoid conflict with compaction.
    async fn check_sst_file_exists(&self, version_control: &VersionControlRef) -> bool {
        let file_id = self.file_meta.file_id;
        let level = self.file_meta.level;
        // We should check current version instead of the version when the job is created.
        let version = version_control.current().version;

        let Some(level_files) = version.ssts.levels().get(level as usize) else {
            warn!(
                "File id {} not found in level {} for index build, region: {}",
                file_id, level, self.file_meta.region_id
            );
            return false;
        };

        match level_files.files.get(&file_id) {
            Some(handle) if !handle.is_deleted() && !handle.compacting() => {
                // If the file's metadata is present in the current version, the physical SST file
                // is guaranteed to exist on object store. The file purger removes the physical
                // file only after its metadata is removed from the version.
                true
            }
            _ => {
                warn!(
                    "File id {} not found in region version for index build, region: {}",
                    file_id, self.file_meta.region_id
                );
                false
            }
        }
    }

    /// Builds the index for the SST described by `self.file_meta`: reads the SST
    /// back through the access layer, feeds every batch to the indexer, then —
    /// if index data was produced — uploads the index file and records it in
    /// the manifest.
    async fn index_build(
        &mut self,
        version_control: VersionControlRef,
    ) -> Result<IndexBuildOutcome> {
        let index_file_id = if self.file_meta.index_file_size > 0 {
            // Generate new file ID if index file exists to avoid overwrite.
            FileId::random()
        } else {
            self.file_meta.file_id
        };
        let mut indexer = self.indexer_builder.build(index_file_id).await;

        // Check SST file existence before building index to avoid failure of parquet reader.
        if !self.check_sst_file_exists(&version_control).await {
            // Calls abort to clean up index files.
            indexer.abort().await;
            self.listener
                .on_index_build_abort(RegionFileId::new(
                    self.file_meta.region_id,
                    self.file_meta.file_id,
                ))
                .await;
            return Ok(IndexBuildOutcome::Aborted(format!(
                "SST file not found during index build, region: {}, file_id: {}",
                self.file_meta.region_id, self.file_meta.file_id
            )));
        }

        let mut parquet_reader = self
            .access_layer
            .read_sst(FileHandle::new(
                self.file_meta.clone(),
                self.file_purger.clone(),
            ))
            .build()
            .await?;

        // TODO(SNC123): optimize index batch
        loop {
            match parquet_reader.next_batch().await {
                Ok(Some(batch)) => {
                    indexer.update(&mut batch.clone()).await;
                }
                Ok(None) => break,
                Err(e) => {
                    // Abort to clean up intermediate index files before
                    // propagating the read error.
                    indexer.abort().await;
                    return Err(e);
                }
            }
        }
        let index_output = indexer.finish().await;

        if index_output.file_size > 0 {
            // Check SST file existence again after building index.
            if !self.check_sst_file_exists(&version_control).await {
                // Calls abort to clean up index files.
                // NOTE(review): `abort` is invoked after `finish` here — presumably a
                // cleanup no-op for the already-finished creators; confirm.
                indexer.abort().await;
                self.listener
                    .on_index_build_abort(RegionFileId::new(
                        self.file_meta.region_id,
                        self.file_meta.file_id,
                    ))
                    .await;
                return Ok(IndexBuildOutcome::Aborted(format!(
                    "SST file not found during index build, region: {}, file_id: {}",
                    self.file_meta.region_id, self.file_meta.file_id
                )));
            }

            // Upload index file if write cache is enabled.
            self.maybe_upload_index_file(index_output.clone(), index_file_id)
                .await?;

            // Persist the new index metadata; notify the worker either way.
            let worker_request = match self.update_manifest(index_output, index_file_id).await {
                Ok(edit) => {
                    let index_build_finished = IndexBuildFinished {
                        region_id: self.file_meta.region_id,
                        edit,
                    };
                    WorkerRequest::Background {
                        region_id: self.file_meta.region_id,
                        notify: BackgroundNotify::IndexBuildFinished(index_build_finished),
                    }
                }
                Err(e) => {
                    let err = Arc::new(e);
                    WorkerRequest::Background {
                        region_id: self.file_meta.region_id,
                        notify: BackgroundNotify::IndexBuildFailed(IndexBuildFailed { err }),
                    }
                }
            };

            let _ = self
                .request_sender
                .send(WorkerRequestWithTime::new(worker_request))
                .await;
        }
        Ok(IndexBuildOutcome::Finished)
    }

    /// Uploads the newly built puffin (index) file from the write cache to the
    /// remote object store; on failure, cleans the uploaded/cached files and
    /// returns the error. No-op when the write cache is disabled.
    async fn maybe_upload_index_file(
        &self,
        output: IndexOutput,
        index_file_id: FileId,
    ) -> Result<()> {
        if let Some(write_cache) = &self.write_cache {
            let file_id = self.file_meta.file_id;
            let region_id = self.file_meta.region_id;
            let remote_store = self.access_layer.object_store();
            let mut upload_tracker = UploadTracker::new(region_id);
            let mut err = None;
            let puffin_key = IndexKey::new(region_id, index_file_id, FileType::Puffin);
            // NOTE(review): the cache key is built from `index_file_id` while the
            // remote path is built from the SST's `file_id` — confirm this
            // asymmetry is intended.
            let puffin_path = RegionFilePathFactory::new(
                self.access_layer.table_dir().to_string(),
                self.access_layer.path_type(),
            )
            .build_index_file_path(RegionFileId::new(region_id, file_id));
            if let Err(e) = write_cache
                .upload(puffin_key, &puffin_path, remote_store)
                .await
            {
                err = Some(e);
            }
            upload_tracker.push_uploaded_file(puffin_path);
            if let Some(err) = err {
                // Cleans index files on failure.
                upload_tracker
                    .clean(
                        &smallvec![SstInfo {
                            file_id,
                            index_metadata: output,
                            ..Default::default()
                        }],
                        &write_cache.file_cache(),
                        remote_store,
                    )
                    .await;
                return Err(err);
            }
        } else {
            debug!("write cache is not available, skip uploading index file");
        }
        Ok(())
    }

    /// Records the build result in the region manifest and returns the edit
    /// that was applied.
    async fn update_manifest(
        &mut self,
        output: IndexOutput,
        index_file_id: FileId,
    ) -> Result<RegionEdit> {
        // Refresh the file meta with this build's results before re-adding it.
        self.file_meta.available_indexes = output.build_available_indexes();
        self.file_meta.index_file_size = output.file_size;
        self.file_meta.index_file_id = Some(index_file_id);
        let edit = RegionEdit {
            files_to_add: vec![self.file_meta.clone()],
            files_to_remove: vec![],
            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
            flushed_sequence: None,
            flushed_entry_id: None,
            committed_sequence: None,
            compaction_time_window: None,
        };
        // The edit is only applied while the region leader is still writable.
        let version = self
            .manifest_ctx
            .update_manifest(
                RegionLeaderState::Writable,
                RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())),
            )
            .await?;
        info!(
            "Successfully update manifest version to {version}, region: {}, reason: {}",
            self.file_meta.region_id,
            self.reason.as_str()
        );
        Ok(edit)
    }
}
757
// Tasks are ordered solely by build-type priority so the scheduler's
// `BinaryHeap` (a max-heap) pops the highest-priority pending task first.
// `eq` deliberately ignores every other field, which keeps it consistent
// with `Ord` as the `Eq`/`Ord` contract requires.
impl PartialEq for IndexBuildTask {
    fn eq(&self, other: &Self) -> bool {
        self.reason.priority() == other.reason.priority()
    }
}

impl Eq for IndexBuildTask {}

impl PartialOrd for IndexBuildTask {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for IndexBuildTask {
    fn cmp(&self, other: &Self) -> Ordering {
        self.reason.priority().cmp(&other.reason.priority())
    }
}
777
/// Tracks the index build status of a region scheduled by the [IndexBuildScheduler].
pub struct IndexBuildStatus {
    /// Region these tasks belong to.
    pub region_id: RegionId,
    /// Files whose index build jobs are currently running.
    pub building_files: HashSet<FileId>,
    /// Tasks waiting to run, ordered by build-type priority (max-heap).
    pub pending_tasks: BinaryHeap<IndexBuildTask>,
}
784
785impl IndexBuildStatus {
786    pub fn new(region_id: RegionId) -> Self {
787        IndexBuildStatus {
788            region_id,
789            building_files: HashSet::new(),
790            pending_tasks: BinaryHeap::new(),
791        }
792    }
793
794    async fn on_failure(self, err: Arc<Error>) {
795        for task in self.pending_tasks {
796            task.on_failure(err.clone()).await;
797        }
798    }
799}
800
pub struct IndexBuildScheduler {
    /// Background job scheduler.
    scheduler: SchedulerRef,
    /// Tracks regions need to build index.
    region_status: HashMap<RegionId, IndexBuildStatus>,
    /// Limit of files allowed to build index concurrently for a region.
    files_limit: usize,
}
809
810/// Manager background index build tasks of a worker.
811impl IndexBuildScheduler {
812    pub fn new(scheduler: SchedulerRef, files_limit: usize) -> Self {
813        IndexBuildScheduler {
814            scheduler,
815            region_status: HashMap::new(),
816            files_limit,
817        }
818    }
819
820    pub(crate) async fn schedule_build(
821        &mut self,
822        version_control: &VersionControlRef,
823        task: IndexBuildTask,
824    ) -> Result<()> {
825        let status = self
826            .region_status
827            .entry(task.file_meta.region_id)
828            .or_insert_with(|| IndexBuildStatus::new(task.file_meta.region_id));
829
830        if status.building_files.contains(&task.file_meta.file_id) {
831            let region_file_id =
832                RegionFileId::new(task.file_meta.region_id, task.file_meta.file_id);
833            debug!(
834                "Aborting index build task since index is already being built for region file {:?}",
835                region_file_id
836            );
837            task.on_success(IndexBuildOutcome::Aborted(format!(
838                "Index is already being built for region file {:?}",
839                region_file_id
840            )))
841            .await;
842            task.listener.on_index_build_abort(region_file_id).await;
843            return Ok(());
844        }
845
846        status.pending_tasks.push(task);
847
848        self.schedule_next_build_batch(version_control);
849        Ok(())
850    }
851
852    /// Schedule tasks until reaching the files limit or no more tasks.
853    fn schedule_next_build_batch(&mut self, version_control: &VersionControlRef) {
854        let mut building_count = 0;
855        for status in self.region_status.values() {
856            building_count += status.building_files.len();
857        }
858
859        while building_count < self.files_limit {
860            if let Some(task) = self.find_next_task() {
861                let region_id = task.file_meta.region_id;
862                let file_id = task.file_meta.file_id;
863                let job = task.into_index_build_job(version_control.clone());
864                if self.scheduler.schedule(job).is_ok() {
865                    if let Some(status) = self.region_status.get_mut(&region_id) {
866                        status.building_files.insert(file_id);
867                        building_count += 1;
868                        status
869                            .pending_tasks
870                            .retain(|t| t.file_meta.file_id != file_id);
871                    } else {
872                        error!(
873                            "Region status not found when scheduling index build task, region: {}",
874                            region_id
875                        );
876                    }
877                } else {
878                    error!(
879                        "Failed to schedule index build job, region: {}, file_id: {}",
880                        region_id, file_id
881                    );
882                }
883            } else {
884                // No more tasks to schedule.
885                break;
886            }
887        }
888    }
889
890    /// Find the next task which has the highest priority to run.
891    fn find_next_task(&self) -> Option<IndexBuildTask> {
892        self.region_status
893            .iter()
894            .filter_map(|(_, status)| status.pending_tasks.peek())
895            .max()
896            .cloned()
897    }
898
899    pub(crate) fn on_task_stopped(
900        &mut self,
901        region_id: RegionId,
902        file_id: FileId,
903        version_control: &VersionControlRef,
904    ) {
905        if let Some(status) = self.region_status.get_mut(&region_id) {
906            status.building_files.remove(&file_id);
907            if status.building_files.is_empty() && status.pending_tasks.is_empty() {
908                // No more tasks for this region, remove it.
909                self.region_status.remove(&region_id);
910            }
911        }
912
913        self.schedule_next_build_batch(version_control);
914    }
915
916    pub(crate) async fn on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
917        error!(
918            err; "Index build scheduler encountered failure for region {}, removing all pending tasks.",
919            region_id
920        );
921        let Some(status) = self.region_status.remove(&region_id) else {
922            return;
923        };
924        status.on_failure(err).await;
925    }
926
927    /// Notifies the scheduler that the region is dropped.
928    pub(crate) async fn on_region_dropped(&mut self, region_id: RegionId) {
929        self.remove_region_on_failure(
930            region_id,
931            Arc::new(RegionDroppedSnafu { region_id }.build()),
932        )
933        .await;
934    }
935
936    /// Notifies the scheduler that the region is closed.
937    pub(crate) async fn on_region_closed(&mut self, region_id: RegionId) {
938        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()))
939            .await;
940    }
941
942    /// Notifies the scheduler that the region is truncated.
943    pub(crate) async fn on_region_truncated(&mut self, region_id: RegionId) {
944        self.remove_region_on_failure(
945            region_id,
946            Arc::new(RegionTruncatedSnafu { region_id }.build()),
947        )
948        .await;
949    }
950
951    async fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
952        let Some(status) = self.region_status.remove(&region_id) else {
953            return;
954        };
955        status.on_failure(err).await;
956    }
957}
958
959/// Decodes primary keys from a flat format RecordBatch.
960/// Returns a list of (decoded_pk_value, count) tuples where count is the number of occurrences.
961pub(crate) fn decode_primary_keys_with_counts(
962    batch: &RecordBatch,
963    codec: &IndexValuesCodec,
964) -> Result<Vec<(CompositeValues, usize)>> {
965    let primary_key_index = primary_key_column_index(batch.num_columns());
966    let pk_dict_array = batch
967        .column(primary_key_index)
968        .as_any()
969        .downcast_ref::<PrimaryKeyArray>()
970        .context(InvalidRecordBatchSnafu {
971            reason: "Primary key column is not a dictionary array",
972        })?;
973    let pk_values_array = pk_dict_array
974        .values()
975        .as_any()
976        .downcast_ref::<BinaryArray>()
977        .context(InvalidRecordBatchSnafu {
978            reason: "Primary key values are not binary array",
979        })?;
980    let keys = pk_dict_array.keys();
981
982    // Decodes primary keys and count consecutive occurrences
983    let mut result: Vec<(CompositeValues, usize)> = Vec::new();
984    let mut prev_key: Option<u32> = None;
985
986    for i in 0..keys.len() {
987        let current_key = keys.value(i);
988
989        // Checks if current key is the same as previous key
990        if let Some(prev) = prev_key
991            && prev == current_key
992        {
993            // Safety: We already have a key in the result vector.
994            result.last_mut().unwrap().1 += 1;
995            continue;
996        }
997
998        // New key, decodes it.
999        let pk_bytes = pk_values_array.value(current_key as usize);
1000        let decoded_value = codec.decoder().decode(pk_bytes).context(DecodeSnafu)?;
1001
1002        result.push((decoded_value, 1));
1003        prev_key = Some(current_key);
1004    }
1005
1006    Ok(result)
1007}
1008
1009#[cfg(test)]
1010mod tests {
1011    use std::sync::Arc;
1012
1013    use api::v1::SemanticType;
1014    use common_base::readable_size::ReadableSize;
1015    use datafusion_common::HashMap;
1016    use datatypes::data_type::ConcreteDataType;
1017    use datatypes::schema::{
1018        ColumnSchema, FulltextOptions, SkippingIndexOptions, SkippingIndexType,
1019    };
1020    use object_store::ObjectStore;
1021    use object_store::services::Memory;
1022    use puffin_manager::PuffinManagerFactory;
1023    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1024    use tokio::sync::mpsc;
1025
1026    use super::*;
1027    use crate::access_layer::{FilePathProvider, Metrics, SstWriteRequest, WriteType};
1028    use crate::cache::write_cache::WriteCache;
1029    use crate::config::{FulltextIndexConfig, IndexBuildMode, MitoConfig, Mode};
1030    use crate::memtable::time_partition::TimePartitions;
1031    use crate::region::version::{VersionBuilder, VersionControl};
1032    use crate::sst::file::RegionFileId;
1033    use crate::sst::file_purger::NoopFilePurger;
1034    use crate::sst::location;
1035    use crate::sst::parquet::WriteOptions;
1036    use crate::test_util::memtable_util::EmptyMemtableBuilder;
1037    use crate::test_util::scheduler_util::SchedulerEnv;
1038    use crate::test_util::sst_util::{new_batch_by_range, new_source, sst_region_metadata};
1039
    /// Flags selecting which index-related columns `mock_region_metadata`
    /// adds to the mocked region metadata.
    struct MetaConfig {
        /// Mark column "a" with an inverted index.
        with_inverted: bool,
        /// Add a string column "text" with fulltext options enabled.
        with_fulltext: bool,
        /// Add a string column "bloom" with a bloom-filter skipping index.
        with_skipping_bloom: bool,
    }
1045
1046    fn mock_region_metadata(
1047        MetaConfig {
1048            with_inverted,
1049            with_fulltext,
1050            with_skipping_bloom,
1051        }: MetaConfig,
1052    ) -> RegionMetadataRef {
1053        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
1054        let mut column_schema = ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false);
1055        if with_inverted {
1056            column_schema = column_schema.with_inverted_index(true);
1057        }
1058        builder
1059            .push_column_metadata(ColumnMetadata {
1060                column_schema,
1061                semantic_type: SemanticType::Field,
1062                column_id: 1,
1063            })
1064            .push_column_metadata(ColumnMetadata {
1065                column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
1066                semantic_type: SemanticType::Field,
1067                column_id: 2,
1068            })
1069            .push_column_metadata(ColumnMetadata {
1070                column_schema: ColumnSchema::new(
1071                    "c",
1072                    ConcreteDataType::timestamp_millisecond_datatype(),
1073                    false,
1074                ),
1075                semantic_type: SemanticType::Timestamp,
1076                column_id: 3,
1077            });
1078
1079        if with_fulltext {
1080            let column_schema =
1081                ColumnSchema::new("text", ConcreteDataType::string_datatype(), true)
1082                    .with_fulltext_options(FulltextOptions {
1083                        enable: true,
1084                        ..Default::default()
1085                    })
1086                    .unwrap();
1087
1088            let column = ColumnMetadata {
1089                column_schema,
1090                semantic_type: SemanticType::Field,
1091                column_id: 4,
1092            };
1093
1094            builder.push_column_metadata(column);
1095        }
1096
1097        if with_skipping_bloom {
1098            let column_schema =
1099                ColumnSchema::new("bloom", ConcreteDataType::string_datatype(), false)
1100                    .with_skipping_options(SkippingIndexOptions::new_unchecked(
1101                        42,
1102                        0.01,
1103                        SkippingIndexType::BloomFilter,
1104                    ))
1105                    .unwrap();
1106
1107            let column = ColumnMetadata {
1108                column_schema,
1109                semantic_type: SemanticType::Field,
1110                column_id: 5,
1111            };
1112
1113            builder.push_column_metadata(column);
1114        }
1115
1116        Arc::new(builder.build().unwrap())
1117    }
1118
1119    fn mock_object_store() -> ObjectStore {
1120        ObjectStore::new(Memory::default()).unwrap().finish()
1121    }
1122
1123    async fn mock_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
1124        IntermediateManager::init_fs(path).await.unwrap()
1125    }
1126    struct NoopPathProvider;
1127
    impl FilePathProvider for NoopPathProvider {
        // Tests using this provider never resolve index file paths.
        fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
            unreachable!()
        }

        // Tests using this provider never resolve SST file paths.
        fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
            unreachable!()
        }
    }
1137
1138    async fn mock_sst_file(
1139        metadata: RegionMetadataRef,
1140        env: &SchedulerEnv,
1141        build_mode: IndexBuildMode,
1142    ) -> SstInfo {
1143        let source = new_source(&[
1144            new_batch_by_range(&["a", "d"], 0, 60),
1145            new_batch_by_range(&["b", "f"], 0, 40),
1146            new_batch_by_range(&["b", "h"], 100, 200),
1147        ]);
1148        let mut index_config = MitoConfig::default().index;
1149        index_config.build_mode = build_mode;
1150        let write_request = SstWriteRequest {
1151            op_type: OperationType::Flush,
1152            metadata: metadata.clone(),
1153            source: either::Left(source),
1154            storage: None,
1155            max_sequence: None,
1156            cache_manager: Default::default(),
1157            index_options: IndexOptions::default(),
1158            index_config,
1159            inverted_index_config: Default::default(),
1160            fulltext_index_config: Default::default(),
1161            bloom_filter_index_config: Default::default(),
1162        };
1163        let mut metrics = Metrics::new(WriteType::Flush);
1164        env.access_layer
1165            .write_sst(write_request, &WriteOptions::default(), &mut metrics)
1166            .await
1167            .unwrap()
1168            .remove(0)
1169    }
1170
1171    async fn mock_version_control(
1172        metadata: RegionMetadataRef,
1173        file_purger: FilePurgerRef,
1174        files: HashMap<FileId, FileMeta>,
1175    ) -> VersionControlRef {
1176        let mutable = Arc::new(TimePartitions::new(
1177            metadata.clone(),
1178            Arc::new(EmptyMemtableBuilder::default()),
1179            0,
1180            None,
1181        ));
1182        let version_builder = VersionBuilder::new(metadata, mutable)
1183            .add_files(file_purger, files.values().cloned())
1184            .build();
1185        Arc::new(VersionControl::new(version_builder))
1186    }
1187
    /// Builds an `IndexerBuilderImpl` wired to the scheduler env's access
    /// layer, with default index configs and a temp intermediate directory.
    async fn mock_indexer_builder(
        metadata: RegionMetadataRef,
        env: &SchedulerEnv,
    ) -> Arc<dyn IndexerBuilder + Send + Sync> {
        // NOTE(review): the `dir` temp-dir guard is dropped when this function
        // returns — confirm the intermediate manager does not need the
        // directory to outlive this call.
        let (dir, factory) = PuffinManagerFactory::new_for_test_async("mock_indexer_builder").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
        // Puffin files are written next to the env's SSTs.
        let puffin_manager = factory.build(
            env.access_layer.object_store().clone(),
            RegionFilePathFactory::new(
                env.access_layer.table_dir().to_string(),
                env.access_layer.path_type(),
            ),
        );
        Arc::new(IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata,
            row_group_size: 1024,
            puffin_manager,
            intermediate_manager: intm_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        })
    }
1213
1214    #[tokio::test]
1215    async fn test_build_indexer_basic() {
1216        let (dir, factory) =
1217            PuffinManagerFactory::new_for_test_async("test_build_indexer_basic_").await;
1218        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1219
1220        let metadata = mock_region_metadata(MetaConfig {
1221            with_inverted: true,
1222            with_fulltext: true,
1223            with_skipping_bloom: true,
1224        });
1225        let indexer = IndexerBuilderImpl {
1226            build_type: IndexBuildType::Flush,
1227            metadata,
1228            row_group_size: 1024,
1229            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1230            intermediate_manager: intm_manager,
1231            index_options: IndexOptions::default(),
1232            inverted_index_config: InvertedIndexConfig::default(),
1233            fulltext_index_config: FulltextIndexConfig::default(),
1234            bloom_filter_index_config: BloomFilterConfig::default(),
1235        }
1236        .build(FileId::random())
1237        .await;
1238
1239        assert!(indexer.inverted_indexer.is_some());
1240        assert!(indexer.fulltext_indexer.is_some());
1241        assert!(indexer.bloom_filter_indexer.is_some());
1242    }
1243
    #[tokio::test]
    async fn test_build_indexer_disable_create() {
        let (dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_disable_create_").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;

        // Metadata enables all three index kinds; each case below disables
        // exactly one of them via config and checks only that one is skipped.
        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: true,
            with_skipping_bloom: true,
        });
        // Case 1: inverted index creation disabled on flush.
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig {
                create_on_flush: Mode::Disable,
                ..Default::default()
            },
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_none());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_some());

        // Case 2: fulltext index creation disabled on compaction.
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Compact,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig {
                create_on_compaction: Mode::Disable,
                ..Default::default()
            },
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_none());
        assert!(indexer.bloom_filter_indexer.is_some());

        // Case 3: bloom filter index creation disabled on compaction.
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Compact,
            metadata,
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig {
                create_on_compaction: Mode::Disable,
                ..Default::default()
            },
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_none());
    }
1318
    #[tokio::test]
    async fn test_build_indexer_no_required() {
        let (dir, factory) =
            PuffinManagerFactory::new_for_test_async("test_build_indexer_no_required_").await;
        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;

        // Configs stay at defaults; each case omits one index kind from the
        // metadata and checks only that indexer is not created.
        // Case 1: no inverted-indexed column.
        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: false,
            with_fulltext: true,
            with_skipping_bloom: true,
        });
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_none());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_some());

        // Case 2: no fulltext column.
        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: false,
            with_skipping_bloom: true,
        });
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager.clone(),
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_none());
        assert!(indexer.bloom_filter_indexer.is_some());

        // Case 3: no bloom-filter skipping-indexed column.
        let metadata = mock_region_metadata(MetaConfig {
            with_inverted: true,
            with_fulltext: true,
            with_skipping_bloom: false,
        });
        let indexer = IndexerBuilderImpl {
            build_type: IndexBuildType::Flush,
            metadata: metadata.clone(),
            row_group_size: 1024,
            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
            intermediate_manager: intm_manager,
            index_options: IndexOptions::default(),
            inverted_index_config: InvertedIndexConfig::default(),
            fulltext_index_config: FulltextIndexConfig::default(),
            bloom_filter_index_config: BloomFilterConfig::default(),
        }
        .build(FileId::random())
        .await;

        assert!(indexer.inverted_indexer.is_some());
        assert!(indexer.fulltext_indexer.is_some());
        assert!(indexer.bloom_filter_indexer.is_none());
    }
1394
1395    #[tokio::test]
1396    async fn test_build_indexer_zero_row_group() {
1397        let (dir, factory) =
1398            PuffinManagerFactory::new_for_test_async("test_build_indexer_zero_row_group_").await;
1399        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1400
1401        let metadata = mock_region_metadata(MetaConfig {
1402            with_inverted: true,
1403            with_fulltext: true,
1404            with_skipping_bloom: true,
1405        });
1406        let indexer = IndexerBuilderImpl {
1407            build_type: IndexBuildType::Flush,
1408            metadata,
1409            row_group_size: 0,
1410            puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1411            intermediate_manager: intm_manager,
1412            index_options: IndexOptions::default(),
1413            inverted_index_config: InvertedIndexConfig::default(),
1414            fulltext_index_config: FulltextIndexConfig::default(),
1415            bloom_filter_index_config: BloomFilterConfig::default(),
1416        }
1417        .build(FileId::random())
1418        .await;
1419
1420        assert!(indexer.inverted_indexer.is_none());
1421    }
1422
1423    #[tokio::test]
1424    async fn test_index_build_task_sst_not_exist() {
1425        let env = SchedulerEnv::new().await;
1426        let (tx, _rx) = mpsc::channel(4);
1427        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1428        let mut scheduler = env.mock_index_build_scheduler(4);
1429        let metadata = Arc::new(sst_region_metadata());
1430        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1431        let file_purger = Arc::new(NoopFilePurger {});
1432        let files = HashMap::new();
1433        let version_control =
1434            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1435        let region_id = metadata.region_id;
1436        let indexer_builder = mock_indexer_builder(metadata, &env).await;
1437
1438        // Create mock task.
1439        let task = IndexBuildTask {
1440            file_meta: FileMeta {
1441                region_id,
1442                file_id: FileId::random(),
1443                file_size: 100,
1444                ..Default::default()
1445            },
1446            reason: IndexBuildType::Flush,
1447            access_layer: env.access_layer.clone(),
1448            listener: WorkerListener::default(),
1449            manifest_ctx,
1450            write_cache: None,
1451            file_purger,
1452            indexer_builder,
1453            request_sender: tx,
1454            result_sender: result_tx,
1455        };
1456
1457        // Schedule the build task and check result.
1458        scheduler
1459            .schedule_build(&version_control, task)
1460            .await
1461            .unwrap();
1462        match result_rx.recv().await.unwrap() {
1463            Ok(outcome) => {
1464                if outcome == IndexBuildOutcome::Finished {
1465                    panic!("Expect aborted result due to missing SST file")
1466                }
1467            }
1468            _ => panic!("Expect aborted result due to missing SST file"),
1469        }
1470    }
1471
    #[tokio::test]
    async fn test_index_build_task_sst_exist() {
        let env = SchedulerEnv::new().await;
        let mut scheduler = env.mock_index_build_scheduler(4);
        let metadata = Arc::new(sst_region_metadata());
        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
        let region_id = metadata.region_id;
        let file_purger = Arc::new(NoopFilePurger {});
        // Write a real SST so the build task has a file to index.
        let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
        let file_meta = FileMeta {
            region_id,
            file_id: sst_info.file_id,
            file_size: sst_info.file_size,
            index_file_size: sst_info.index_metadata.file_size,
            num_rows: sst_info.num_rows as u64,
            num_row_groups: sst_info.num_row_groups,
            ..Default::default()
        };
        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
        let version_control =
            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
        let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;

        // Create mock task.
        let (tx, mut rx) = mpsc::channel(4);
        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
        let task = IndexBuildTask {
            file_meta: file_meta.clone(),
            reason: IndexBuildType::Flush,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            manifest_ctx,
            write_cache: None,
            file_purger,
            indexer_builder,
            request_sender: tx,
            result_sender: result_tx,
        };

        scheduler
            .schedule_build(&version_control, task)
            .await
            .unwrap();

        // The task should finish successfully.
        match result_rx.recv().await.unwrap() {
            Ok(outcome) => {
                assert_eq!(outcome, IndexBuildOutcome::Finished);
            }
            _ => panic!("Expect finished result"),
        }

        // A notification should be sent to the worker to update the manifest.
        let worker_req = rx.recv().await.unwrap().request;
        match worker_req {
            WorkerRequest::Background {
                region_id: req_region_id,
                notify: BackgroundNotify::IndexBuildFinished(finished),
            } => {
                // The edit must target the same region and carry exactly the
                // rebuilt file.
                assert_eq!(req_region_id, region_id);
                assert_eq!(finished.edit.files_to_add.len(), 1);
                let updated_meta = &finished.edit.files_to_add[0];

                // The mock indexer builder creates all index types.
                assert!(!updated_meta.available_indexes.is_empty());
                assert!(updated_meta.index_file_size > 0);
                assert_eq!(updated_meta.file_id, file_meta.file_id);
            }
            _ => panic!("Unexpected worker request: {:?}", worker_req),
        }
    }
1543
    /// Runs a full schedule-and-build round for `build_mode`, asserting that
    /// the index file exists right after scheduling only in `Sync` mode, and
    /// exists in both modes after the task reports `Finished`.
    async fn schedule_index_build_task_with_mode(build_mode: IndexBuildMode) {
        let env = SchedulerEnv::new().await;
        let mut scheduler = env.mock_index_build_scheduler(4);
        let metadata = Arc::new(sst_region_metadata());
        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
        let file_purger = Arc::new(NoopFilePurger {});
        let region_id = metadata.region_id;
        // The SST write itself honors `build_mode` (Sync writes the index
        // together with the SST; Async defers it to the build task).
        let sst_info = mock_sst_file(metadata.clone(), &env, build_mode.clone()).await;
        let file_meta = FileMeta {
            region_id,
            file_id: sst_info.file_id,
            file_size: sst_info.file_size,
            index_file_size: sst_info.index_metadata.file_size,
            num_rows: sst_info.num_rows as u64,
            num_row_groups: sst_info.num_row_groups,
            ..Default::default()
        };
        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
        let version_control =
            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
        let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;

        // Create mock task.
        let (tx, _rx) = mpsc::channel(4);
        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
        let task = IndexBuildTask {
            file_meta: file_meta.clone(),
            reason: IndexBuildType::Flush,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            manifest_ctx,
            write_cache: None,
            file_purger,
            indexer_builder,
            request_sender: tx,
            result_sender: result_tx,
        };

        scheduler
            .schedule_build(&version_control, task)
            .await
            .unwrap();

        let puffin_path = location::index_file_path(
            env.access_layer.table_dir(),
            RegionFileId::new(region_id, file_meta.file_id),
            env.access_layer.path_type(),
        );

        if build_mode == IndexBuildMode::Async {
            // The index file should not exist before the task finishes.
            assert!(
                !env.access_layer
                    .object_store()
                    .exists(&puffin_path)
                    .await
                    .unwrap()
            );
        } else {
            // The index file should exist before the task finishes.
            assert!(
                env.access_layer
                    .object_store()
                    .exists(&puffin_path)
                    .await
                    .unwrap()
            );
        }

        // The task should finish successfully.
        match result_rx.recv().await.unwrap() {
            Ok(outcome) => {
                assert_eq!(outcome, IndexBuildOutcome::Finished);
            }
            _ => panic!("Expect finished result"),
        }

        // The index file should exist after the task finishes.
        assert!(
            env.access_layer
                .object_store()
                .exists(&puffin_path)
                .await
                .unwrap()
        );
    }
1630
1631    #[tokio::test]
1632    async fn test_index_build_task_build_mode() {
1633        schedule_index_build_task_with_mode(IndexBuildMode::Async).await;
1634        schedule_index_build_task_with_mode(IndexBuildMode::Sync).await;
1635    }
1636
1637    #[tokio::test]
1638    async fn test_index_build_task_no_index() {
1639        let env = SchedulerEnv::new().await;
1640        let mut scheduler = env.mock_index_build_scheduler(4);
1641        let mut metadata = sst_region_metadata();
1642        // Unset indexes in metadata to simulate no index scenario.
1643        metadata.column_metadatas.iter_mut().for_each(|col| {
1644            col.column_schema.set_inverted_index(false);
1645            let _ = col.column_schema.unset_skipping_options();
1646        });
1647        let region_id = metadata.region_id;
1648        let metadata = Arc::new(metadata);
1649        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1650        let file_purger = Arc::new(NoopFilePurger {});
1651        let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
1652        let file_meta = FileMeta {
1653            region_id,
1654            file_id: sst_info.file_id,
1655            file_size: sst_info.file_size,
1656            index_file_size: sst_info.index_metadata.file_size,
1657            num_rows: sst_info.num_rows as u64,
1658            num_row_groups: sst_info.num_row_groups,
1659            ..Default::default()
1660        };
1661        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1662        let version_control =
1663            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1664        let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
1665
1666        // Create mock task.
1667        let (tx, mut rx) = mpsc::channel(4);
1668        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1669        let task = IndexBuildTask {
1670            file_meta: file_meta.clone(),
1671            reason: IndexBuildType::Flush,
1672            access_layer: env.access_layer.clone(),
1673            listener: WorkerListener::default(),
1674            manifest_ctx,
1675            write_cache: None,
1676            file_purger,
1677            indexer_builder,
1678            request_sender: tx,
1679            result_sender: result_tx,
1680        };
1681
1682        scheduler
1683            .schedule_build(&version_control, task)
1684            .await
1685            .unwrap();
1686
1687        // The task should finish successfully.
1688        match result_rx.recv().await.unwrap() {
1689            Ok(outcome) => {
1690                assert_eq!(outcome, IndexBuildOutcome::Finished);
1691            }
1692            _ => panic!("Expect finished result"),
1693        }
1694
1695        // No index is built, so no notification should be sent to the worker.
1696        let _ = rx.recv().await.is_none();
1697    }
1698
1699    #[tokio::test]
1700    async fn test_index_build_task_with_write_cache() {
1701        let env = SchedulerEnv::new().await;
1702        let mut scheduler = env.mock_index_build_scheduler(4);
1703        let metadata = Arc::new(sst_region_metadata());
1704        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1705        let file_purger = Arc::new(NoopFilePurger {});
1706        let region_id = metadata.region_id;
1707
1708        let (dir, factory) = PuffinManagerFactory::new_for_test_async("test_write_cache").await;
1709        let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1710
1711        // Create mock write cache
1712        let write_cache = Arc::new(
1713            WriteCache::new_fs(
1714                dir.path().to_str().unwrap(),
1715                ReadableSize::mb(10),
1716                None,
1717                factory,
1718                intm_manager,
1719            )
1720            .await
1721            .unwrap(),
1722        );
1723        // Indexer builder built from write cache.
1724        let indexer_builder = Arc::new(IndexerBuilderImpl {
1725            build_type: IndexBuildType::Flush,
1726            metadata: metadata.clone(),
1727            row_group_size: 1024,
1728            puffin_manager: write_cache.build_puffin_manager().clone(),
1729            intermediate_manager: write_cache.intermediate_manager().clone(),
1730            index_options: IndexOptions::default(),
1731            inverted_index_config: InvertedIndexConfig::default(),
1732            fulltext_index_config: FulltextIndexConfig::default(),
1733            bloom_filter_index_config: BloomFilterConfig::default(),
1734        });
1735
1736        let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
1737        let file_meta = FileMeta {
1738            region_id,
1739            file_id: sst_info.file_id,
1740            file_size: sst_info.file_size,
1741            index_file_size: sst_info.index_metadata.file_size,
1742            num_rows: sst_info.num_rows as u64,
1743            num_row_groups: sst_info.num_row_groups,
1744            ..Default::default()
1745        };
1746        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1747        let version_control =
1748            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1749
1750        // Create mock task.
1751        let (tx, mut _rx) = mpsc::channel(4);
1752        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1753        let task = IndexBuildTask {
1754            file_meta: file_meta.clone(),
1755            reason: IndexBuildType::Flush,
1756            access_layer: env.access_layer.clone(),
1757            listener: WorkerListener::default(),
1758            manifest_ctx,
1759            write_cache: Some(write_cache.clone()),
1760            file_purger,
1761            indexer_builder,
1762            request_sender: tx,
1763            result_sender: result_tx,
1764        };
1765
1766        scheduler
1767            .schedule_build(&version_control, task)
1768            .await
1769            .unwrap();
1770
1771        // The task should finish successfully.
1772        match result_rx.recv().await.unwrap() {
1773            Ok(outcome) => {
1774                assert_eq!(outcome, IndexBuildOutcome::Finished);
1775            }
1776            _ => panic!("Expect finished result"),
1777        }
1778
1779        // The write cache should contain the uploaded index file.
1780        let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Puffin);
1781        assert!(write_cache.file_cache().contains_key(&index_key));
1782    }
1783
1784    async fn create_mock_task_for_schedule(
1785        env: &SchedulerEnv,
1786        file_id: FileId,
1787        region_id: RegionId,
1788        reason: IndexBuildType,
1789    ) -> IndexBuildTask {
1790        let metadata = Arc::new(sst_region_metadata());
1791        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1792        let file_purger = Arc::new(NoopFilePurger {});
1793        let indexer_builder = mock_indexer_builder(metadata, env).await;
1794        let (tx, _rx) = mpsc::channel(4);
1795        let (result_tx, _result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1796
1797        IndexBuildTask {
1798            file_meta: FileMeta {
1799                region_id,
1800                file_id,
1801                file_size: 100,
1802                ..Default::default()
1803            },
1804            reason,
1805            access_layer: env.access_layer.clone(),
1806            listener: WorkerListener::default(),
1807            manifest_ctx,
1808            write_cache: None,
1809            file_purger,
1810            indexer_builder,
1811            request_sender: tx,
1812            result_sender: result_tx,
1813        }
1814    }
1815
    /// End-to-end exercise of the index-build scheduler's state machine:
    /// basic scheduling, duplicate suppression, the per-region building
    /// limit, priority ordering of pending tasks, completion-driven
    /// promotion, final cleanup, and region-drop handling.
    #[tokio::test]
    async fn test_scheduler_comprehensive() {
        let env = SchedulerEnv::new().await;
        // Scheduler allows at most 2 concurrently-building tasks.
        let mut scheduler = env.mock_index_build_scheduler(2);
        let metadata = Arc::new(sst_region_metadata());
        let region_id = metadata.region_id;
        let file_purger = Arc::new(NoopFilePurger {});

        // Prepare multiple files for testing
        let file_id1 = FileId::random();
        let file_id2 = FileId::random();
        let file_id3 = FileId::random();
        let file_id4 = FileId::random();
        let file_id5 = FileId::random();

        let mut files = HashMap::new();
        for file_id in [file_id1, file_id2, file_id3, file_id4, file_id5] {
            files.insert(
                file_id,
                FileMeta {
                    region_id,
                    file_id,
                    file_size: 100,
                    ..Default::default()
                },
            );
        }

        let version_control = mock_version_control(metadata, file_purger, files).await;

        // Test 1: Basic scheduling
        let task1 =
            create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
        assert!(
            scheduler
                .schedule_build(&version_control, task1)
                .await
                .is_ok()
        );
        assert!(scheduler.region_status.contains_key(&region_id));
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 1);
        assert!(status.building_files.contains(&file_id1));

        // Test 2: Duplicate file scheduling (should be skipped)
        let task1_dup =
            create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
        scheduler
            .schedule_build(&version_control, task1_dup)
            .await
            .unwrap();
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 1); // Still only one

        // Test 3: Fill up to limit (2 building tasks)
        let task2 =
            create_mock_task_for_schedule(&env, file_id2, region_id, IndexBuildType::Flush).await;
        scheduler
            .schedule_build(&version_control, task2)
            .await
            .unwrap();
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 2); // Reached limit
        assert_eq!(status.pending_tasks.len(), 0);

        // Test 4: Add tasks with different priorities to pending queue
        // Now all new tasks will be pending since we reached the limit
        let task3 =
            create_mock_task_for_schedule(&env, file_id3, region_id, IndexBuildType::Compact).await;
        let task4 =
            create_mock_task_for_schedule(&env, file_id4, region_id, IndexBuildType::SchemaChange)
                .await;
        let task5 =
            create_mock_task_for_schedule(&env, file_id5, region_id, IndexBuildType::Manual).await;

        scheduler
            .schedule_build(&version_control, task3)
            .await
            .unwrap();
        scheduler
            .schedule_build(&version_control, task4)
            .await
            .unwrap();
        scheduler
            .schedule_build(&version_control, task5)
            .await
            .unwrap();

        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 2); // Still at limit
        assert_eq!(status.pending_tasks.len(), 3); // Three pending

        // Test 5: Task completion triggers scheduling next highest priority task (Manual)
        // Expected priority order, per the assertions in Tests 5-7:
        // Manual > SchemaChange > Compact.
        scheduler.on_task_stopped(region_id, file_id1, &version_control);
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert!(!status.building_files.contains(&file_id1));
        assert_eq!(status.building_files.len(), 2); // Should schedule next task
        assert_eq!(status.pending_tasks.len(), 2); // One less pending
        // The highest priority task (Manual) should now be building
        assert!(status.building_files.contains(&file_id5));

        // Test 6: Complete another task, should schedule SchemaChange (second highest priority)
        scheduler.on_task_stopped(region_id, file_id2, &version_control);
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 2);
        assert_eq!(status.pending_tasks.len(), 1); // One less pending
        assert!(status.building_files.contains(&file_id4)); // SchemaChange should be building

        // Test 7: Complete remaining tasks and cleanup
        scheduler.on_task_stopped(region_id, file_id5, &version_control);
        scheduler.on_task_stopped(region_id, file_id4, &version_control);

        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 1); // Last task (Compact) should be building
        assert_eq!(status.pending_tasks.len(), 0);
        assert!(status.building_files.contains(&file_id3));

        scheduler.on_task_stopped(region_id, file_id3, &version_control);

        // Region should be removed when all tasks complete
        assert!(!scheduler.region_status.contains_key(&region_id));

        // Test 8: Region dropped with pending tasks
        // Re-fill the scheduler (2 building + 1 pending), then drop the region
        // and verify all state for it is discarded.
        let task6 =
            create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
        let task7 =
            create_mock_task_for_schedule(&env, file_id2, region_id, IndexBuildType::Flush).await;
        let task8 =
            create_mock_task_for_schedule(&env, file_id3, region_id, IndexBuildType::Manual).await;

        scheduler
            .schedule_build(&version_control, task6)
            .await
            .unwrap();
        scheduler
            .schedule_build(&version_control, task7)
            .await
            .unwrap();
        scheduler
            .schedule_build(&version_control, task8)
            .await
            .unwrap();

        assert!(scheduler.region_status.contains_key(&region_id));
        let status = scheduler.region_status.get(&region_id).unwrap();
        assert_eq!(status.building_files.len(), 2);
        assert_eq!(status.pending_tasks.len(), 1);

        scheduler.on_region_dropped(region_id).await;
        assert!(!scheduler.region_status.contains_key(&region_id));
    }
1967}