1pub(crate) mod bloom_filter;
16pub(crate) mod fulltext_index;
17mod indexer;
18pub mod intermediate;
19pub(crate) mod inverted_index;
20pub mod puffin_manager;
21mod statistics;
22pub(crate) mod store;
23
24use std::num::NonZeroUsize;
25use std::sync::Arc;
26
27use bloom_filter::creator::BloomFilterIndexer;
28use common_telemetry::{debug, info, warn};
29use datatypes::arrow::array::BinaryArray;
30use datatypes::arrow::record_batch::RecordBatch;
31use mito_codec::index::IndexValuesCodec;
32use mito_codec::row_converter::CompositeValues;
33use puffin_manager::SstPuffinManager;
34use smallvec::{SmallVec, smallvec};
35use snafu::{OptionExt, ResultExt};
36use statistics::{ByteCount, RowCount};
37use store_api::metadata::RegionMetadataRef;
38use store_api::storage::{ColumnId, FileId, RegionId};
39use strum::IntoStaticStr;
40use tokio::sync::mpsc::Sender;
41
42use crate::access_layer::{AccessLayerRef, FilePathProvider, OperationType, RegionFilePathFactory};
43use crate::cache::file_cache::{FileType, IndexKey};
44use crate::cache::write_cache::{UploadTracker, WriteCacheRef};
45use crate::config::{BloomFilterConfig, FulltextIndexConfig, InvertedIndexConfig};
46use crate::error::{BuildIndexAsyncSnafu, DecodeSnafu, Error, InvalidRecordBatchSnafu, Result};
47use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
48use crate::metrics::INDEX_CREATE_MEMORY_USAGE;
49use crate::read::{Batch, BatchReader};
50use crate::region::options::IndexOptions;
51use crate::region::version::VersionControlRef;
52use crate::region::{ManifestContextRef, RegionLeaderState};
53use crate::request::{
54 BackgroundNotify, IndexBuildFailed, IndexBuildFinished, WorkerRequest, WorkerRequestWithTime,
55};
56use crate::schedule::scheduler::{Job, SchedulerRef};
57use crate::sst::file::{FileHandle, FileMeta, IndexType, RegionFileId};
58use crate::sst::file_purger::FilePurgerRef;
59use crate::sst::index::fulltext_index::creator::FulltextIndexer;
60use crate::sst::index::intermediate::IntermediateManager;
61use crate::sst::index::inverted_index::creator::InvertedIndexer;
62use crate::sst::parquet::SstInfo;
63use crate::sst::parquet::flat_format::primary_key_column_index;
64use crate::sst::parquet::format::PrimaryKeyArray;
65use crate::worker::WorkerListener;
66
/// Label value identifying the inverted index, e.g. in the `INDEX_CREATE_MEMORY_USAGE` metric.
pub(crate) const TYPE_INVERTED_INDEX: &str = "inverted_index";
/// Label value identifying the full-text index, e.g. in the `INDEX_CREATE_MEMORY_USAGE` metric.
pub(crate) const TYPE_FULLTEXT_INDEX: &str = "fulltext_index";
/// Label value identifying the bloom filter index, e.g. in the `INDEX_CREATE_MEMORY_USAGE` metric.
pub(crate) const TYPE_BLOOM_FILTER_INDEX: &str = "bloom_filter_index";
70
71#[derive(Debug, Clone, Default)]
73pub struct IndexOutput {
74 pub file_size: u64,
76 pub inverted_index: InvertedIndexOutput,
78 pub fulltext_index: FulltextIndexOutput,
80 pub bloom_filter: BloomFilterOutput,
82}
83
84impl IndexOutput {
85 pub fn build_available_indexes(&self) -> SmallVec<[IndexType; 4]> {
86 let mut indexes = SmallVec::new();
87 if self.inverted_index.is_available() {
88 indexes.push(IndexType::InvertedIndex);
89 }
90 if self.fulltext_index.is_available() {
91 indexes.push(IndexType::FulltextIndex);
92 }
93 if self.bloom_filter.is_available() {
94 indexes.push(IndexType::BloomFilterIndex);
95 }
96 indexes
97 }
98}
99
100#[derive(Debug, Clone, Default)]
102pub struct IndexBaseOutput {
103 pub index_size: ByteCount,
105 pub row_count: RowCount,
107 pub columns: Vec<ColumnId>,
109}
110
111impl IndexBaseOutput {
112 pub fn is_available(&self) -> bool {
113 self.index_size > 0
114 }
115}
116
117pub type InvertedIndexOutput = IndexBaseOutput;
119pub type FulltextIndexOutput = IndexBaseOutput;
121pub type BloomFilterOutput = IndexBaseOutput;
123
124#[derive(Default)]
126pub struct Indexer {
127 file_id: FileId,
128 region_id: RegionId,
129 puffin_manager: Option<SstPuffinManager>,
130 inverted_indexer: Option<InvertedIndexer>,
131 last_mem_inverted_index: usize,
132 fulltext_indexer: Option<FulltextIndexer>,
133 last_mem_fulltext_index: usize,
134 bloom_filter_indexer: Option<BloomFilterIndexer>,
135 last_mem_bloom_filter: usize,
136 intermediate_manager: Option<IntermediateManager>,
137}
138
139impl Indexer {
140 pub async fn update(&mut self, batch: &mut Batch) {
142 self.do_update(batch).await;
143
144 self.flush_mem_metrics();
145 }
146
147 pub async fn update_flat(&mut self, batch: &RecordBatch) {
149 self.do_update_flat(batch).await;
150
151 self.flush_mem_metrics();
152 }
153
154 pub async fn finish(&mut self) -> IndexOutput {
156 let output = self.do_finish().await;
157
158 self.flush_mem_metrics();
159 output
160 }
161
162 pub async fn abort(&mut self) {
164 self.do_abort().await;
165
166 self.flush_mem_metrics();
167 }
168
169 fn flush_mem_metrics(&mut self) {
170 let inverted_mem = self
171 .inverted_indexer
172 .as_ref()
173 .map_or(0, |creator| creator.memory_usage());
174 INDEX_CREATE_MEMORY_USAGE
175 .with_label_values(&[TYPE_INVERTED_INDEX])
176 .add(inverted_mem as i64 - self.last_mem_inverted_index as i64);
177 self.last_mem_inverted_index = inverted_mem;
178
179 let fulltext_mem = self
180 .fulltext_indexer
181 .as_ref()
182 .map_or(0, |creator| creator.memory_usage());
183 INDEX_CREATE_MEMORY_USAGE
184 .with_label_values(&[TYPE_FULLTEXT_INDEX])
185 .add(fulltext_mem as i64 - self.last_mem_fulltext_index as i64);
186 self.last_mem_fulltext_index = fulltext_mem;
187
188 let bloom_filter_mem = self
189 .bloom_filter_indexer
190 .as_ref()
191 .map_or(0, |creator| creator.memory_usage());
192 INDEX_CREATE_MEMORY_USAGE
193 .with_label_values(&[TYPE_BLOOM_FILTER_INDEX])
194 .add(bloom_filter_mem as i64 - self.last_mem_bloom_filter as i64);
195 self.last_mem_bloom_filter = bloom_filter_mem;
196 }
197}
198
199#[async_trait::async_trait]
200pub trait IndexerBuilder {
201 async fn build(&self, file_id: FileId) -> Indexer;
203}
204#[derive(Clone)]
205pub(crate) struct IndexerBuilderImpl {
206 pub(crate) build_type: IndexBuildType,
207 pub(crate) metadata: RegionMetadataRef,
208 pub(crate) row_group_size: usize,
209 pub(crate) puffin_manager: SstPuffinManager,
210 pub(crate) intermediate_manager: IntermediateManager,
211 pub(crate) index_options: IndexOptions,
212 pub(crate) inverted_index_config: InvertedIndexConfig,
213 pub(crate) fulltext_index_config: FulltextIndexConfig,
214 pub(crate) bloom_filter_index_config: BloomFilterConfig,
215}
216
217#[async_trait::async_trait]
218impl IndexerBuilder for IndexerBuilderImpl {
219 async fn build(&self, file_id: FileId) -> Indexer {
221 let mut indexer = Indexer {
222 file_id,
223 region_id: self.metadata.region_id,
224 ..Default::default()
225 };
226
227 indexer.inverted_indexer = self.build_inverted_indexer(file_id);
228 indexer.fulltext_indexer = self.build_fulltext_indexer(file_id).await;
229 indexer.bloom_filter_indexer = self.build_bloom_filter_indexer(file_id);
230 indexer.intermediate_manager = Some(self.intermediate_manager.clone());
231 if indexer.inverted_indexer.is_none()
232 && indexer.fulltext_indexer.is_none()
233 && indexer.bloom_filter_indexer.is_none()
234 {
235 indexer.abort().await;
236 return Indexer::default();
237 }
238
239 indexer.puffin_manager = Some(self.puffin_manager.clone());
240 indexer
241 }
242}
243
244impl IndexerBuilderImpl {
245 fn build_inverted_indexer(&self, file_id: FileId) -> Option<InvertedIndexer> {
246 let create = match self.build_type {
247 IndexBuildType::Flush => self.inverted_index_config.create_on_flush.auto(),
248 IndexBuildType::Compact => self.inverted_index_config.create_on_compaction.auto(),
249 _ => true,
250 };
251
252 if !create {
253 debug!(
254 "Skip creating inverted index due to config, region_id: {}, file_id: {}",
255 self.metadata.region_id, file_id,
256 );
257 return None;
258 }
259
260 let indexed_column_ids = self.metadata.inverted_indexed_column_ids(
261 self.index_options.inverted_index.ignore_column_ids.iter(),
262 );
263 if indexed_column_ids.is_empty() {
264 debug!(
265 "No columns to be indexed, skip creating inverted index, region_id: {}, file_id: {}",
266 self.metadata.region_id, file_id,
267 );
268 return None;
269 }
270
271 let Some(mut segment_row_count) =
272 NonZeroUsize::new(self.index_options.inverted_index.segment_row_count)
273 else {
274 warn!(
275 "Segment row count is 0, skip creating index, region_id: {}, file_id: {}",
276 self.metadata.region_id, file_id,
277 );
278 return None;
279 };
280
281 let Some(row_group_size) = NonZeroUsize::new(self.row_group_size) else {
282 warn!(
283 "Row group size is 0, skip creating index, region_id: {}, file_id: {}",
284 self.metadata.region_id, file_id,
285 );
286 return None;
287 };
288
289 if row_group_size.get() % segment_row_count.get() != 0 {
291 segment_row_count = row_group_size;
292 }
293
294 let indexer = InvertedIndexer::new(
295 file_id,
296 &self.metadata,
297 self.intermediate_manager.clone(),
298 self.inverted_index_config.mem_threshold_on_create(),
299 segment_row_count,
300 indexed_column_ids,
301 );
302
303 Some(indexer)
304 }
305
306 async fn build_fulltext_indexer(&self, file_id: FileId) -> Option<FulltextIndexer> {
307 let create = match self.build_type {
308 IndexBuildType::Flush => self.fulltext_index_config.create_on_flush.auto(),
309 IndexBuildType::Compact => self.fulltext_index_config.create_on_compaction.auto(),
310 _ => true,
311 };
312
313 if !create {
314 debug!(
315 "Skip creating full-text index due to config, region_id: {}, file_id: {}",
316 self.metadata.region_id, file_id,
317 );
318 return None;
319 }
320
321 let mem_limit = self.fulltext_index_config.mem_threshold_on_create();
322 let creator = FulltextIndexer::new(
323 &self.metadata.region_id,
324 &file_id,
325 &self.intermediate_manager,
326 &self.metadata,
327 self.fulltext_index_config.compress,
328 mem_limit,
329 )
330 .await;
331
332 let err = match creator {
333 Ok(creator) => {
334 if creator.is_none() {
335 debug!(
336 "Skip creating full-text index due to no columns require indexing, region_id: {}, file_id: {}",
337 self.metadata.region_id, file_id,
338 );
339 }
340 return creator;
341 }
342 Err(err) => err,
343 };
344
345 if cfg!(any(test, feature = "test")) {
346 panic!(
347 "Failed to create full-text indexer, region_id: {}, file_id: {}, err: {:?}",
348 self.metadata.region_id, file_id, err
349 );
350 } else {
351 warn!(
352 err; "Failed to create full-text indexer, region_id: {}, file_id: {}",
353 self.metadata.region_id, file_id,
354 );
355 }
356
357 None
358 }
359
360 fn build_bloom_filter_indexer(&self, file_id: FileId) -> Option<BloomFilterIndexer> {
361 let create = match self.build_type {
362 IndexBuildType::Flush => self.bloom_filter_index_config.create_on_flush.auto(),
363 IndexBuildType::Compact => self.bloom_filter_index_config.create_on_compaction.auto(),
364 _ => true,
365 };
366
367 if !create {
368 debug!(
369 "Skip creating bloom filter due to config, region_id: {}, file_id: {}",
370 self.metadata.region_id, file_id,
371 );
372 return None;
373 }
374
375 let mem_limit = self.bloom_filter_index_config.mem_threshold_on_create();
376 let indexer = BloomFilterIndexer::new(
377 file_id,
378 &self.metadata,
379 self.intermediate_manager.clone(),
380 mem_limit,
381 );
382
383 let err = match indexer {
384 Ok(indexer) => {
385 if indexer.is_none() {
386 debug!(
387 "Skip creating bloom filter due to no columns require indexing, region_id: {}, file_id: {}",
388 self.metadata.region_id, file_id,
389 );
390 }
391 return indexer;
392 }
393 Err(err) => err,
394 };
395
396 if cfg!(any(test, feature = "test")) {
397 panic!(
398 "Failed to create bloom filter, region_id: {}, file_id: {}, err: {:?}",
399 self.metadata.region_id, file_id, err
400 );
401 } else {
402 warn!(
403 err; "Failed to create bloom filter, region_id: {}, file_id: {}",
404 self.metadata.region_id, file_id,
405 );
406 }
407
408 None
409 }
410}
411
412#[derive(Debug, Clone, PartialEq, IntoStaticStr)]
414pub enum IndexBuildType {
415 SchemaChange,
417 Flush,
419 Compact,
421 Manual,
423}
424
425impl IndexBuildType {
426 fn as_str(&self) -> &'static str {
427 self.into()
428 }
429}
430
431impl From<OperationType> for IndexBuildType {
432 fn from(op_type: OperationType) -> Self {
433 match op_type {
434 OperationType::Flush => IndexBuildType::Flush,
435 OperationType::Compact => IndexBuildType::Compact,
436 }
437 }
438}
439
/// Non-error outcome of an index build task.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum IndexBuildOutcome {
    /// The task ran to completion (an index file may or may not have been produced).
    Finished,
    /// The task gave up early; the string explains why.
    Aborted(String),
}

/// Channel sender through which an index build task reports its result.
pub type ResultMpscSender = Sender<Result<IndexBuildOutcome>>;
449
/// Background task that builds indexes for one existing SST file and records them in the manifest.
pub struct IndexBuildTask {
    /// Metadata of the target SST; its index fields are updated before the manifest edit.
    pub file_meta: FileMeta,
    /// Why the build was triggered; included in log output.
    pub reason: IndexBuildType,
    /// Access layer used to read the SST and to locate/upload the index file.
    pub access_layer: AccessLayerRef,
    /// Listener notified when the build begins or is aborted.
    pub(crate) listener: WorkerListener,
    /// Manifest context used to persist the resulting region edit.
    pub(crate) manifest_ctx: ManifestContextRef,
    /// Optional write cache; when present the index file is uploaded from it to the remote store.
    pub write_cache: Option<WriteCacheRef>,
    /// File purger attached to the handle opened for reading the SST.
    pub file_purger: FilePurgerRef,
    /// Builder that creates the indexer for the SST.
    pub indexer_builder: Arc<dyn IndexerBuilder + Send + Sync>,
    /// Sender used to notify the region worker after the manifest update succeeds or fails.
    pub(crate) request_sender: Sender<WorkerRequestWithTime>,
    /// Sender used to report the final task result.
    pub(crate) result_sender: ResultMpscSender,
}
467
468impl IndexBuildTask {
469 pub async fn on_success(&mut self, outcome: IndexBuildOutcome) {
471 let _ = self.result_sender.send(Ok(outcome)).await;
472 }
473
474 pub async fn on_failure(&mut self, err: Arc<Error>) {
476 let _ = self
477 .result_sender
478 .send(Err(err.clone()).context(BuildIndexAsyncSnafu {
479 region_id: self.file_meta.region_id,
480 }))
481 .await;
482 }
483
484 fn into_index_build_job(mut self, version_control: VersionControlRef) -> Job {
485 Box::pin(async move {
486 self.do_index_build(version_control).await;
487 })
488 }
489
490 async fn do_index_build(&mut self, version_control: VersionControlRef) {
491 self.listener
492 .on_index_build_begin(RegionFileId::new(
493 self.file_meta.region_id,
494 self.file_meta.file_id,
495 ))
496 .await;
497 match self.index_build(version_control).await {
498 Ok(outcome) => self.on_success(outcome).await,
499 Err(e) => {
500 warn!(
501 e; "Index build task failed, region: {}, file_id: {}",
502 self.file_meta.region_id, self.file_meta.file_id,
503 );
504 self.on_failure(e.into()).await
505 }
506 };
507 }
508
509 async fn check_sst_file_exists(&self, version_control: &VersionControlRef) -> bool {
511 let file_id = self.file_meta.file_id;
512 let level = self.file_meta.level;
513 let version = version_control.current().version;
515
516 let Some(level_files) = version.ssts.levels().get(level as usize) else {
517 warn!(
518 "File id {} not found in level {} for index build, region: {}",
519 file_id, level, self.file_meta.region_id
520 );
521 return false;
522 };
523
524 match level_files.files.get(&file_id) {
525 Some(handle) if !handle.is_deleted() && !handle.compacting() => {
526 true
530 }
531 _ => {
532 warn!(
533 "File id {} not found in region version for index build, region: {}",
534 file_id, self.file_meta.region_id
535 );
536 false
537 }
538 }
539 }
540
541 async fn index_build(
542 &mut self,
543 version_control: VersionControlRef,
544 ) -> Result<IndexBuildOutcome> {
545 let mut indexer = self.indexer_builder.build(self.file_meta.file_id).await;
546
547 if !self.check_sst_file_exists(&version_control).await {
549 indexer.abort().await;
551 self.listener
552 .on_index_build_abort(RegionFileId::new(
553 self.file_meta.region_id,
554 self.file_meta.file_id,
555 ))
556 .await;
557 return Ok(IndexBuildOutcome::Aborted(format!(
558 "SST file not found during index build, region: {}, file_id: {}",
559 self.file_meta.region_id, self.file_meta.file_id
560 )));
561 }
562
563 let mut parquet_reader = self
564 .access_layer
565 .read_sst(FileHandle::new(
566 self.file_meta.clone(),
567 self.file_purger.clone(),
568 ))
569 .build()
570 .await?;
571
572 loop {
574 match parquet_reader.next_batch().await {
575 Ok(Some(batch)) => {
576 indexer.update(&mut batch.clone()).await;
577 }
578 Ok(None) => break,
579 Err(e) => {
580 indexer.abort().await;
581 return Err(e);
582 }
583 }
584 }
585 let index_output = indexer.finish().await;
586
587 if index_output.file_size > 0 {
588 if !self.check_sst_file_exists(&version_control).await {
590 indexer.abort().await;
592 self.listener
593 .on_index_build_abort(RegionFileId::new(
594 self.file_meta.region_id,
595 self.file_meta.file_id,
596 ))
597 .await;
598 return Ok(IndexBuildOutcome::Aborted(format!(
599 "SST file not found during index build, region: {}, file_id: {}",
600 self.file_meta.region_id, self.file_meta.file_id
601 )));
602 }
603
604 self.maybe_upload_index_file(index_output.clone()).await?;
606
607 let worker_request = match self.update_manifest(index_output).await {
608 Ok(edit) => {
609 let index_build_finished = IndexBuildFinished {
610 region_id: self.file_meta.region_id,
611 edit,
612 };
613 WorkerRequest::Background {
614 region_id: self.file_meta.region_id,
615 notify: BackgroundNotify::IndexBuildFinished(index_build_finished),
616 }
617 }
618 Err(e) => {
619 let err = Arc::new(e);
620 WorkerRequest::Background {
621 region_id: self.file_meta.region_id,
622 notify: BackgroundNotify::IndexBuildFailed(IndexBuildFailed { err }),
623 }
624 }
625 };
626
627 let _ = self
628 .request_sender
629 .send(WorkerRequestWithTime::new(worker_request))
630 .await;
631 }
632 Ok(IndexBuildOutcome::Finished)
633 }
634
635 async fn maybe_upload_index_file(&self, output: IndexOutput) -> Result<()> {
636 if let Some(write_cache) = &self.write_cache {
637 let file_id = self.file_meta.file_id;
638 let region_id = self.file_meta.region_id;
639 let remote_store = self.access_layer.object_store();
640 let mut upload_tracker = UploadTracker::new(region_id);
641 let mut err = None;
642 let puffin_key = IndexKey::new(region_id, file_id, FileType::Puffin);
643 let puffin_path = RegionFilePathFactory::new(
644 self.access_layer.table_dir().to_string(),
645 self.access_layer.path_type(),
646 )
647 .build_index_file_path(RegionFileId::new(region_id, file_id));
648 if let Err(e) = write_cache
649 .upload(puffin_key, &puffin_path, remote_store)
650 .await
651 {
652 err = Some(e);
653 }
654 upload_tracker.push_uploaded_file(puffin_path);
655 if let Some(err) = err {
656 upload_tracker
658 .clean(
659 &smallvec![SstInfo {
660 file_id,
661 index_metadata: output,
662 ..Default::default()
663 }],
664 &write_cache.file_cache(),
665 remote_store,
666 )
667 .await;
668 return Err(err);
669 }
670 } else {
671 debug!("write cache is not available, skip uploading index file");
672 }
673 Ok(())
674 }
675
676 async fn update_manifest(&mut self, output: IndexOutput) -> Result<RegionEdit> {
677 self.file_meta.available_indexes = output.build_available_indexes();
678 self.file_meta.index_file_size = output.file_size;
679 let edit = RegionEdit {
680 files_to_add: vec![self.file_meta.clone()],
681 files_to_remove: vec![],
682 timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
683 flushed_sequence: None,
684 flushed_entry_id: None,
685 committed_sequence: None,
686 compaction_time_window: None,
687 };
688 let version = self
689 .manifest_ctx
690 .update_manifest(
691 RegionLeaderState::Writable,
692 RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone())),
693 )
694 .await?;
695 info!(
696 "Successfully update manifest version to {version}, region: {}, reason: {}",
697 self.file_meta.region_id,
698 self.reason.as_str()
699 );
700 Ok(edit)
701 }
702}
703
704#[derive(Clone)]
705pub struct IndexBuildScheduler {
706 scheduler: SchedulerRef,
707}
708
709impl IndexBuildScheduler {
710 pub fn new(scheduler: SchedulerRef) -> Self {
711 IndexBuildScheduler { scheduler }
712 }
713
714 pub(crate) fn schedule_build(
715 &mut self,
716 version_control: &VersionControlRef,
717 task: IndexBuildTask,
718 ) -> Result<()> {
719 let job = task.into_index_build_job(version_control.clone());
721 self.scheduler.schedule(job)?;
722 Ok(())
723 }
724}
725
726pub(crate) fn decode_primary_keys_with_counts(
729 batch: &RecordBatch,
730 codec: &IndexValuesCodec,
731) -> Result<Vec<(CompositeValues, usize)>> {
732 let primary_key_index = primary_key_column_index(batch.num_columns());
733 let pk_dict_array = batch
734 .column(primary_key_index)
735 .as_any()
736 .downcast_ref::<PrimaryKeyArray>()
737 .context(InvalidRecordBatchSnafu {
738 reason: "Primary key column is not a dictionary array",
739 })?;
740 let pk_values_array = pk_dict_array
741 .values()
742 .as_any()
743 .downcast_ref::<BinaryArray>()
744 .context(InvalidRecordBatchSnafu {
745 reason: "Primary key values are not binary array",
746 })?;
747 let keys = pk_dict_array.keys();
748
749 let mut result: Vec<(CompositeValues, usize)> = Vec::new();
751 let mut prev_key: Option<u32> = None;
752
753 for i in 0..keys.len() {
754 let current_key = keys.value(i);
755
756 if let Some(prev) = prev_key
758 && prev == current_key
759 {
760 result.last_mut().unwrap().1 += 1;
762 continue;
763 }
764
765 let pk_bytes = pk_values_array.value(current_key as usize);
767 let decoded_value = codec.decoder().decode(pk_bytes).context(DecodeSnafu)?;
768
769 result.push((decoded_value, 1));
770 prev_key = Some(current_key);
771 }
772
773 Ok(result)
774}
775
776#[cfg(test)]
777mod tests {
778 use std::sync::Arc;
779
780 use api::v1::SemanticType;
781 use common_base::readable_size::ReadableSize;
782 use datafusion_common::HashMap;
783 use datatypes::data_type::ConcreteDataType;
784 use datatypes::schema::{
785 ColumnSchema, FulltextOptions, SkippingIndexOptions, SkippingIndexType,
786 };
787 use object_store::ObjectStore;
788 use object_store::services::Memory;
789 use puffin_manager::PuffinManagerFactory;
790 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
791 use tokio::sync::mpsc;
792
793 use super::*;
794 use crate::access_layer::{FilePathProvider, SstWriteRequest, WriteType};
795 use crate::cache::write_cache::WriteCache;
796 use crate::config::{FulltextIndexConfig, IndexBuildMode, MitoConfig, Mode};
797 use crate::memtable::time_partition::TimePartitions;
798 use crate::region::version::{VersionBuilder, VersionControl};
799 use crate::sst::file::RegionFileId;
800 use crate::sst::file_purger::NoopFilePurger;
801 use crate::sst::location;
802 use crate::sst::parquet::WriteOptions;
803 use crate::test_util::memtable_util::EmptyMemtableBuilder;
804 use crate::test_util::scheduler_util::SchedulerEnv;
805 use crate::test_util::sst_util::{new_batch_by_range, new_source, sst_region_metadata};
806
807 struct MetaConfig {
808 with_inverted: bool,
809 with_fulltext: bool,
810 with_skipping_bloom: bool,
811 }
812
813 fn mock_region_metadata(
814 MetaConfig {
815 with_inverted,
816 with_fulltext,
817 with_skipping_bloom,
818 }: MetaConfig,
819 ) -> RegionMetadataRef {
820 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 2));
821 let mut column_schema = ColumnSchema::new("a", ConcreteDataType::int64_datatype(), false);
822 if with_inverted {
823 column_schema = column_schema.with_inverted_index(true);
824 }
825 builder
826 .push_column_metadata(ColumnMetadata {
827 column_schema,
828 semantic_type: SemanticType::Field,
829 column_id: 1,
830 })
831 .push_column_metadata(ColumnMetadata {
832 column_schema: ColumnSchema::new("b", ConcreteDataType::float64_datatype(), false),
833 semantic_type: SemanticType::Field,
834 column_id: 2,
835 })
836 .push_column_metadata(ColumnMetadata {
837 column_schema: ColumnSchema::new(
838 "c",
839 ConcreteDataType::timestamp_millisecond_datatype(),
840 false,
841 ),
842 semantic_type: SemanticType::Timestamp,
843 column_id: 3,
844 });
845
846 if with_fulltext {
847 let column_schema =
848 ColumnSchema::new("text", ConcreteDataType::string_datatype(), true)
849 .with_fulltext_options(FulltextOptions {
850 enable: true,
851 ..Default::default()
852 })
853 .unwrap();
854
855 let column = ColumnMetadata {
856 column_schema,
857 semantic_type: SemanticType::Field,
858 column_id: 4,
859 };
860
861 builder.push_column_metadata(column);
862 }
863
864 if with_skipping_bloom {
865 let column_schema =
866 ColumnSchema::new("bloom", ConcreteDataType::string_datatype(), false)
867 .with_skipping_options(SkippingIndexOptions::new_unchecked(
868 42,
869 0.01,
870 SkippingIndexType::BloomFilter,
871 ))
872 .unwrap();
873
874 let column = ColumnMetadata {
875 column_schema,
876 semantic_type: SemanticType::Field,
877 column_id: 5,
878 };
879
880 builder.push_column_metadata(column);
881 }
882
883 Arc::new(builder.build().unwrap())
884 }
885
886 fn mock_object_store() -> ObjectStore {
887 ObjectStore::new(Memory::default()).unwrap().finish()
888 }
889
890 async fn mock_intm_mgr(path: impl AsRef<str>) -> IntermediateManager {
891 IntermediateManager::init_fs(path).await.unwrap()
892 }
893 struct NoopPathProvider;
894
895 impl FilePathProvider for NoopPathProvider {
896 fn build_index_file_path(&self, _file_id: RegionFileId) -> String {
897 unreachable!()
898 }
899
900 fn build_sst_file_path(&self, _file_id: RegionFileId) -> String {
901 unreachable!()
902 }
903 }
904
905 async fn mock_sst_file(
906 metadata: RegionMetadataRef,
907 env: &SchedulerEnv,
908 build_mode: IndexBuildMode,
909 ) -> SstInfo {
910 let source = new_source(&[
911 new_batch_by_range(&["a", "d"], 0, 60),
912 new_batch_by_range(&["b", "f"], 0, 40),
913 new_batch_by_range(&["b", "h"], 100, 200),
914 ]);
915 let mut index_config = MitoConfig::default().index;
916 index_config.build_mode = build_mode;
917 let write_request = SstWriteRequest {
918 op_type: OperationType::Flush,
919 metadata: metadata.clone(),
920 source: either::Left(source),
921 storage: None,
922 max_sequence: None,
923 cache_manager: Default::default(),
924 index_options: IndexOptions::default(),
925 index_config,
926 inverted_index_config: Default::default(),
927 fulltext_index_config: Default::default(),
928 bloom_filter_index_config: Default::default(),
929 };
930 env.access_layer
931 .write_sst(write_request, &WriteOptions::default(), WriteType::Flush)
932 .await
933 .unwrap()
934 .0
935 .remove(0)
936 }
937
938 async fn mock_version_control(
939 metadata: RegionMetadataRef,
940 file_purger: FilePurgerRef,
941 files: HashMap<FileId, FileMeta>,
942 ) -> VersionControlRef {
943 let mutable = Arc::new(TimePartitions::new(
944 metadata.clone(),
945 Arc::new(EmptyMemtableBuilder::default()),
946 0,
947 None,
948 ));
949 let version_builder = VersionBuilder::new(metadata, mutable)
950 .add_files(file_purger, files.values().cloned())
951 .build();
952 Arc::new(VersionControl::new(version_builder))
953 }
954
955 async fn mock_indexer_builder(
956 metadata: RegionMetadataRef,
957 env: &SchedulerEnv,
958 ) -> Arc<dyn IndexerBuilder + Send + Sync> {
959 let (dir, factory) = PuffinManagerFactory::new_for_test_async("mock_indexer_builder").await;
960 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
961 let puffin_manager = factory.build(
962 env.access_layer.object_store().clone(),
963 RegionFilePathFactory::new(
964 env.access_layer.table_dir().to_string(),
965 env.access_layer.path_type(),
966 ),
967 );
968 Arc::new(IndexerBuilderImpl {
969 build_type: IndexBuildType::Flush,
970 metadata,
971 row_group_size: 1024,
972 puffin_manager,
973 intermediate_manager: intm_manager,
974 index_options: IndexOptions::default(),
975 inverted_index_config: InvertedIndexConfig::default(),
976 fulltext_index_config: FulltextIndexConfig::default(),
977 bloom_filter_index_config: BloomFilterConfig::default(),
978 })
979 }
980
981 #[tokio::test]
982 async fn test_build_indexer_basic() {
983 let (dir, factory) =
984 PuffinManagerFactory::new_for_test_async("test_build_indexer_basic_").await;
985 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
986
987 let metadata = mock_region_metadata(MetaConfig {
988 with_inverted: true,
989 with_fulltext: true,
990 with_skipping_bloom: true,
991 });
992 let indexer = IndexerBuilderImpl {
993 build_type: IndexBuildType::Flush,
994 metadata,
995 row_group_size: 1024,
996 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
997 intermediate_manager: intm_manager,
998 index_options: IndexOptions::default(),
999 inverted_index_config: InvertedIndexConfig::default(),
1000 fulltext_index_config: FulltextIndexConfig::default(),
1001 bloom_filter_index_config: BloomFilterConfig::default(),
1002 }
1003 .build(FileId::random())
1004 .await;
1005
1006 assert!(indexer.inverted_indexer.is_some());
1007 assert!(indexer.fulltext_indexer.is_some());
1008 assert!(indexer.bloom_filter_indexer.is_some());
1009 }
1010
1011 #[tokio::test]
1012 async fn test_build_indexer_disable_create() {
1013 let (dir, factory) =
1014 PuffinManagerFactory::new_for_test_async("test_build_indexer_disable_create_").await;
1015 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1016
1017 let metadata = mock_region_metadata(MetaConfig {
1018 with_inverted: true,
1019 with_fulltext: true,
1020 with_skipping_bloom: true,
1021 });
1022 let indexer = IndexerBuilderImpl {
1023 build_type: IndexBuildType::Flush,
1024 metadata: metadata.clone(),
1025 row_group_size: 1024,
1026 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1027 intermediate_manager: intm_manager.clone(),
1028 index_options: IndexOptions::default(),
1029 inverted_index_config: InvertedIndexConfig {
1030 create_on_flush: Mode::Disable,
1031 ..Default::default()
1032 },
1033 fulltext_index_config: FulltextIndexConfig::default(),
1034 bloom_filter_index_config: BloomFilterConfig::default(),
1035 }
1036 .build(FileId::random())
1037 .await;
1038
1039 assert!(indexer.inverted_indexer.is_none());
1040 assert!(indexer.fulltext_indexer.is_some());
1041 assert!(indexer.bloom_filter_indexer.is_some());
1042
1043 let indexer = IndexerBuilderImpl {
1044 build_type: IndexBuildType::Compact,
1045 metadata: metadata.clone(),
1046 row_group_size: 1024,
1047 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1048 intermediate_manager: intm_manager.clone(),
1049 index_options: IndexOptions::default(),
1050 inverted_index_config: InvertedIndexConfig::default(),
1051 fulltext_index_config: FulltextIndexConfig {
1052 create_on_compaction: Mode::Disable,
1053 ..Default::default()
1054 },
1055 bloom_filter_index_config: BloomFilterConfig::default(),
1056 }
1057 .build(FileId::random())
1058 .await;
1059
1060 assert!(indexer.inverted_indexer.is_some());
1061 assert!(indexer.fulltext_indexer.is_none());
1062 assert!(indexer.bloom_filter_indexer.is_some());
1063
1064 let indexer = IndexerBuilderImpl {
1065 build_type: IndexBuildType::Compact,
1066 metadata,
1067 row_group_size: 1024,
1068 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1069 intermediate_manager: intm_manager,
1070 index_options: IndexOptions::default(),
1071 inverted_index_config: InvertedIndexConfig::default(),
1072 fulltext_index_config: FulltextIndexConfig::default(),
1073 bloom_filter_index_config: BloomFilterConfig {
1074 create_on_compaction: Mode::Disable,
1075 ..Default::default()
1076 },
1077 }
1078 .build(FileId::random())
1079 .await;
1080
1081 assert!(indexer.inverted_indexer.is_some());
1082 assert!(indexer.fulltext_indexer.is_some());
1083 assert!(indexer.bloom_filter_indexer.is_none());
1084 }
1085
1086 #[tokio::test]
1087 async fn test_build_indexer_no_required() {
1088 let (dir, factory) =
1089 PuffinManagerFactory::new_for_test_async("test_build_indexer_no_required_").await;
1090 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1091
1092 let metadata = mock_region_metadata(MetaConfig {
1093 with_inverted: false,
1094 with_fulltext: true,
1095 with_skipping_bloom: true,
1096 });
1097 let indexer = IndexerBuilderImpl {
1098 build_type: IndexBuildType::Flush,
1099 metadata: metadata.clone(),
1100 row_group_size: 1024,
1101 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1102 intermediate_manager: intm_manager.clone(),
1103 index_options: IndexOptions::default(),
1104 inverted_index_config: InvertedIndexConfig::default(),
1105 fulltext_index_config: FulltextIndexConfig::default(),
1106 bloom_filter_index_config: BloomFilterConfig::default(),
1107 }
1108 .build(FileId::random())
1109 .await;
1110
1111 assert!(indexer.inverted_indexer.is_none());
1112 assert!(indexer.fulltext_indexer.is_some());
1113 assert!(indexer.bloom_filter_indexer.is_some());
1114
1115 let metadata = mock_region_metadata(MetaConfig {
1116 with_inverted: true,
1117 with_fulltext: false,
1118 with_skipping_bloom: true,
1119 });
1120 let indexer = IndexerBuilderImpl {
1121 build_type: IndexBuildType::Flush,
1122 metadata: metadata.clone(),
1123 row_group_size: 1024,
1124 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1125 intermediate_manager: intm_manager.clone(),
1126 index_options: IndexOptions::default(),
1127 inverted_index_config: InvertedIndexConfig::default(),
1128 fulltext_index_config: FulltextIndexConfig::default(),
1129 bloom_filter_index_config: BloomFilterConfig::default(),
1130 }
1131 .build(FileId::random())
1132 .await;
1133
1134 assert!(indexer.inverted_indexer.is_some());
1135 assert!(indexer.fulltext_indexer.is_none());
1136 assert!(indexer.bloom_filter_indexer.is_some());
1137
1138 let metadata = mock_region_metadata(MetaConfig {
1139 with_inverted: true,
1140 with_fulltext: true,
1141 with_skipping_bloom: false,
1142 });
1143 let indexer = IndexerBuilderImpl {
1144 build_type: IndexBuildType::Flush,
1145 metadata: metadata.clone(),
1146 row_group_size: 1024,
1147 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1148 intermediate_manager: intm_manager,
1149 index_options: IndexOptions::default(),
1150 inverted_index_config: InvertedIndexConfig::default(),
1151 fulltext_index_config: FulltextIndexConfig::default(),
1152 bloom_filter_index_config: BloomFilterConfig::default(),
1153 }
1154 .build(FileId::random())
1155 .await;
1156
1157 assert!(indexer.inverted_indexer.is_some());
1158 assert!(indexer.fulltext_indexer.is_some());
1159 assert!(indexer.bloom_filter_indexer.is_none());
1160 }
1161
1162 #[tokio::test]
1163 async fn test_build_indexer_zero_row_group() {
1164 let (dir, factory) =
1165 PuffinManagerFactory::new_for_test_async("test_build_indexer_zero_row_group_").await;
1166 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1167
1168 let metadata = mock_region_metadata(MetaConfig {
1169 with_inverted: true,
1170 with_fulltext: true,
1171 with_skipping_bloom: true,
1172 });
1173 let indexer = IndexerBuilderImpl {
1174 build_type: IndexBuildType::Flush,
1175 metadata,
1176 row_group_size: 0,
1177 puffin_manager: factory.build(mock_object_store(), NoopPathProvider),
1178 intermediate_manager: intm_manager,
1179 index_options: IndexOptions::default(),
1180 inverted_index_config: InvertedIndexConfig::default(),
1181 fulltext_index_config: FulltextIndexConfig::default(),
1182 bloom_filter_index_config: BloomFilterConfig::default(),
1183 }
1184 .build(FileId::random())
1185 .await;
1186
1187 assert!(indexer.inverted_indexer.is_none());
1188 }
1189
1190 #[tokio::test]
1191 async fn test_index_build_task_sst_not_exist() {
1192 let env = SchedulerEnv::new().await;
1193 let (tx, _rx) = mpsc::channel(4);
1194 let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1195 let mut scheduler = env.mock_index_build_scheduler();
1196 let metadata = Arc::new(sst_region_metadata());
1197 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1198 let file_purger = Arc::new(NoopFilePurger {});
1199 let files = HashMap::new();
1200 let version_control =
1201 mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1202 let region_id = metadata.region_id;
1203 let indexer_builder = mock_indexer_builder(metadata, &env).await;
1204
1205 let task = IndexBuildTask {
1207 file_meta: FileMeta {
1208 region_id,
1209 file_id: FileId::random(),
1210 file_size: 100,
1211 ..Default::default()
1212 },
1213 reason: IndexBuildType::Flush,
1214 access_layer: env.access_layer.clone(),
1215 listener: WorkerListener::default(),
1216 manifest_ctx,
1217 write_cache: None,
1218 file_purger,
1219 indexer_builder,
1220 request_sender: tx,
1221 result_sender: result_tx,
1222 };
1223
1224 scheduler.schedule_build(&version_control, task).unwrap();
1226 match result_rx.recv().await.unwrap() {
1227 Ok(outcome) => {
1228 if outcome == IndexBuildOutcome::Finished {
1229 panic!("Expect aborted result due to missing SST file")
1230 }
1231 }
1232 _ => panic!("Expect aborted result due to missing SST file"),
1233 }
1234 }
1235
1236 #[tokio::test]
1237 async fn test_index_build_task_sst_exist() {
1238 let env = SchedulerEnv::new().await;
1239 let mut scheduler = env.mock_index_build_scheduler();
1240 let metadata = Arc::new(sst_region_metadata());
1241 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1242 let region_id = metadata.region_id;
1243 let file_purger = Arc::new(NoopFilePurger {});
1244 let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
1245 let file_meta = FileMeta {
1246 region_id,
1247 file_id: sst_info.file_id,
1248 file_size: sst_info.file_size,
1249 index_file_size: sst_info.index_metadata.file_size,
1250 num_rows: sst_info.num_rows as u64,
1251 num_row_groups: sst_info.num_row_groups,
1252 ..Default::default()
1253 };
1254 let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1255 let version_control =
1256 mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1257 let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
1258
1259 let (tx, mut rx) = mpsc::channel(4);
1261 let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1262 let task = IndexBuildTask {
1263 file_meta: file_meta.clone(),
1264 reason: IndexBuildType::Flush,
1265 access_layer: env.access_layer.clone(),
1266 listener: WorkerListener::default(),
1267 manifest_ctx,
1268 write_cache: None,
1269 file_purger,
1270 indexer_builder,
1271 request_sender: tx,
1272 result_sender: result_tx,
1273 };
1274
1275 scheduler.schedule_build(&version_control, task).unwrap();
1276
1277 match result_rx.recv().await.unwrap() {
1279 Ok(outcome) => {
1280 assert_eq!(outcome, IndexBuildOutcome::Finished);
1281 }
1282 _ => panic!("Expect finished result"),
1283 }
1284
1285 let worker_req = rx.recv().await.unwrap().request;
1287 match worker_req {
1288 WorkerRequest::Background {
1289 region_id: req_region_id,
1290 notify: BackgroundNotify::IndexBuildFinished(finished),
1291 } => {
1292 assert_eq!(req_region_id, region_id);
1293 assert_eq!(finished.edit.files_to_add.len(), 1);
1294 let updated_meta = &finished.edit.files_to_add[0];
1295
1296 assert!(!updated_meta.available_indexes.is_empty());
1298 assert!(updated_meta.index_file_size > 0);
1299 assert_eq!(updated_meta.file_id, file_meta.file_id);
1300 }
1301 _ => panic!("Unexpected worker request: {:?}", worker_req),
1302 }
1303 }
1304
1305 async fn schedule_index_build_task_with_mode(build_mode: IndexBuildMode) {
1306 let env = SchedulerEnv::new().await;
1307 let mut scheduler = env.mock_index_build_scheduler();
1308 let metadata = Arc::new(sst_region_metadata());
1309 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1310 let file_purger = Arc::new(NoopFilePurger {});
1311 let region_id = metadata.region_id;
1312 let sst_info = mock_sst_file(metadata.clone(), &env, build_mode.clone()).await;
1313 let file_meta = FileMeta {
1314 region_id,
1315 file_id: sst_info.file_id,
1316 file_size: sst_info.file_size,
1317 index_file_size: sst_info.index_metadata.file_size,
1318 num_rows: sst_info.num_rows as u64,
1319 num_row_groups: sst_info.num_row_groups,
1320 ..Default::default()
1321 };
1322 let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1323 let version_control =
1324 mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1325 let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
1326
1327 let (tx, _rx) = mpsc::channel(4);
1329 let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1330 let task = IndexBuildTask {
1331 file_meta: file_meta.clone(),
1332 reason: IndexBuildType::Flush,
1333 access_layer: env.access_layer.clone(),
1334 listener: WorkerListener::default(),
1335 manifest_ctx,
1336 write_cache: None,
1337 file_purger,
1338 indexer_builder,
1339 request_sender: tx,
1340 result_sender: result_tx,
1341 };
1342
1343 scheduler.schedule_build(&version_control, task).unwrap();
1344
1345 let puffin_path = location::index_file_path(
1346 env.access_layer.table_dir(),
1347 RegionFileId::new(region_id, file_meta.file_id),
1348 env.access_layer.path_type(),
1349 );
1350
1351 if build_mode == IndexBuildMode::Async {
1352 assert!(
1354 !env.access_layer
1355 .object_store()
1356 .exists(&puffin_path)
1357 .await
1358 .unwrap()
1359 );
1360 } else {
1361 assert!(
1363 env.access_layer
1364 .object_store()
1365 .exists(&puffin_path)
1366 .await
1367 .unwrap()
1368 );
1369 }
1370
1371 match result_rx.recv().await.unwrap() {
1373 Ok(outcome) => {
1374 assert_eq!(outcome, IndexBuildOutcome::Finished);
1375 }
1376 _ => panic!("Expect finished result"),
1377 }
1378
1379 assert!(
1381 env.access_layer
1382 .object_store()
1383 .exists(&puffin_path)
1384 .await
1385 .unwrap()
1386 );
1387 }
1388
1389 #[tokio::test]
1390 async fn test_index_build_task_build_mode() {
1391 schedule_index_build_task_with_mode(IndexBuildMode::Async).await;
1392 schedule_index_build_task_with_mode(IndexBuildMode::Sync).await;
1393 }
1394
1395 #[tokio::test]
1396 async fn test_index_build_task_no_index() {
1397 let env = SchedulerEnv::new().await;
1398 let mut scheduler = env.mock_index_build_scheduler();
1399 let mut metadata = sst_region_metadata();
1400 metadata.column_metadatas.iter_mut().for_each(|col| {
1402 col.column_schema.set_inverted_index(false);
1403 let _ = col.column_schema.unset_skipping_options();
1404 });
1405 let region_id = metadata.region_id;
1406 let metadata = Arc::new(metadata);
1407 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1408 let file_purger = Arc::new(NoopFilePurger {});
1409 let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
1410 let file_meta = FileMeta {
1411 region_id,
1412 file_id: sst_info.file_id,
1413 file_size: sst_info.file_size,
1414 index_file_size: sst_info.index_metadata.file_size,
1415 num_rows: sst_info.num_rows as u64,
1416 num_row_groups: sst_info.num_row_groups,
1417 ..Default::default()
1418 };
1419 let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1420 let version_control =
1421 mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1422 let indexer_builder = mock_indexer_builder(metadata.clone(), &env).await;
1423
1424 let (tx, mut rx) = mpsc::channel(4);
1426 let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1427 let task = IndexBuildTask {
1428 file_meta: file_meta.clone(),
1429 reason: IndexBuildType::Flush,
1430 access_layer: env.access_layer.clone(),
1431 listener: WorkerListener::default(),
1432 manifest_ctx,
1433 write_cache: None,
1434 file_purger,
1435 indexer_builder,
1436 request_sender: tx,
1437 result_sender: result_tx,
1438 };
1439
1440 scheduler.schedule_build(&version_control, task).unwrap();
1441
1442 match result_rx.recv().await.unwrap() {
1444 Ok(outcome) => {
1445 assert_eq!(outcome, IndexBuildOutcome::Finished);
1446 }
1447 _ => panic!("Expect finished result"),
1448 }
1449
1450 let _ = rx.recv().await.is_none();
1452 }
1453
1454 #[tokio::test]
1455 async fn test_index_build_task_with_write_cache() {
1456 let env = SchedulerEnv::new().await;
1457 let mut scheduler = env.mock_index_build_scheduler();
1458 let metadata = Arc::new(sst_region_metadata());
1459 let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
1460 let file_purger = Arc::new(NoopFilePurger {});
1461 let region_id = metadata.region_id;
1462
1463 let (dir, factory) = PuffinManagerFactory::new_for_test_async("test_write_cache").await;
1464 let intm_manager = mock_intm_mgr(dir.path().to_string_lossy()).await;
1465
1466 let write_cache = Arc::new(
1468 WriteCache::new_fs(
1469 dir.path().to_str().unwrap(),
1470 ReadableSize::mb(10),
1471 None,
1472 factory,
1473 intm_manager,
1474 )
1475 .await
1476 .unwrap(),
1477 );
1478 let indexer_builder = Arc::new(IndexerBuilderImpl {
1480 build_type: IndexBuildType::Flush,
1481 metadata: metadata.clone(),
1482 row_group_size: 1024,
1483 puffin_manager: write_cache.build_puffin_manager().clone(),
1484 intermediate_manager: write_cache.intermediate_manager().clone(),
1485 index_options: IndexOptions::default(),
1486 inverted_index_config: InvertedIndexConfig::default(),
1487 fulltext_index_config: FulltextIndexConfig::default(),
1488 bloom_filter_index_config: BloomFilterConfig::default(),
1489 });
1490
1491 let sst_info = mock_sst_file(metadata.clone(), &env, IndexBuildMode::Async).await;
1492 let file_meta = FileMeta {
1493 region_id,
1494 file_id: sst_info.file_id,
1495 file_size: sst_info.file_size,
1496 index_file_size: sst_info.index_metadata.file_size,
1497 num_rows: sst_info.num_rows as u64,
1498 num_row_groups: sst_info.num_row_groups,
1499 ..Default::default()
1500 };
1501 let files = HashMap::from([(file_meta.file_id, file_meta.clone())]);
1502 let version_control =
1503 mock_version_control(metadata.clone(), file_purger.clone(), files).await;
1504
1505 let (tx, mut _rx) = mpsc::channel(4);
1507 let (result_tx, mut result_rx) = mpsc::channel::<Result<IndexBuildOutcome>>(4);
1508 let task = IndexBuildTask {
1509 file_meta: file_meta.clone(),
1510 reason: IndexBuildType::Flush,
1511 access_layer: env.access_layer.clone(),
1512 listener: WorkerListener::default(),
1513 manifest_ctx,
1514 write_cache: Some(write_cache.clone()),
1515 file_purger,
1516 indexer_builder,
1517 request_sender: tx,
1518 result_sender: result_tx,
1519 };
1520
1521 scheduler.schedule_build(&version_control, task).unwrap();
1522
1523 match result_rx.recv().await.unwrap() {
1525 Ok(outcome) => {
1526 assert_eq!(outcome, IndexBuildOutcome::Finished);
1527 }
1528 _ => panic!("Expect finished result"),
1529 }
1530
1531 let index_key = IndexKey::new(region_id, file_meta.file_id, FileType::Puffin);
1533 assert!(write_cache.file_cache().contains_key(&index_key));
1534 }
1535}