1pub(crate) mod bloom_filter;
16pub(crate) mod fulltext_index;
17mod indexer;
18pub mod intermediate;
19pub(crate) mod inverted_index;
20pub mod puffin_manager;
21mod statistics;
22pub(crate) mod store;
23#[cfg(feature = "vector_index")]
24pub(crate) mod vector_index;
25
26use std::cmp::Ordering;
27use std::collections::{BinaryHeap, HashMap, HashSet};
28use std::num::NonZeroUsize;
29use std::sync::Arc;
30
31use bloom_filter::creator::BloomFilterIndexer;
32use common_telemetry::{debug, error, info, warn};
33use datatypes::arrow::array::BinaryArray;
34use datatypes::arrow::record_batch::RecordBatch;
35use mito_codec::index::IndexValuesCodec;
36use mito_codec::row_converter::CompositeValues;
37use object_store::ObjectStore;
38use puffin_manager::SstPuffinManager;
39use smallvec::{SmallVec, smallvec};
40use snafu::{OptionExt, ResultExt};
41use statistics::{ByteCount, RowCount};
42use store_api::metadata::RegionMetadataRef;
43use store_api::storage::{ColumnId, FileId, RegionId};
44use strum::IntoStaticStr;
45use tokio::sync::mpsc::Sender;
46#[cfg(feature = "vector_index")]
47use vector_index::creator::VectorIndexer;
48
49use crate::access_layer::{AccessLayerRef, FilePathProvider, OperationType, RegionFilePathFactory};
50use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
51use crate::cache::write_cache::{UploadTracker, WriteCacheRef};
52#[cfg(feature = "vector_index")]
53use crate::config::VectorIndexConfig;
54use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
55use crate::error::{
56 BuildIndexAsyncSnafu, DecodeSnafu, Error, InvalidRecordBatchSnafu, RegionClosedSnafu,
57 RegionDroppedSnafu, RegionTruncatedSnafu, Result,
58};
59use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
60use crate::metrics::INDEX_CREATE_MEMORY_USAGE;
61use crate::read::Batch;
62use crate::region::options::IndexOptions;
63use crate::region::version::VersionControlRef;
64use crate::region::{ManifestContextRef, RegionLeaderState};
65use crate::request::{
66 BackgroundNotify, IndexBuildFailed, IndexBuildFinished, IndexBuildStopped, WorkerRequest,
67 WorkerRequestWithTime,
68};
69use crate::schedule::scheduler::{Job, SchedulerRef};
70use crate::sst::file::{
71 ColumnIndexMetadata, FileHandle, FileMeta, IndexType, IndexTypes, RegionFileId, RegionIndexId,
72};
73use crate::sst::file_purger::FilePurgerRef;
74use crate::sst::index::fulltext_index::creator::FulltextIndexer;
75use crate::sst::index::intermediate::IntermediateManager;
76use crate::sst::index::inverted_index::creator::InvertedIndexer;
77use crate::sst::parquet::SstInfo;
78use crate::sst::parquet::flat_format::primary_key_column_index;
79use crate::sst::parquet::format::PrimaryKeyArray;
80use crate::worker::WorkerListener;
81
/// Type name of the inverted index; used in this module as the label value of the
/// index-creation memory usage metric.
pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index";
/// Type name of the full-text index; used in this module as the label value of the
/// index-creation memory usage metric.
pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index";
/// Type name of the bloom filter index; used in this module as the label value of the
/// index-creation memory usage metric.
pub(crate) const TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
/// Type name of the vector index (only compiled with the `vector_index` feature).
#[cfg(feature = "vector_index")]
pub(crate) const TYPE_VECTOR_INDEX: &str = "vector_index";
87
88pub(crate) fn trigger_index_background_download(
90 file_cache: Option<&FileCacheRef>,
91 file_id: &RegionIndexId,
92 file_size_hint: Option<u64>,
93 path_factory: &RegionFilePathFactory,
94 object_store: &ObjectStore,
95) {
96 if let (Some(file_cache), Some(file_size)) = (file_cache, file_size_hint) {
97 let index_key = IndexKey::new(
98 file_id.region_id(),
99 file_id.file_id(),
100 FileType::Puffin(file_id.version),
101 );
102 let remote_path = path_factory.build_index_file_path(file_id.file_id);
103 file_cache.maybe_download_background(
104 index_key,
105 remote_path,
106 object_store.clone(),
107 file_size,
108 );
109 }
110}
111
/// Output of an index creation run.
#[derive(Debug, Clone, Default)]
pub struct IndexOutput {
    /// Size of the produced index file. Within this module a value of `0` is treated as
    /// "nothing to upload or record in the manifest".
    pub file_size: u64,
    /// Version of the index file; used to build the puffin cache key when uploading.
    pub version: u64,
    /// Output of the inverted index creation.
    pub inverted_index: InvertedIndexOutput,
    /// Output of the full-text index creation.
    pub fulltext_index: FulltextIndexOutput,
    /// Output of the bloom filter creation.
    pub bloom_filter: BloomFilterOutput,
    /// Output of the vector index creation (only with the `vector_index` feature).
    #[cfg(feature = "vector_index")]
    pub vector_index: VectorIndexOutput,
}
129
130impl IndexOutput {
131 pub fn build_available_indexes(&self) -> SmallVec<[IndexType; 4]> {
132 let mut indexes = SmallVec::new();
133 if self.inverted_index.is_available() {
134 indexes.push(IndexType::InvertedIndex);
135 }
136 if self.fulltext_index.is_available() {
137 indexes.push(IndexType::FulltextIndex);
138 }
139 if self.bloom_filter.is_available() {
140 indexes.push(IndexType::BloomFilterIndex);
141 }
142 #[cfg(feature = "vector_index")]
143 if self.vector_index.is_available() {
144 indexes.push(IndexType::VectorIndex);
145 }
146 indexes
147 }
148
149 pub fn build_indexes(&self) -> Vec<ColumnIndexMetadata> {
150 let mut map: HashMap<ColumnId, IndexTypes> = HashMap::new();
151
152 if self.inverted_index.is_available() {
153 for &col in &self.inverted_index.columns {
154 map.entry(col).or_default().push(IndexType::InvertedIndex);
155 }
156 }
157 if self.fulltext_index.is_available() {
158 for &col in &self.fulltext_index.columns {
159 map.entry(col).or_default().push(IndexType::FulltextIndex);
160 }
161 }
162 if self.bloom_filter.is_available() {
163 for &col in &self.bloom_filter.columns {
164 map.entry(col)
165 .or_default()
166 .push(IndexType::BloomFilterIndex);
167 }
168 }
169 #[cfg(feature = "vector_index")]
170 if self.vector_index.is_available() {
171 for &col in &self.vector_index.columns {
172 map.entry(col).or_default().push(IndexType::VectorIndex);
173 }
174 }
175
176 map.into_iter()
177 .map(|(column_id, created_indexes)| ColumnIndexMetadata {
178 column_id,
179 created_indexes,
180 })
181 .collect::<Vec<_>>()
182 }
183}
184
/// Base output shared by all index kinds.
#[derive(Debug, Clone, Default)]
pub struct IndexBaseOutput {
    /// Size of the index.
    pub index_size: ByteCount,
    /// Number of rows in the index.
    pub row_count: RowCount,
    /// Ids of the columns the index covers.
    pub columns: Vec<ColumnId>,
}
195
impl IndexBaseOutput {
    /// Returns `true` when the index was produced, i.e. its size is greater than zero.
    pub fn is_available(&self) -> bool {
        self.index_size > 0
    }
}
201
/// Output of the inverted index creation.
pub type InvertedIndexOutput = IndexBaseOutput;
/// Output of the full-text index creation.
pub type FulltextIndexOutput = IndexBaseOutput;
/// Output of the bloom filter creation.
pub type BloomFilterOutput = IndexBaseOutput;
/// Output of the vector index creation (only with the `vector_index` feature).
#[cfg(feature = "vector_index")]
pub type VectorIndexOutput = IndexBaseOutput;
211
/// Creates the indexes of a single SST file.
///
/// Each optional creator is `None` when that index kind is not being built. The
/// `last_mem_*` fields remember the memory usage most recently reported to the
/// index-creation memory metric so that only the delta is added on the next report.
#[derive(Default)]
pub struct Indexer {
    // Id of the file being indexed.
    file_id: FileId,
    // Id of the region the file belongs to.
    region_id: RegionId,
    // Version of the index being created.
    index_version: u64,
    // Puffin manager; set by the builder only when at least one creator exists.
    puffin_manager: Option<SstPuffinManager>,
    // Copied from the builder's `write_cache_enabled` setting.
    write_cache_enabled: bool,
    inverted_indexer: Option<InvertedIndexer>,
    // Inverted index creator memory usage at the last metric report.
    last_mem_inverted_index: usize,
    fulltext_indexer: Option<FulltextIndexer>,
    // Full-text index creator memory usage at the last metric report.
    last_mem_fulltext_index: usize,
    bloom_filter_indexer: Option<BloomFilterIndexer>,
    // Bloom filter creator memory usage at the last metric report.
    last_mem_bloom_filter: usize,
    #[cfg(feature = "vector_index")]
    vector_indexer: Option<VectorIndexer>,
    // Vector index creator memory usage at the last metric report.
    #[cfg(feature = "vector_index")]
    last_mem_vector_index: usize,
    intermediate_manager: Option<IntermediateManager>,
}
232
233impl Indexer {
234 pub async fn update(&mut self, batch: &mut Batch) {
236 self.do_update(batch).await;
237
238 self.flush_mem_metrics();
239 }
240
241 pub async fn update_flat(&mut self, batch: &RecordBatch) {
243 self.do_update_flat(batch).await;
244
245 self.flush_mem_metrics();
246 }
247
248 pub async fn finish(&mut self) -> IndexOutput {
250 let output = self.do_finish().await;
251
252 self.flush_mem_metrics();
253 output
254 }
255
256 pub async fn abort(&mut self) {
258 self.do_abort().await;
259
260 self.flush_mem_metrics();
261 }
262
263 fn flush_mem_metrics(&mut self) {
264 let inverted_mem = self
265 .inverted_indexer
266 .as_ref()
267 .map_or(0, |creator| creator.memory_usage());
268 INDEX_CREATE_MEMORY_USAGE
269 .with_label_values(&[TYPE_INVERTED_INDEX])
270 .add(inverted_mem as i64 - self.last_mem_inverted_index as i64);
271 self.last_mem_inverted_index = inverted_mem;
272
273 let fulltext_mem = self
274 .fulltext_indexer
275 .as_ref()
276 .map_or(0, |creator| creator.memory_usage());
277 INDEX_CREATE_MEMORY_USAGE
278 .with_label_values(&[TYPE_FULLTEXT_INDEX])
279 .add(fulltext_mem as i64 - self.last_mem_fulltext_index as i64);
280 self.last_mem_fulltext_index = fulltext_mem;
281
282 let bloom_filter_mem = self
283 .bloom_filter_indexer
284 .as_ref()
285 .map_or(0, |creator| creator.memory_usage());
286 INDEX_CREATE_MEMORY_USAGE
287 .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
288 .add(bloom_filter_mem as i64 - self.last_mem_bloom_filter as i64);
289 self.last_mem_bloom_filter = bloom_filter_mem;
290
291 #[cfg(feature = "vector_index")]
292 {
293 let vector_mem = self
294 .vector_indexer
295 .as_ref()
296 .map_or(0, |creator| creator.memory_usage());
297 INDEX_CREATE_MEMORY_USAGE
298 .with_label_values(&[TYPE_VECTOR_INDEX])
299 .add(vector_mem as i64 - self.last_mem_vector_index as i64);
300 self.last_mem_vector_index = vector_mem;
301 }
302 }
303}
304
/// Builder that creates an [`Indexer`] for an SST file.
#[async_trait::async_trait]
pub trait IndexerBuilder {
    /// Builds an indexer for the file `file_id` that creates the index at version
    /// `index_version`.
    async fn build(&self, file_id: FileId, index_version: u64) -> Indexer;
}
/// Default [`IndexerBuilder`] implementation driven by region metadata, index options
/// and engine configuration.
#[derive(Clone)]
pub(crate) struct IndexerBuilderImpl {
    /// Why the index is built; for `Flush` and `Compact` the per-index
    /// `create_on_flush` / `create_on_compaction` settings decide whether to create it.
    pub(crate) build_type: IndexBuildType,
    /// Metadata of the region the file belongs to.
    pub(crate) metadata: RegionMetadataRef,
    /// Row group size; a value of `0` disables inverted index creation.
    pub(crate) row_group_size: usize,
    /// Puffin manager handed to the built indexer.
    pub(crate) puffin_manager: SstPuffinManager,
    /// Copied into the built indexer's `write_cache_enabled` field.
    pub(crate) write_cache_enabled: bool,
    /// Intermediate manager shared with every index creator.
    pub(crate) intermediate_manager: IntermediateManager,
    /// Region-level index options.
    pub(crate) index_options: IndexOptions,
    /// Inverted index configuration.
    pub(crate) inverted_index_config: InvertedIndexConfig,
    /// Full-text index configuration.
    pub(crate) fulltext_index_config: FulltextIndexConfig,
    /// Bloom filter configuration.
    pub(crate) bloom_filter_index_config: BloomFilterConfig,
    /// Vector index configuration (only with the `vector_index` feature).
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_config: VectorIndexConfig,
}
325
326#[async_trait::async_trait]
327impl IndexerBuilder for IndexerBuilderImpl {
328 async fn build(&self, file_id: FileId, index_version: u64) -> Indexer {
330 let mut indexer = Indexer {
331 file_id,
332 region_id: self.metadata.region_id,
333 index_version,
334 write_cache_enabled: self.write_cache_enabled,
335 ..Default::default()
336 };
337
338 indexer.inverted_indexer = self.build_inverted_indexer(file_id);
339 indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await;
340 indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id);
341 #[cfg(feature = "vector_index")]
342 {
343 indexer.vector_indexer = self.build_vector_indexer(file_id);
344 }
345 indexer.intermediate_manager = Some(self.intermediate_manager.clone());
346
347 #[cfg(feature = "vector_index")]
348 let has_any_indexer = indexer.inverted_indexer.is_some()
349 || indexer.fulltext_indexer.is_some()
350 || indexer.bloom_filter_indexer.is_some()
351 || indexer.vector_indexer.is_some();
352 #[cfg(not(feature = "vector_index"))]
353 let has_any_indexer = indexer.inverted_indexer.is_some()
354 || indexer.fulltext_indexer.is_some()
355 || indexer.bloom_filter_indexer.is_some();
356
357 if !has_any_indexer {
358 indexer.abort().await;
359 return Indexer::default();
360 }
361
362 indexer.puffin_manager = Some(self.puffin_manager.clone());
363 indexer
364 }
365}
366
367impl IndexerBuilderImpl {
368 fn build_inverted_indexer(&self, file_id: FileId) -> Option<InvertedIndexer> {
369 let create = match self.build_type {
370 IndexBuildType::Flush => self.inverted_index_config.create_on_flush.auto(),
371 IndexBuildType::Compact => self.inverted_index_config.create_on_compaction.auto(),
372 _ => true,
373 };
374
375 if !create {
376 debug!(
377 "Skip creating inverted index due to config, region_id: {}, file_id: {}",
378 self.metadata.region_id, file_id,
379 );
380 return None;
381 }
382
383 let indexed_column_ids = self.metadata.inverted_indexed_column_ids(
384 self.index_options.inverted_index.ignore_column_ids.iter(),
385 );
386 if indexed_column_ids.is_empty() {
387 debug!(
388 "No columns to be indexed, skip creating inverted index, region_id: {}, file_id: {}",
389 self.metadata.region_id, file_id,
390 );
391 return None;
392 }
393
394 let Some(mut segment_row_count) =
395 NonZeroUsize::new(self.index_options.inverted_index.segment_row_count)
396 else {
397 warn!(
398 "Segment row count is 0, skip creating index, region_id: {}, file_id: {}",
399 self.metadata.region_id, file_id,
400 );
401 return None;
402 };
403
404 let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else {
405 warn!(
406 "Row group size is 0, skip creating index, region_id: {}, file_id: {}",
407 self.metadata.region_id, file_id,
408 );
409 return None;
410 };
411
412 if row_group_size.get() % segment_row_count.get() != 0 {
414 segment_row_count = row_group_size;
415 }
416
417 let indexer = InvertedIndexer::new(
418 file_id,
419 &self.metadata,
420 self.intermediate_manager.clone(),
421 self.inverted_index_config.mem_threshold_on_create(),
422 segment_row_count,
423 indexed_column_ids,
424 );
425
426 Some(indexer)
427 }
428
429 async fn build_fulltext_indexer(&self, file_id: FileId) -> Option<FulltextIndexer> {
430 let create = match self.build_type {
431 IndexBuildType::Flush => self.fulltext_index_config.create_on_flush.auto(),
432 IndexBuildType::Compact => self.fulltext_index_config.create_on_compaction.auto(),
433 _ => true,
434 };
435
436 if !create {
437 debug!(
438 "Skip creating full-text index due to config, region_id: {}, file_id: {}",
439 self.metadata.region_id, file_id,
440 );
441 return None;
442 }
443
444 let mem_limit = self.fulltext_index_config.mem_threshold_on_create();
445 let creator = FulltextIndexer::new(
446 &self.metadata.region_id,
447 &file_id,
448 &self.intermediate_manager,
449 &self.metadata,
450 self.fulltext_index_config.compress,
451 mem_limit,
452 )
453 .await;
454
455 let err = match creator {
456 Ok(creator) => {
457 if creator.is_none() {
458 debug!(
459 "Skip creating full-text index due to no columns require indexing, region_id: {}, file_id: {}",
460 self.metadata.region_id, file_id,
461 );
462 }
463 return creator;
464 }
465 Err(err) => err,
466 };
467
468 if cfg!(any(test, feature = "test")) {
469 panic!(
470 "Failed to create full-text indexer, region_id: {}, file_id: {}, err: {:?}",
471 self.metadata.region_id, file_id, err
472 );
473 } else {
474 warn!(
475 err; "Failed to create full-text indexer, region_id: {}, file_id: {}",
476 self.metadata.region_id, file_id,
477 );
478 }
479
480 None
481 }
482
483 fn build_bloom_filter_indexer(&self, file_id: FileId) -> Option<BloomFilterIndexer> {
484 let create = match self.build_type {
485 IndexBuildType::Flush => self.bloom_filter_index_config.create_on_flush.auto(),
486 IndexBuildType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(),
487 _ => true,
488 };
489
490 if !create {
491 debug!(
492 "Skip creating bloom filter due to config, region_id: {}, file_id: {}",
493 self.metadata.region_id, file_id,
494 );
495 return None;
496 }
497
498 let mem_limit = self.bloom_filter_index_config.mem_threshold_on_create();
499 let indexer = BloomFilterIndexer::new(
500 file_id,
501 &self.metadata,
502 self.intermediate_manager.clone(),
503 mem_limit,
504 );
505
506 let err = match indexer {
507 Ok(indexer) => {
508 if indexer.is_none() {
509 debug!(
510 "Skip creating bloom filter due to no columns require indexing, region_id: {}, file_id: {}",
511 self.metadata.region_id, file_id,
512 );
513 }
514 return indexer;
515 }
516 Err(err) => err,
517 };
518
519 if cfg!(any(test, feature = "test")) {
520 panic!(
521 "Failed to create bloom filter, region_id: {}, file_id: {}, err: {:?}",
522 self.metadata.region_id, file_id, err
523 );
524 } else {
525 warn!(
526 err; "Failed to create bloom filter, region_id: {}, file_id: {}",
527 self.metadata.region_id, file_id,
528 );
529 }
530
531 None
532 }
533
534 #[cfg(feature = "vector_index")]
535 fn build_vector_indexer(&self, file_id: FileId) -> Option<VectorIndexer> {
536 let create = match self.build_type {
537 IndexBuildType::Flush => self.vector_index_config.create_on_flush.auto(),
538 IndexBuildType::Compact => self.vector_index_config.create_on_compaction.auto(),
539 _ => true,
540 };
541
542 if !create {
543 debug!(
544 "Skip creating vector index due to config, region_id: {}, file_id: {}",
545 self.metadata.region_id, file_id,
546 );
547 return None;
548 }
549
550 let vector_index_options = self.metadata.vector_indexed_column_ids();
552 if vector_index_options.is_empty() {
553 debug!(
554 "No vector columns to index, skip creating vector index, region_id: {}, file_id: {}",
555 self.metadata.region_id, file_id,
556 );
557 return None;
558 }
559
560 let mem_limit = self.vector_index_config.mem_threshold_on_create();
561 let indexer = VectorIndexer::new(
562 file_id,
563 &self.metadata,
564 self.intermediate_manager.clone(),
565 mem_limit,
566 &vector_index_options,
567 );
568
569 let err = match indexer {
570 Ok(indexer) => {
571 if indexer.is_none() {
572 debug!(
573 "Skip creating vector index due to no columns require indexing, region_id: {}, file_id: {}",
574 self.metadata.region_id, file_id,
575 );
576 }
577 return indexer;
578 }
579 Err(err) => err,
580 };
581
582 if cfg!(any(test, feature = "test")) {
583 panic!(
584 "Failed to create vector index, region_id: {}, file_id: {}, err: {:?}",
585 self.metadata.region_id, file_id, err
586 );
587 } else {
588 warn!(
589 err; "Failed to create vector index, region_id: {}, file_id: {}",
590 self.metadata.region_id, file_id,
591 );
592 }
593
594 None
595 }
596}
597
/// The reason an index build was triggered.
///
/// Each variant has a scheduling priority (see `priority`), and can be rendered as a
/// static string through the derived `IntoStaticStr`.
#[derive(Debug, Clone, IntoStaticStr, PartialEq)]
pub enum IndexBuildType {
    /// Build triggered by a schema change.
    SchemaChange,
    /// Build triggered by a flush operation.
    Flush,
    /// Build triggered by a compaction operation.
    Compact,
    /// Build triggered manually.
    Manual,
}
610
611impl IndexBuildType {
612 fn as_str(&self) -> &'static str {
613 self.into()
614 }
615
616 fn priority(&self) -> u8 {
618 match self {
619 IndexBuildType::Manual => 3,
620 IndexBuildType::SchemaChange => 2,
621 IndexBuildType::Flush => 1,
622 IndexBuildType::Compact => 0,
623 }
624 }
625}
626
627impl From<OperationType> for IndexBuildType {
628 fn from(op_type: OperationType) -> Self {
629 match op_type {
630 OperationType::Flush => IndexBuildType::Flush,
631 OperationType::Compact => IndexBuildType::Compact,
632 }
633 }
634}
635
/// Outcome of an index build task that did not fail with an error.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum IndexBuildOutcome {
    /// The build ran to completion.
    Finished,
    /// The build was aborted; the string carries the reason.
    Aborted(String),
}
642
/// Sender half of the mpsc channel used to report the result of an index build task.
pub type ResultMpscSender = Sender<Result<IndexBuildOutcome>>;
645
/// A task that builds the index of one SST file.
#[derive(Clone)]
pub struct IndexBuildTask {
    /// Handle of the SST file; passed to the access layer to read the file.
    pub file: FileHandle,
    /// Metadata of the SST file. It is updated with the new index information and
    /// written to the manifest once the index is built.
    pub file_meta: FileMeta,
    /// Why the build was requested; also determines the task's priority.
    pub reason: IndexBuildType,
    /// Access layer used to read the SST and to resolve upload paths.
    pub access_layer: AccessLayerRef,
    /// Listener notified when the build begins or aborts.
    pub(crate) listener: WorkerListener,
    /// Manifest context used to record the new index in the region manifest.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// Optional write cache; when present, the built index file is uploaded through it.
    pub write_cache: Option<WriteCacheRef>,
    /// File purger (not used by the code visible in this module section).
    pub file_purger: FilePurgerRef,
    /// Builder used to create the indexer for this task.
    pub indexer_builder: Arc<dyn IndexerBuilder + Send + Sync>,
    /// Channel used to send background notifications to the region worker.
    pub(crate) request_sender: Sender<WorkerRequestWithTime>,
    /// Channel used to report the task's outcome.
    pub(crate) result_sender: ResultMpscSender,
}
666
667impl std::fmt::Debug for IndexBuildTask {
668 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
669 f.debug_struct("IndexBuildTask")
670 .field("region_id", &self.file_meta.region_id)
671 .field("file_id", &self.file_meta.file_id)
672 .field("reason", &self.reason)
673 .finish()
674 }
675}
676
677impl IndexBuildTask {
678 pub async fn on_success(&self, outcome: IndexBuildOutcome) {
680 let _ = self.result_sender.send(Ok(outcome)).await;
681 }
682
683 pub async fn on_failure(&self, err: Arc<Error>) {
685 let _ = self
686 .result_sender
687 .send(Err(err.clone()).context(BuildIndexAsyncSnafu {
688 region_id: self.file_meta.region_id,
689 }))
690 .await;
691 }
692
693 fn into_index_build_job(mut self, version_control: VersionControlRef) -> Job {
694 Box::pin(async move {
695 self.do_index_build(version_control).await;
696 })
697 }
698
699 async fn do_index_build(&mut self, version_control: VersionControlRef) {
700 self.listener
701 .on_index_build_begin(RegionFileId::new(
702 self.file_meta.region_id,
703 self.file_meta.file_id,
704 ))
705 .await;
706 match self.index_build(version_control).await {
707 Ok(outcome) => self.on_success(outcome).await,
708 Err(e) => {
709 warn!(
710 e; "Index build task failed, region: {}, file_id: {}",
711 self.file_meta.region_id, self.file_meta.file_id,
712 );
713 self.on_failure(e.into()).await
714 }
715 }
716 let worker_request = WorkerRequest::Background {
717 region_id: self.file_meta.region_id,
718 notify: BackgroundNotify::IndexBuildStopped(IndexBuildStopped {
719 region_id: self.file_meta.region_id,
720 file_id: self.file_meta.file_id,
721 }),
722 };
723 let _ = self
724 .request_sender
725 .send(WorkerRequestWithTime::new(worker_request))
726 .await;
727 }
728
729 async fn check_sst_file_exists(&self, version_control: &VersionControlRef) -> bool {
731 let file_id = self.file_meta.file_id;
732 let level = self.file_meta.level;
733 let version = version_control.current().version;
735
736 let Some(level_files) = version.ssts.levels().get(level as usize) else {
737 warn!(
738 "File id {} not found in level {} for index build, region: {}",
739 file_id, level, self.file_meta.region_id
740 );
741 return false;
742 };
743
744 match level_files.files.get(&file_id) {
745 Some(handle) if !handle.is_deleted() && !handle.compacting() => {
746 true
750 }
751 _ => {
752 warn!(
753 "File id {} not found in region version for index build, region: {}",
754 file_id, self.file_meta.region_id
755 );
756 false
757 }
758 }
759 }
760
761 async fn index_build(
762 &mut self,
763 version_control: VersionControlRef,
764 ) -> Result<IndexBuildOutcome> {
765 let new_index_version = if self.file_meta.index_file_size > 0 {
767 self.file_meta.index_version + 1
769 } else {
770 0 };
772
773 let index_file_id = self.file_meta.file_id;
775 let mut indexer = self
776 .indexer_builder
777 .build(index_file_id, new_index_version)
778 .await;
779
780 if !self.check_sst_file_exists(&version_control).await {
782 indexer.abort().await;
784 self.listener
785 .on_index_build_abort(RegionFileId::new(
786 self.file_meta.region_id,
787 self.file_meta.file_id,
788 ))
789 .await;
790 return Ok(IndexBuildOutcome::Aborted(format!(
791 "SST file not found during index build, region: {}, file_id: {}",
792 self.file_meta.region_id, self.file_meta.file_id
793 )));
794 }
795
796 let parquet_reader = self
797 .access_layer
798 .read_sst(self.file.clone()) .build()
800 .await?;
801
802 if let Some(mut parquet_reader) = parquet_reader {
803 loop {
805 match parquet_reader.next_record_batch().await {
806 Ok(Some(batch)) => {
807 indexer.update_flat(&batch).await;
808 }
809 Ok(None) => break,
810 Err(e) => {
811 indexer.abort().await;
812 return Err(e);
813 }
814 }
815 }
816 }
817 let index_output = indexer.finish().await;
818
819 if index_output.file_size > 0 {
820 if !self.check_sst_file_exists(&version_control).await {
822 indexer.abort().await;
824 self.listener
825 .on_index_build_abort(RegionFileId::new(
826 self.file_meta.region_id,
827 self.file_meta.file_id,
828 ))
829 .await;
830 return Ok(IndexBuildOutcome::Aborted(format!(
831 "SST file not found during index build, region: {}, file_id: {}",
832 self.file_meta.region_id, self.file_meta.file_id
833 )));
834 }
835
836 self.maybe_upload_index_file(index_output.clone(), index_file_id, new_index_version)
838 .await?;
839
840 let worker_request = match self.update_manifest(index_output, new_index_version).await {
841 Ok(edit) => {
842 let index_build_finished = IndexBuildFinished {
843 region_id: self.file_meta.region_id,
844 edit,
845 };
846 WorkerRequest::Background {
847 region_id: self.file_meta.region_id,
848 notify: BackgroundNotify::IndexBuildFinished(index_build_finished),
849 }
850 }
851 Err(e) => {
852 let err = Arc::new(e);
853 WorkerRequest::Background {
854 region_id: self.file_meta.region_id,
855 notify: BackgroundNotify::IndexBuildFailed(IndexBuildFailed { err }),
856 }
857 }
858 };
859
860 let _ = self
861 .request_sender
862 .send(WorkerRequestWithTime::new(worker_request))
863 .await;
864 }
865 Ok(IndexBuildOutcome::Finished)
866 }
867
868 async fn maybe_upload_index_file(
869 &self,
870 output: IndexOutput,
871 index_file_id: FileId,
872 index_version: u64,
873 ) -> Result<()> {
874 if let Some(write_cache) = &self.write_cache {
875 let file_id = self.file_meta.file_id;
876 let region_id = self.file_meta.region_id;
877 let remote_store = self.access_layer.object_store();
878 let mut upload_tracker = UploadTracker::new(region_id);
879 let mut err = None;
880 let puffin_key =
881 IndexKey::new(region_id, index_file_id, FileType::Puffin(output.version));
882 let index_id = RegionIndexId::new(RegionFileId::new(region_id, file_id), index_version);
883 let puffin_path = RegionFilePathFactory::new(
884 self.access_layer.table_dir().to_string(),
885 self.access_layer.path_type(),
886 )
887 .build_index_file_path_with_version(index_id);
888 if let Err(e) = write_cache
889 .upload(puffin_key, &puffin_path, remote_store)
890 .await
891 {
892 err = Some(e);
893 }
894 upload_tracker.push_uploaded_file(puffin_path);
895 if let Some(err) = err {
896 upload_tracker
898 .clean(
899 &smallvec![SstInfo {
900 file_id,
901 index_metadata: output,
902 ..Default::default()
903 }],
904 &write_cache.file_cache(),
905 remote_store,
906 )
907 .await;
908 return Err(err);
909 }
910 } else {
911 debug!("write cache is not available, skip uploading index file");
912 }
913 Ok(())
914 }
915
916 async fn update_manifest(
917 &mut self,
918 output: IndexOutput,
919 new_index_version: u64,
920 ) -> Result<RegionEdit> {
921 self.file_meta.available_indexes = output.build_available_indexes();
922 self.file_meta.indexes = output.build_indexes();
923 self.file_meta.index_file_size = output.file_size;
924 self.file_meta.index_version = new_index_version;
925 let edit = RegionEdit {
926 files_to_add: vec![self.file_meta.clone()],
927 files_to_remove: vec![],
928 timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
929 flushed_sequence: None,
930 flushed_entry_id: None,
931 committed_sequence: None,
932 compaction_time_window: None,
933 };
934 let version = self
935 .manifest_ctx
936 .update_manifest(
937 RegionLeaderState::Writable,
938 RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())),
939 false,
940 )
941 .await?;
942 info!(
943 "Successfully update manifest version to {version}, region: {}, reason: {}",
944 self.file_meta.region_id,
945 self.reason.as_str()
946 );
947 Ok(edit)
948 }
949}
950
951impl PartialEq for IndexBuildTask {
952 fn eq(&self, other: &Self) -> bool {
953 self.reason.priority() == other.reason.priority()
954 }
955}
956
957impl Eq for IndexBuildTask {}
958
959impl PartialOrd for IndexBuildTask {
960 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
961 Some(self.cmp(other))
962 }
963}
964
965impl Ord for IndexBuildTask {
966 fn cmp(&self, other: &Self) -> Ordering {
967 self.reason.priority().cmp(&other.reason.priority())
968 }
969}
970
/// Index build state of a single region.
pub struct IndexBuildStatus {
    /// Id of the region.
    pub region_id: RegionId,
    /// Files whose index build job has been handed to the scheduler and has not
    /// been reported as stopped yet.
    pub building_files: HashSet<FileId>,
    /// Tasks waiting to be scheduled, ordered by task priority.
    pub pending_tasks: BinaryHeap<IndexBuildTask>,
}
977
978impl IndexBuildStatus {
979 pub fn new(region_id: RegionId) -> Self {
980 IndexBuildStatus {
981 region_id,
982 building_files: HashSet::new(),
983 pending_tasks: BinaryHeap::new(),
984 }
985 }
986
987 async fn on_failure(self, err: Arc<Error>) {
988 for task in self.pending_tasks {
989 task.on_failure(err.clone()).await;
990 }
991 }
992}
993
/// Schedules index build tasks across regions.
pub struct IndexBuildScheduler {
    /// Background scheduler that runs the build jobs.
    scheduler: SchedulerRef,
    /// Per-region build state (files being built and pending tasks).
    region_status: HashMap<RegionId, IndexBuildStatus>,
    /// Upper bound on the total number of files being built at the same time,
    /// summed over all regions.
    files_limit: usize,
}
1002
1003impl IndexBuildScheduler {
1005 pub fn new(scheduler: SchedulerRef, files_limit: usize) -> Self {
1006 IndexBuildScheduler {
1007 scheduler,
1008 region_status: HashMap::new(),
1009 files_limit,
1010 }
1011 }
1012
1013 pub(crate) async fn schedule_build(
1014 &mut self,
1015 version_control: &VersionControlRef,
1016 task: IndexBuildTask,
1017 ) -> Result<()> {
1018 let status = self
1019 .region_status
1020 .entry(task.file_meta.region_id)
1021 .or_insert_with(|| IndexBuildStatus::new(task.file_meta.region_id));
1022
1023 if status.building_files.contains(&task.file_meta.file_id) {
1024 let region_file_id =
1025 RegionFileId::new(task.file_meta.region_id, task.file_meta.file_id);
1026 debug!(
1027 "Aborting index build task since index is already being built for region file {:?}",
1028 region_file_id
1029 );
1030 task.on_success(IndexBuildOutcome::Aborted(format!(
1031 "Index is already being built for region file {:?}",
1032 region_file_id
1033 )))
1034 .await;
1035 task.listener.on_index_build_abort(region_file_id).await;
1036 return Ok(());
1037 }
1038
1039 status.pending_tasks.push(task);
1040
1041 self.schedule_next_build_batch(version_control);
1042 Ok(())
1043 }
1044
1045 fn schedule_next_build_batch(&mut self, version_control: &VersionControlRef) {
1047 let mut building_count = 0;
1048 for status in self.region_status.values() {
1049 building_count += status.building_files.len();
1050 }
1051
1052 while building_count < self.files_limit {
1053 if let Some(task) = self.find_next_task() {
1054 let region_id = task.file_meta.region_id;
1055 let file_id = task.file_meta.file_id;
1056 let job = task.into_index_build_job(version_control.clone());
1057 if self.scheduler.schedule(job).is_ok() {
1058 if let Some(status) = self.region_status.get_mut(®ion_id) {
1059 status.building_files.insert(file_id);
1060 building_count += 1;
1061 status
1062 .pending_tasks
1063 .retain(|t| t.file_meta.file_id != file_id);
1064 } else {
1065 error!(
1066 "Region status not found when scheduling index build task, region: {}",
1067 region_id
1068 );
1069 }
1070 } else {
1071 error!(
1072 "Failed to schedule index build job, region: {}, file_id: {}",
1073 region_id, file_id
1074 );
1075 }
1076 } else {
1077 break;
1079 }
1080 }
1081 }
1082
1083 fn find_next_task(&self) -> Option<IndexBuildTask> {
1085 self.region_status
1086 .iter()
1087 .filter_map(|(_, status)| status.pending_tasks.peek())
1088 .max()
1089 .cloned()
1090 }
1091
1092 pub(crate) fn on_task_stopped(
1093 &mut self,
1094 region_id: RegionId,
1095 file_id: FileId,
1096 version_control: &VersionControlRef,
1097 ) {
1098 if let Some(status) = self.region_status.get_mut(®ion_id) {
1099 status.building_files.remove(&file_id);
1100 if status.building_files.is_empty() && status.pending_tasks.is_empty() {
1101 self.region_status.remove(®ion_id);
1103 }
1104 }
1105
1106 self.schedule_next_build_batch(version_control);
1107 }
1108
1109 pub(crate) async fn on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
1110 error!(
1111 err; "Index build scheduler encountered failure for region {}, removing all pending tasks.",
1112 region_id
1113 );
1114 let Some(status) = self.region_status.remove(®ion_id) else {
1115 return;
1116 };
1117 status.on_failure(err).await;
1118 }
1119
1120 pub(crate) async fn on_region_dropped(&mut self, region_id: RegionId) {
1122 self.remove_region_on_failure(
1123 region_id,
1124 Arc::new(RegionDroppedSnafu { region_id }.build()),
1125 )
1126 .await;
1127 }
1128
1129 pub(crate) async fn on_region_closed(&mut self, region_id: RegionId) {
1131 self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()))
1132 .await;
1133 }
1134
1135 pub(crate) async fn on_region_truncated(&mut self, region_id: RegionId) {
1137 self.remove_region_on_failure(
1138 region_id,
1139 Arc::new(RegionTruncatedSnafu { region_id }.build()),
1140 )
1141 .await;
1142 }
1143
1144 async fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
1145 let Some(status) = self.region_status.remove(®ion_id) else {
1146 return;
1147 };
1148 status.on_failure(err).await;
1149 }
1150}
1151
1152pub(crate) fn decode_primary_keys_with_counts(
1155 batch: &RecordBatch,
1156 codec: &IndexValuesCodec,
1157) -> Result<Vec<(CompositeValues, usize)>> {
1158 let primary_key_index = primary_key_column_index(batch.num_columns());
1159 let pk_dict_array = batch
1160 .column(primary_key_index)
1161 .as_any()
1162 .downcast_ref::<PrimaryKeyArray>()
1163 .context(InvalidRecordBatchSnafu {
1164 reason: "Primary key column is not a dictionary array",
1165 })?;
1166 let pk_values_array = pk_dict_array
1167 .values()
1168 .as_any()
1169 .downcast_ref::<BinaryArray>()
1170 .context(InvalidRecordBatchSnafu {
1171 reason: "Primary key values are not binary array",
1172 })?;
1173 let keys = pk_dict_array.keys();
1174
1175 let mut result: Vec<(CompositeValues, usize)> = Vec::new();
1177 let mut prev_key: Option<u32> = None;
1178
1179 let pk_indices = keys.values();
1180 for ¤t_key in pk_indices.iter().take(keys.len()) {
1181 if let Some(prev) = prev_key
1183 && prev == current_key
1184 {
1185 result.last_mut().unwrap().1 += 1;
1187 continue;
1188 }
1189
1190 let pk_bytes = pk_values_array.value(current_key as usize);
1192 let decoded_value = codec.decoder().decode(pk_bytes).context(DecodeSnafu)?;
1193
1194 result.push((decoded_value, 1));
1195 prev_key = Some(current_key);
1196 }
1197
1198 Ok(result)
1199}
1200
1201#[cfg(test)]
1202mod tests {
1203 use std::sync::Arc;
1204
1205 use api::v1::SemanticType;
1206 use common_base::readable_size::ReadableSize;
1207 use datafusion_common::HashMap;
1208 use datatypes::data_type::ConcreteDataType;
1209 use datatypes::schema::{
1210 ColumnSchema, FulltextOptions, SkippingIndexOptions, SkippingIndexType,
1211 };
1212 use object_store::ObjectStore;
1213 use object_store::services::Memory;
1214 use puffin_manager::PuffinManagerFactory;
1215 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1216 use tokio::sync::mpsc;
1217
1218 use super::*;
1219 use crate::access_layer::{FilePathProvider, Metrics, SstWriteRequest, WriteType};
1220 use crate::cache::write_cache::WriteCache;
1221 use crate::config::{FulltextIndexConfig, IndexBuildMode, MitoConfig, Mode};
1222 use crate::memtable::time_partition::TimePartitions;
1223 use crate::region::version::{VersionBuilder, VersionControl};
1224 use crate::sst::file::RegionFileId;
1225 use crate::sst::file_purger::NoopFilePurger;
1226 use crate::sst::location;
1227 use crate::sst::parquet::WriteOptions;
1228 use crate::test_util::memtable_util::EmptyMemtableBuilder;
1229 use crate::test_util::scheduler_util::SchedulerEnv;
1230 use crate::test_util::sst_util::{
1231 new_flat_source_from_record_batches, new_record_batch_by_range, sst_region_metadata,
1232 };
1233
    /// Flags selecting which index-related columns `mock_region_metadata` generates.
    struct MetaConfig {
        /// Enable the inverted index on column `a`.
        with_inverted: bool,
        /// Add the nullable string column `text` with fulltext enabled.
        with_fulltext: bool,
        /// Add the string column `bloom` with bloom-filter skipping index options.
        with_skipping_bloom: bool,
        /// Add the nullable vector column `vec` with default vector index options.
        #[cfg(feature = "vector_index")]
        with_vector: bool,
    }
1241
1242 fn mock_region_metadata(
1243 MetaConfig {
1244 with_inverted,
1245 with_fulltext,
1246 with_skipping_bloom,
1247 #[cfg(feature = "vector_index")]
1248 with_vector,
1249 }: MetaConfig,
1250 ) -> RegionMetadataRef {
1251 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
1252 let mut column_schema = ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false);
1253 if with_inverted {
1254 column_schema = column_schema.with_inverted_index(true);
1255 }
1256 builder
1257 .push_column_metadata(ColumnMetadata {
1258 column_schema,
1259 semantic_type: SemanticType::Field,
1260 column_id: 1,
1261 })
1262 .push_column_metadata(ColumnMetadata {
1263 column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
1264 semantic_type: SemanticType::Field,
1265 column_id: 2,
1266 })
1267 .push_column_metadata(ColumnMetadata {
1268 column_schema: ColumnSchema::new(
1269 "c",
1270 ConcreteDataType::timestamp_millisecond_datatype(),
1271 false,
1272 ),
1273 semantic_type: SemanticType::Timestamp,
1274 column_id: 3,
1275 });
1276
1277 if with_fulltext {
1278 let column_schema =
1279 ColumnSchema::new("text", ConcreteDataType::string_datatype(), true)
1280 .with_fulltext_options(FulltextOptions {
1281 enable: true,
1282 ..Default::default()
1283 })
1284 .unwrap();
1285
1286 let column = ColumnMetadata {
1287 column_schema,
1288 semantic_type: SemanticType::Field,
1289 column_id: 4,
1290 };
1291
1292 builder.push_column_metadata(column);
1293 }
1294
1295 if with_skipping_bloom {
1296 let column_schema =
1297 ColumnSchema::new("bloom", ConcreteDataType::string_datatype(), false)
1298 .with_skipping_options(SkippingIndexOptions::new_unchecked(
1299 42,
1300 0.01,
1301 SkippingIndexType::BloomFilter,
1302 ))
1303 .unwrap();
1304
1305 let column = ColumnMetadata {
1306 column_schema,
1307 semantic_type: SemanticType::Field,
1308 column_id: 5,
1309 };
1310
1311 builder.push_column_metadata(column);
1312 }
1313
1314 #[cfg(feature = "vector_index")]
1315 if with_vector {
1316 use index::vector::VectorIndexOptions;
1317
1318 let options = VectorIndexOptions::default();
1319 let column_schema =
1320 ColumnSchema::new("vec", ConcreteDataType::vector_datatype(4), true)
1321 .with_vector_index_options(&options)
1322 .unwrap();
1323 let column = ColumnMetadata {
1324 column_schema,
1325 semantic_type: SemanticType::Field,
1326 column_id: 6,
1327 };
1328
1329 builder.push_column_metadata(column);
1330 }
1331
1332 Arc::new(builder.build().unwrap())
1333 }
1334
1335 fn mock_object_store() -> ObjectStore {
1336 ObjectStore::new(Memory::default()).unwrap().finish()
1337 }
1338
1339 async fn mock_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
1340 IntermediateManager::init_fs(path).await.unwrap()
1341 }
    /// Path provider for tests that must never resolve a file path: every method panics via
    /// `unreachable!()`.
    struct NoopPathProvider;

    impl FilePathProvider for NoopPathProvider {
        /// Not expected to be called; panics if it is.
        fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
            unreachable!()
        }

        /// Not expected to be called; panics if it is.
        fn build_index_file_path_with_version(&self, _index_id: RegionIndexId) -> String {
            unreachable!()
        }

        /// Not expected to be called; panics if it is.
        fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
            unreachable!()
        }
    }
1357
1358 async fn mock_sst_file(
1359 metadata: RegionMetadataRef,
1360 env: &SchedulerEnv,
1361 build_mode: IndexBuildMode,
1362 ) -> SstInfo {
1363 let source = new_flat_source_from_record_batches(vec![
1364 new_record_batch_by_range(&["a", "d"], 0, 60),
1365 new_record_batch_by_range(&["b", "f"], 0, 40),
1366 new_record_batch_by_range(&["b", "h"], 100, 200),
1367 ]);
1368 let mut index_config = MitoConfig::default().index;
1369 index_config.build_mode = build_mode;
1370 let write_request = SstWriteRequest {
1371 op_type: OperationType::Flush,
1372 metadata: metadata.clone(),
1373 source,
1374 storage: None,
1375 max_sequence: None,
1376 sst_write_format: Default::default(),
1377 cache_manager: Default::default(),
1378 index_options: IndexOptions::default(),
1379 index_config,
1380 inverted_index_config: Default::default(),
1381 fulltext_index_config: Default::default(),
1382 bloom_filter_index_config: Default::default(),
1383 #[cfg(feature = "vector_index")]
1384 vector_index_config: Default::default(),
1385 };
1386 let mut metrics = Metrics::new(WriteType::Flush);
1387 env.access_layer
1388 .write_sst(write_request, &WriteOptions::default(), &mut metrics)
1389 .await
1390 .unwrap()
1391 .remove(0)
1392 }
1393
1394 async fn mock_version_control(
1395 metadata: RegionMetadataRef,
1396 file_purger: FilePurgerRef,
1397 files: HashMap<FileId, FileMeta>,
1398 ) -> VersionControlRef {
1399 let mutable = Arc::new(TimePartitions::new(
1400 metadata.clone(),
1401 Arc::new(EmptyMemtableBuilder::default()),
1402 0,
1403 None,
1404 ));
1405 let version_builder = VersionBuilder::new(metadata, mutable)
1406 .add_files(file_purger, files.values().cloned())
1407 .build();
1408 Arc::new(VersionControl::new(version_builder))
1409 }
1410
1411 async fn mock_indexer_builder(
1412 metadata: RegionMetadataRef,
1413 env: &SchedulerEnv,
1414 ) -> Arc<dyn IndexerBuilder + Send + Sync> {
1415 let (dir, factory) = PuffinManagerFactory::new_for_test_async("mock_indexer_builder").await;
1416 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1417 let puffin_manager = factory.build(
1418 env.access_layer.object_store().clone(),
1419 RegionFilePathFactory::new(
1420 env.access_layer.table_dir().to_string(),
1421 env.access_layer.path_type(),
1422 ),
1423 );
1424 Arc::new(IndexerBuilderImpl {
1425 build_type: IndexBuildType::Flush,
1426 metadata,
1427 row_group_size: 1024,
1428 puffin_manager,
1429 write_cache_enabled: false,
1430 intermediate_manager: intm_manager,
1431 index_options: IndexOptions::default(),
1432 inverted_index_config: InvertedIndexConfig::default(),
1433 fulltext_index_config: FulltextIndexConfig::default(),
1434 bloom_filter_index_config: BloomFilterConfig::default(),
1435 #[cfg(feature = "vector_index")]
1436 vector_index_config: Default::default(),
1437 })
1438 }
1439
1440 #[tokio::test]
1441 async fn test_build_indexer_basic() {
1442 let (dir, factory) =
1443 PuffinManagerFactory::new_for_test_async("test_build_indexer_basic_").await;
1444 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1445
1446 let metadata = mock_region_metadata(MetaConfig {
1447 with_inverted: true,
1448 with_fulltext: true,
1449 with_skipping_bloom: true,
1450 #[cfg(feature = "vector_index")]
1451 with_vector: false,
1452 });
1453 let indexer = IndexerBuilderImpl {
1454 build_type: IndexBuildType::Flush,
1455 metadata,
1456 row_group_size: 1024,
1457 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1458 write_cache_enabled: false,
1459 intermediate_manager: intm_manager,
1460 index_options: IndexOptions::default(),
1461 inverted_index_config: InvertedIndexConfig::default(),
1462 fulltext_index_config: FulltextIndexConfig::default(),
1463 bloom_filter_index_config: BloomFilterConfig::default(),
1464 #[cfg(feature = "vector_index")]
1465 vector_index_config: Default::default(),
1466 }
1467 .build(FileId::random(), 0)
1468 .await;
1469
1470 assert!(indexer.inverted_indexer.is_some());
1471 assert!(indexer.fulltext_indexer.is_some());
1472 assert!(indexer.bloom_filter_indexer.is_some());
1473 }
1474
1475 #[tokio::test]
1476 async fn test_build_indexer_disable_create() {
1477 let (dir, factory) =
1478 PuffinManagerFactory::new_for_test_async("test_build_indexer_disable_create_").await;
1479 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1480
1481 let metadata = mock_region_metadata(MetaConfig {
1482 with_inverted: true,
1483 with_fulltext: true,
1484 with_skipping_bloom: true,
1485 #[cfg(feature = "vector_index")]
1486 with_vector: false,
1487 });
1488 let indexer = IndexerBuilderImpl {
1489 build_type: IndexBuildType::Flush,
1490 metadata: metadata.clone(),
1491 row_group_size: 1024,
1492 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1493 write_cache_enabled: false,
1494 intermediate_manager: intm_manager.clone(),
1495 index_options: IndexOptions::default(),
1496 inverted_index_config: InvertedIndexConfig {
1497 create_on_flush: Mode::Disable,
1498 ..Default::default()
1499 },
1500 fulltext_index_config: FulltextIndexConfig::default(),
1501 bloom_filter_index_config: BloomFilterConfig::default(),
1502 #[cfg(feature = "vector_index")]
1503 vector_index_config: Default::default(),
1504 }
1505 .build(FileId::random(), 0)
1506 .await;
1507
1508 assert!(indexer.inverted_indexer.is_none());
1509 assert!(indexer.fulltext_indexer.is_some());
1510 assert!(indexer.bloom_filter_indexer.is_some());
1511
1512 let indexer = IndexerBuilderImpl {
1513 build_type: IndexBuildType::Compact,
1514 metadata: metadata.clone(),
1515 row_group_size: 1024,
1516 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1517 write_cache_enabled: false,
1518 intermediate_manager: intm_manager.clone(),
1519 index_options: IndexOptions::default(),
1520 inverted_index_config: InvertedIndexConfig::default(),
1521 fulltext_index_config: FulltextIndexConfig {
1522 create_on_compaction: Mode::Disable,
1523 ..Default::default()
1524 },
1525 bloom_filter_index_config: BloomFilterConfig::default(),
1526 #[cfg(feature = "vector_index")]
1527 vector_index_config: Default::default(),
1528 }
1529 .build(FileId::random(), 0)
1530 .await;
1531
1532 assert!(indexer.inverted_indexer.is_some());
1533 assert!(indexer.fulltext_indexer.is_none());
1534 assert!(indexer.bloom_filter_indexer.is_some());
1535
1536 let indexer = IndexerBuilderImpl {
1537 build_type: IndexBuildType::Compact,
1538 metadata,
1539 row_group_size: 1024,
1540 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1541 write_cache_enabled: false,
1542 intermediate_manager: intm_manager,
1543 index_options: IndexOptions::default(),
1544 inverted_index_config: InvertedIndexConfig::default(),
1545 fulltext_index_config: FulltextIndexConfig::default(),
1546 bloom_filter_index_config: BloomFilterConfig {
1547 create_on_compaction: Mode::Disable,
1548 ..Default::default()
1549 },
1550 #[cfg(feature = "vector_index")]
1551 vector_index_config: Default::default(),
1552 }
1553 .build(FileId::random(), 0)
1554 .await;
1555
1556 assert!(indexer.inverted_indexer.is_some());
1557 assert!(indexer.fulltext_indexer.is_some());
1558 assert!(indexer.bloom_filter_indexer.is_none());
1559 }
1560
1561 #[tokio::test]
1562 async fn test_build_indexer_no_required() {
1563 let (dir, factory) =
1564 PuffinManagerFactory::new_for_test_async("test_build_indexer_no_required_").await;
1565 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1566
1567 let metadata = mock_region_metadata(MetaConfig {
1568 with_inverted: false,
1569 with_fulltext: true,
1570 with_skipping_bloom: true,
1571 #[cfg(feature = "vector_index")]
1572 with_vector: false,
1573 });
1574 let indexer = IndexerBuilderImpl {
1575 build_type: IndexBuildType::Flush,
1576 metadata: metadata.clone(),
1577 row_group_size: 1024,
1578 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1579 write_cache_enabled: false,
1580 intermediate_manager: intm_manager.clone(),
1581 index_options: IndexOptions::default(),
1582 inverted_index_config: InvertedIndexConfig::default(),
1583 fulltext_index_config: FulltextIndexConfig::default(),
1584 bloom_filter_index_config: BloomFilterConfig::default(),
1585 #[cfg(feature = "vector_index")]
1586 vector_index_config: Default::default(),
1587 }
1588 .build(FileId::random(), 0)
1589 .await;
1590
1591 assert!(indexer.inverted_indexer.is_none());
1592 assert!(indexer.fulltext_indexer.is_some());
1593 assert!(indexer.bloom_filter_indexer.is_some());
1594
1595 let metadata = mock_region_metadata(MetaConfig {
1596 with_inverted: true,
1597 with_fulltext: false,
1598 with_skipping_bloom: true,
1599 #[cfg(feature = "vector_index")]
1600 with_vector: false,
1601 });
1602 let indexer = IndexerBuilderImpl {
1603 build_type: IndexBuildType::Flush,
1604 metadata: metadata.clone(),
1605 row_group_size: 1024,
1606 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1607 write_cache_enabled: false,
1608 intermediate_manager: intm_manager.clone(),
1609 index_options: IndexOptions::default(),
1610 inverted_index_config: InvertedIndexConfig::default(),
1611 fulltext_index_config: FulltextIndexConfig::default(),
1612 bloom_filter_index_config: BloomFilterConfig::default(),
1613 #[cfg(feature = "vector_index")]
1614 vector_index_config: Default::default(),
1615 }
1616 .build(FileId::random(), 0)
1617 .await;
1618
1619 assert!(indexer.inverted_indexer.is_some());
1620 assert!(indexer.fulltext_indexer.is_none());
1621 assert!(indexer.bloom_filter_indexer.is_some());
1622
1623 let metadata = mock_region_metadata(MetaConfig {
1624 with_inverted: true,
1625 with_fulltext: true,
1626 with_skipping_bloom: false,
1627 #[cfg(feature = "vector_index")]
1628 with_vector: false,
1629 });
1630 let indexer = IndexerBuilderImpl {
1631 build_type: IndexBuildType::Flush,
1632 metadata: metadata.clone(),
1633 row_group_size: 1024,
1634 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1635 write_cache_enabled: false,
1636 intermediate_manager: intm_manager,
1637 index_options: IndexOptions::default(),
1638 inverted_index_config: InvertedIndexConfig::default(),
1639 fulltext_index_config: FulltextIndexConfig::default(),
1640 bloom_filter_index_config: BloomFilterConfig::default(),
1641 #[cfg(feature = "vector_index")]
1642 vector_index_config: Default::default(),
1643 }
1644 .build(FileId::random(), 0)
1645 .await;
1646
1647 assert!(indexer.inverted_indexer.is_some());
1648 assert!(indexer.fulltext_indexer.is_some());
1649 assert!(indexer.bloom_filter_indexer.is_none());
1650 }
1651
1652 #[tokio::test]
1653 async fn test_build_indexer_zero_row_group() {
1654 let (dir, factory) =
1655 PuffinManagerFactory::new_for_test_async("test_build_indexer_zero_row_group_").await;
1656 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1657
1658 let metadata = mock_region_metadata(MetaConfig {
1659 with_inverted: true,
1660 with_fulltext: true,
1661 with_skipping_bloom: true,
1662 #[cfg(feature = "vector_index")]
1663 with_vector: false,
1664 });
1665 let indexer = IndexerBuilderImpl {
1666 build_type: IndexBuildType::Flush,
1667 metadata,
1668 row_group_size: 0,
1669 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1670 write_cache_enabled: false,
1671 intermediate_manager: intm_manager,
1672 index_options: IndexOptions::default(),
1673 inverted_index_config: InvertedIndexConfig::default(),
1674 fulltext_index_config: FulltextIndexConfig::default(),
1675 bloom_filter_index_config: BloomFilterConfig::default(),
1676 #[cfg(feature = "vector_index")]
1677 vector_index_config: Default::default(),
1678 }
1679 .build(FileId::random(), 0)
1680 .await;
1681
1682 assert!(indexer.inverted_indexer.is_none());
1683 }
1684
1685 #[cfg(feature = "vector_index")]
1686 #[tokio::test]
1687 async fn test_update_flat_builds_vector_index() {
1688 use datatypes::arrow::array::BinaryBuilder;
1689 use datatypes::arrow::datatypes::{DataType, Field, Schema};
1690
1691 struct TestPathProvider;
1692
1693 impl FilePathProvider for TestPathProvider {
1694 fn build_index_file_path(&self, file_id: RegionFileId) -> String {
1695 format!("index/{}.puffin", file_id)
1696 }
1697
1698 fn build_index_file_path_with_version(&self, index_id: RegionIndexId) -> String {
1699 format!("index/{}.puffin", index_id)
1700 }
1701
1702 fn build_sst_file_path(&self, file_id: RegionFileId) -> String {
1703 format!("sst/{}.parquet", file_id)
1704 }
1705 }
1706
1707 fn f32s_to_bytes(values: &[f32]) -> Vec<u8> {
1708 let mut bytes = Vec::with_capacity(values.len() * 4);
1709 for v in values {
1710 bytes.extend_from_slice(&v.to_le_bytes());
1711 }
1712 bytes
1713 }
1714
1715 let (dir, factory) =
1716 PuffinManagerFactory::new_for_test_async("test_update_flat_builds_vector_index_").await;
1717 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1718
1719 let metadata = mock_region_metadata(MetaConfig {
1720 with_inverted: false,
1721 with_fulltext: false,
1722 with_skipping_bloom: false,
1723 with_vector: true,
1724 });
1725
1726 let mut indexer = IndexerBuilderImpl {
1727 build_type: IndexBuildType::Flush,
1728 metadata,
1729 row_group_size: 1024,
1730 puffin_manager: factory.build(mock_object_store(), TestPathProvider),
1731 write_cache_enabled: false,
1732 intermediate_manager: intm_manager,
1733 index_options: IndexOptions::default(),
1734 inverted_index_config: InvertedIndexConfig::default(),
1735 fulltext_index_config: FulltextIndexConfig::default(),
1736 bloom_filter_index_config: BloomFilterConfig::default(),
1737 vector_index_config: Default::default(),
1738 }
1739 .build(FileId::random(), 0)
1740 .await;
1741
1742 assert!(indexer.vector_indexer.is_some());
1743
1744 let vec1 = f32s_to_bytes(&[1.0, 0.0, 0.0, 0.0]);
1745 let vec2 = f32s_to_bytes(&[0.0, 1.0, 0.0, 0.0]);
1746
1747 let mut builder = BinaryBuilder::with_capacity(2, vec1.len() + vec2.len());
1748 builder.append_value(&vec1);
1749 builder.append_value(&vec2);
1750
1751 let schema = Arc::new(Schema::new(vec![Field::new("vec", DataType::Binary, true)]));
1752 let batch = RecordBatch::try_new(schema, vec![Arc::new(builder.finish())]).unwrap();
1753
1754 indexer.update_flat(&batch).await;
1755 let output = indexer.finish().await;
1756
1757 assert!(output.vector_index.is_available());
1758 assert!(output.vector_index.columns.contains(&6));
1759 }
1760
    /// Schedules an index build for a file meta with a random file id (no SST is written in
    /// this test, and the version holds no files) and expects an `Ok` outcome other than
    /// `Finished`.
    #[tokio::test]
    async fn test_index_build_task_sst_not_exist() {
        let env = SchedulerEnv::new().await;
        // `_rx` is kept bound so the request channel stays open for the whole test.
        let (tx, _rx) = mpsc::channel(4);
        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
        let mut scheduler = env.mock_index_build_scheduler(4);
        let metadata = Arc::new(sst_region_metadata());
        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
        let file_purger = Arc::new(NoopFilePurger {});
        // The version is created without any files.
        let files = HashMap::new();
        let version_control =
            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
        let region_id = metadata.region_id;
        let indexer_builder = mock_indexer_builder(metadata, &env).await;

        // File meta for a file id that nothing in this test writes.
        let file_meta = FileMeta {
            region_id,
            file_id: FileId::random(),
            file_size: 100,
            ..Default::default()
        };

        let file = FileHandle::new(file_meta.clone(), file_purger.clone());

        let task = IndexBuildTask {
            file,
            file_meta,
            reason: IndexBuildType::Flush,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            manifest_ctx,
            write_cache: None,
            file_purger,
            indexer_builder,
            request_sender: tx,
            result_sender: result_tx,
        };

        scheduler
            .schedule_build(&version_control, task)
            .await
            .unwrap();
        // Any `Ok` outcome except `Finished` passes; an `Err` result fails the test.
        match result_rx.recv().await.unwrap() {
            Ok(outcome) => {
                if outcome == IndexBuildOutcome::Finished {
                    panic!("Expect aborted result due to missing SST file")
                }
            }
            _ => panic!("Expect aborted result due to missing SST file"),
        }
    }
1814
    /// Writes an SST, schedules an index build for it, and checks both the `Finished` outcome
    /// and the `IndexBuildFinished` notification sent on the request channel.
    #[tokio::test]
    async fn test_index_build_task_sst_exist() {
        let env = SchedulerEnv::new().await;
        let mut scheduler = env.mock_index_build_scheduler(4);
        let metadata = Arc::new(sst_region_metadata());
        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
        let region_id = metadata.region_id;
        let file_purger = Arc::new(NoopFilePurger {});
        // Write the SST with the async index build mode.
        let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
        let file_meta = FileMeta {
            region_id,
            file_id: sst_info.file_id,
            file_size: sst_info.file_size,
            max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
            index_file_size: sst_info.index_metadata.file_size,
            num_rows: sst_info.num_rows as u64,
            num_row_groups: sst_info.num_row_groups,
            ..Default::default()
        };
        // Register the written file in the version passed to the scheduler.
        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
        let version_control =
            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
        let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;

        let file = FileHandle::new(file_meta.clone(), file_purger.clone());

        // `rx` receives the worker request inspected at the end of the test.
        let (tx, mut rx) = mpsc::channel(4);
        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
        let task = IndexBuildTask {
            file,
            file_meta: file_meta.clone(),
            reason: IndexBuildType::Flush,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            manifest_ctx,
            write_cache: None,
            file_purger,
            indexer_builder,
            request_sender: tx,
            result_sender: result_tx,
        };

        scheduler
            .schedule_build(&version_control, task)
            .await
            .unwrap();

        // The task must report `Finished`; an `Err` result fails the test.
        match result_rx.recv().await.unwrap() {
            Ok(outcome) => {
                assert_eq!(outcome, IndexBuildOutcome::Finished);
            }
            _ => panic!("Expect finished result"),
        }

        // The request sent to the worker must be an `IndexBuildFinished` background
        // notification for this region.
        let worker_req = rx.recv().await.unwrap().request;
        match worker_req {
            WorkerRequest::Background {
                region_id: req_region_id,
                notify: BackgroundNotify::IndexBuildFinished(finished),
            } => {
                assert_eq!(req_region_id, region_id);
                assert_eq!(finished.edit.files_to_add.len(), 1);
                let updated_meta = &finished.edit.files_to_add[0];

                // The edit re-adds the same file with index information filled in.
                assert!(!updated_meta.available_indexes.is_empty());
                assert!(updated_meta.index_file_size > 0);
                assert_eq!(updated_meta.file_id, file_meta.file_id);
            }
            _ => panic!("Unexpected worker request: {:?}", worker_req),
        }
    }
1890
    /// Writes an SST with `build_mode`, schedules an index build for it, and checks when the
    /// puffin index file (index version 0) exists in the object store.
    ///
    /// For `IndexBuildMode::Async` the file must not exist right after scheduling; for any
    /// other mode it must already exist. In both cases it must exist once the task has
    /// reported `Finished`.
    async fn schedule_index_build_task_with_mode(build_mode: IndexBuildMode) {
        let env = SchedulerEnv::new().await;
        let mut scheduler = env.mock_index_build_scheduler(4);
        let metadata = Arc::new(sst_region_metadata());
        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
        let file_purger = Arc::new(NoopFilePurger {});
        let region_id = metadata.region_id;
        let sst_info = mock_sst_file(metadata.clone(), &env, build_mode.clone()).await;
        let file_meta = FileMeta {
            region_id,
            file_id: sst_info.file_id,
            file_size: sst_info.file_size,
            max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
            index_file_size: sst_info.index_metadata.file_size,
            num_rows: sst_info.num_rows as u64,
            num_row_groups: sst_info.num_row_groups,
            ..Default::default()
        };
        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
        let version_control =
            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
        let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;

        let file = FileHandle::new(file_meta.clone(), file_purger.clone());

        // `_rx` is kept bound so the request channel stays open until the function returns.
        let (tx, _rx) = mpsc::channel(4);
        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
        let task = IndexBuildTask {
            file,
            file_meta: file_meta.clone(),
            reason: IndexBuildType::Flush,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            manifest_ctx,
            write_cache: None,
            file_purger,
            indexer_builder,
            request_sender: tx,
            result_sender: result_tx,
        };

        scheduler
            .schedule_build(&version_control, task)
            .await
            .unwrap();

        // Path of the index file for this SST at index version 0.
        let puffin_path = location::index_file_path(
            env.access_layer.table_dir(),
            RegionIndexId::new(RegionFileId::new(region_id, file_meta.file_id), 0),
            env.access_layer.path_type(),
        );

        // NOTE(review): the async branch presumably relies on the scheduled build not having
        // written the file by the time this check runs — confirm this cannot race.
        if build_mode == IndexBuildMode::Async {
            assert!(
                !env.access_layer
                    .object_store()
                    .exists(&puffin_path)
                    .await
                    .unwrap()
            );
        } else {
            assert!(
                env.access_layer
                    .object_store()
                    .exists(&puffin_path)
                    .await
                    .unwrap()
            );
        }

        // The task must report `Finished`; an `Err` result fails the test.
        match result_rx.recv().await.unwrap() {
            Ok(outcome) => {
                assert_eq!(outcome, IndexBuildOutcome::Finished);
            }
            _ => panic!("Expect finished result"),
        }

        // After completion the index file exists regardless of the build mode.
        assert!(
            env.access_layer
                .object_store()
                .exists(&puffin_path)
                .await
                .unwrap()
        );
    }
1981
1982 #[tokio::test]
1983 async fn test_index_build_task_build_mode() {
1984 schedule_index_build_task_with_mode(IndexBuildMode::Async).await;
1985 schedule_index_build_task_with_mode(IndexBuildMode::Sync).await;
1986 }
1987
    /// Schedules an index build for an SST whose metadata has the inverted index disabled and
    /// the skipping options unset on every column, and expects the task to report `Finished`.
    #[tokio::test]
    async fn test_index_build_task_no_index() {
        let env = SchedulerEnv::new().await;
        let mut scheduler = env.mock_index_build_scheduler(4);
        let mut metadata = sst_region_metadata();
        // Turn off the inverted index and remove the skipping options on all columns.
        metadata.column_metadatas.iter_mut().for_each(|col| {
            col.column_schema.set_inverted_index(false);
            let _ = col.column_schema.unset_skipping_options();
        });
        let region_id = metadata.region_id;
        let metadata = Arc::new(metadata);
        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
        let file_purger = Arc::new(NoopFilePurger {});
        let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
        let file_meta = FileMeta {
            region_id,
            file_id: sst_info.file_id,
            file_size: sst_info.file_size,
            max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
            index_file_size: sst_info.index_metadata.file_size,
            num_rows: sst_info.num_rows as u64,
            num_row_groups: sst_info.num_row_groups,
            ..Default::default()
        };
        let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
        let version_control =
            mock_version_control(metadata.clone(), file_purger.clone(), files).await;
        let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;

        let file = FileHandle::new(file_meta.clone(), file_purger.clone());

        let (tx, mut rx) = mpsc::channel(4);
        let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
        let task = IndexBuildTask {
            file,
            file_meta: file_meta.clone(),
            reason: IndexBuildType::Flush,
            access_layer: env.access_layer.clone(),
            listener: WorkerListener::default(),
            manifest_ctx,
            write_cache: None,
            file_purger,
            indexer_builder,
            request_sender: tx,
            result_sender: result_tx,
        };

        scheduler
            .schedule_build(&version_control, task)
            .await
            .unwrap();

        // The task must report `Finished`; an `Err` result fails the test.
        match result_rx.recv().await.unwrap() {
            Ok(outcome) => {
                assert_eq!(outcome, IndexBuildOutcome::Finished);
            }
            _ => panic!("Expect finished result"),
        }

        // NOTE(review): the boolean is discarded, so this only waits on the request channel
        // and asserts nothing about whether a notification was sent — confirm the intent.
        let _ = rx.recv().await.is_none();
    }
2053
2054 #[tokio::test]
2055 async fn test_index_build_task_with_write_cache() {
2056 let env = SchedulerEnv::new().await;
2057 let mut scheduler = env.mock_index_build_scheduler(4);
2058 let metadata = Arc::new(sst_region_metadata());
2059 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
2060 let file_purger = Arc::new(NoopFilePurger {});
2061 let region_id = metadata.region_id;
2062
2063 let (dir, factory) = PuffinManagerFactory::new_for_test_async("test_write_cache").await;
2064 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
2065
2066 let write_cache = Arc::new(
2068 WriteCache::new_fs(
2069 dir.path().to_str().unwrap(),
2070 ReadableSize::mb(10),
2071 None,
2072 None,
2073 true, factory,
2075 intm_manager,
2076 ReadableSize::mb(10),
2077 )
2078 .await
2079 .unwrap(),
2080 );
2081 let indexer_builder = Arc::new(IndexerBuilderImpl {
2083 build_type: IndexBuildType::Flush,
2084 metadata: metadata.clone(),
2085 row_group_size: 1024,
2086 puffin_manager: write_cache.build_puffin_manager().clone(),
2087 write_cache_enabled: true,
2088 intermediate_manager: write_cache.intermediate_manager().clone(),
2089 index_options: IndexOptions::default(),
2090 inverted_index_config: InvertedIndexConfig::default(),
2091 fulltext_index_config: FulltextIndexConfig::default(),
2092 bloom_filter_index_config: BloomFilterConfig::default(),
2093 #[cfg(feature = "vector_index")]
2094 vector_index_config: Default::default(),
2095 });
2096
2097 let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
2098 let file_meta = FileMeta {
2099 region_id,
2100 file_id: sst_info.file_id,
2101 file_size: sst_info.file_size,
2102 index_file_size: sst_info.index_metadata.file_size,
2103 num_rows: sst_info.num_rows as u64,
2104 num_row_groups: sst_info.num_row_groups,
2105 ..Default::default()
2106 };
2107 let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
2108 let version_control =
2109 mock_version_control(metadata.clone(), file_purger.clone(), files).await;
2110
2111 let file = FileHandle::new(file_meta.clone(), file_purger.clone());
2112
2113 let (tx, mut _rx) = mpsc::channel(4);
2115 let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
2116 let task = IndexBuildTask {
2117 file,
2118 file_meta: file_meta.clone(),
2119 reason: IndexBuildType::Flush,
2120 access_layer: env.access_layer.clone(),
2121 listener: WorkerListener::default(),
2122 manifest_ctx,
2123 write_cache: Some(write_cache.clone()),
2124 file_purger,
2125 indexer_builder,
2126 request_sender: tx,
2127 result_sender: result_tx,
2128 };
2129
2130 scheduler
2131 .schedule_build(&version_control, task)
2132 .await
2133 .unwrap();
2134
2135 match result_rx.recv().await.unwrap() {
2137 Ok(outcome) => {
2138 assert_eq!(outcome, IndexBuildOutcome::Finished);
2139 }
2140 _ => panic!("Expect finished result"),
2141 }
2142
2143 let index_key = IndexKey::new(
2145 region_id,
2146 file_meta.file_id,
2147 FileType::Puffin(sst_info.index_metadata.version),
2148 );
2149 assert!(write_cache.file_cache().contains_key(&index_key));
2150 }
2151
2152 async fn create_mock_task_for_schedule(
2153 env: &SchedulerEnv,
2154 file_id: FileId,
2155 region_id: RegionId,
2156 reason: IndexBuildType,
2157 ) -> IndexBuildTask {
2158 let metadata = Arc::new(sst_region_metadata());
2159 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
2160 let file_purger = Arc::new(NoopFilePurger {});
2161 let indexer_builder = mock_indexer_builder(metadata, env).await;
2162 let (tx, _rx) = mpsc::channel(4);
2163 let (result_tx, _result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
2164
2165 let file_meta = FileMeta {
2166 region_id,
2167 file_id,
2168 file_size: 100,
2169 ..Default::default()
2170 };
2171
2172 let file = FileHandle::new(file_meta.clone(), file_purger.clone());
2173
2174 IndexBuildTask {
2175 file,
2176 file_meta,
2177 reason,
2178 access_layer: env.access_layer.clone(),
2179 listener: WorkerListener::default(),
2180 manifest_ctx,
2181 write_cache: None,
2182 file_purger,
2183 indexer_builder,
2184 request_sender: tx,
2185 result_sender: result_tx,
2186 }
2187 }
2188
2189 #[tokio::test]
2190 async fn test_scheduler_comprehensive() {
2191 let env = SchedulerEnv::new().await;
2192 let mut scheduler = env.mock_index_build_scheduler(2);
2193 let metadata = Arc::new(sst_region_metadata());
2194 let region_id = metadata.region_id;
2195 let file_purger = Arc::new(NoopFilePurger {});
2196
2197 let file_id1 = FileId::random();
2199 let file_id2 = FileId::random();
2200 let file_id3 = FileId::random();
2201 let file_id4 = FileId::random();
2202 let file_id5 = FileId::random();
2203
2204 let mut files = HashMap::new();
2205 for file_id in [file_id1, file_id2, file_id3, file_id4, file_id5] {
2206 files.insert(
2207 file_id,
2208 FileMeta {
2209 region_id,
2210 file_id,
2211 file_size: 100,
2212 ..Default::default()
2213 },
2214 );
2215 }
2216
2217 let version_control = mock_version_control(metadata, file_purger, files).await;
2218
2219 let task1 =
2221 create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
2222 assert!(
2223 scheduler
2224 .schedule_build(&version_control, task1)
2225 .await
2226 .is_ok()
2227 );
2228 assert!(scheduler.region_status.contains_key(®ion_id));
2229 let status = scheduler.region_status.get(®ion_id).unwrap();
2230 assert_eq!(status.building_files.len(), 1);
2231 assert!(status.building_files.contains(&file_id1));
2232
2233 let task1_dup =
2235 create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
2236 scheduler
2237 .schedule_build(&version_control, task1_dup)
2238 .await
2239 .unwrap();
2240 let status = scheduler.region_status.get(®ion_id).unwrap();
2241 assert_eq!(status.building_files.len(), 1); let task2 =
2245 create_mock_task_for_schedule(&env, file_id2, region_id, IndexBuildType::Flush).await;
2246 scheduler
2247 .schedule_build(&version_control, task2)
2248 .await
2249 .unwrap();
2250 let status = scheduler.region_status.get(®ion_id).unwrap();
2251 assert_eq!(status.building_files.len(), 2); assert_eq!(status.pending_tasks.len(), 0);
2253
2254 let task3 =
2257 create_mock_task_for_schedule(&env, file_id3, region_id, IndexBuildType::Compact).await;
2258 let task4 =
2259 create_mock_task_for_schedule(&env, file_id4, region_id, IndexBuildType::SchemaChange)
2260 .await;
2261 let task5 =
2262 create_mock_task_for_schedule(&env, file_id5, region_id, IndexBuildType::Manual).await;
2263
2264 scheduler
2265 .schedule_build(&version_control, task3)
2266 .await
2267 .unwrap();
2268 scheduler
2269 .schedule_build(&version_control, task4)
2270 .await
2271 .unwrap();
2272 scheduler
2273 .schedule_build(&version_control, task5)
2274 .await
2275 .unwrap();
2276
2277 let status = scheduler.region_status.get(®ion_id).unwrap();
2278 assert_eq!(status.building_files.len(), 2); assert_eq!(status.pending_tasks.len(), 3); scheduler.on_task_stopped(region_id, file_id1, &version_control);
2283 let status = scheduler.region_status.get(®ion_id).unwrap();
2284 assert!(!status.building_files.contains(&file_id1));
2285 assert_eq!(status.building_files.len(), 2); assert_eq!(status.pending_tasks.len(), 2); assert!(status.building_files.contains(&file_id5));
2289
2290 scheduler.on_task_stopped(region_id, file_id2, &version_control);
2292 let status = scheduler.region_status.get(®ion_id).unwrap();
2293 assert_eq!(status.building_files.len(), 2);
2294 assert_eq!(status.pending_tasks.len(), 1); assert!(status.building_files.contains(&file_id4)); scheduler.on_task_stopped(region_id, file_id5, &version_control);
2299 scheduler.on_task_stopped(region_id, file_id4, &version_control);
2300
2301 let status = scheduler.region_status.get(®ion_id).unwrap();
2302 assert_eq!(status.building_files.len(), 1); assert_eq!(status.pending_tasks.len(), 0);
2304 assert!(status.building_files.contains(&file_id3));
2305
2306 scheduler.on_task_stopped(region_id, file_id3, &version_control);
2307
2308 assert!(!scheduler.region_status.contains_key(®ion_id));
2310
2311 let task6 =
2313 create_mock_task_for_schedule(&env, file_id1, region_id, IndexBuildType::Flush).await;
2314 let task7 =
2315 create_mock_task_for_schedule(&env, file_id2, region_id, IndexBuildType::Flush).await;
2316 let task8 =
2317 create_mock_task_for_schedule(&env, file_id3, region_id, IndexBuildType::Manual).await;
2318
2319 scheduler
2320 .schedule_build(&version_control, task6)
2321 .await
2322 .unwrap();
2323 scheduler
2324 .schedule_build(&version_control, task7)
2325 .await
2326 .unwrap();
2327 scheduler
2328 .schedule_build(&version_control, task8)
2329 .await
2330 .unwrap();
2331
2332 assert!(scheduler.region_status.contains_key(®ion_id));
2333 let status = scheduler.region_status.get(®ion_id).unwrap();
2334 assert_eq!(status.building_files.len(), 2);
2335 assert_eq!(status.pending_tasks.len(), 1);
2336
2337 scheduler.on_region_dropped(region_id).await;
2338 assert!(!scheduler.region_status.contains_key(®ion_id));
2339 }
2340}