1use std::num::NonZero;
16use std::sync::Arc;
17use std::time::Duration;
18
19use common_base::cancellation::{CancellableFuture, CancellationHandle};
20use common_meta::key::SchemaMetadataManagerRef;
21use common_telemetry::{debug, info, warn};
22use common_time::TimeToLive;
23use either::Either;
24use itertools::Itertools;
25use object_store::manager::ObjectStoreManagerRef;
26use partition::expr::PartitionExpr;
27use serde::{Deserialize, Serialize};
28use snafu::{OptionExt, ResultExt};
29use store_api::metadata::RegionMetadataRef;
30use store_api::region_request::PathType;
31use store_api::storage::RegionId;
32
33use crate::access_layer::{
34 AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest, WriteType,
35};
36use crate::cache::{CacheManager, CacheManagerRef};
37use crate::compaction::picker::PickerOutput;
38use crate::compaction::{CompactionOutput, CompactionSstReaderBuilder, find_dynamic_options};
39use crate::config::MitoConfig;
40use crate::error;
41use crate::error::{
42 EmptyRegionDirSnafu, InvalidPartitionExprSnafu, ObjectStoreNotFoundSnafu, Result,
43};
44use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
45use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
46use crate::region::options::RegionOptions;
47use crate::region::version::VersionRef;
48use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState};
49use crate::schedule::scheduler::LocalScheduler;
50use crate::sst::FormatType;
51use crate::sst::file::FileMeta;
52use crate::sst::file_purger::LocalFilePurger;
53use crate::sst::index::intermediate::IntermediateManager;
54use crate::sst::index::puffin_manager::PuffinManagerFactory;
55use crate::sst::location::region_dir_from_table_dir;
56use crate::sst::parquet::WriteOptions;
57use crate::sst::parquet::metadata::extract_primary_key_range;
58use crate::sst::version::{SstVersion, SstVersionRef};
59
/// Snapshot of the region state that compaction needs; built from a [`VersionRef`].
#[derive(Clone)]
pub struct CompactionVersion {
    /// Metadata of the region.
    pub(crate) metadata: RegionMetadataRef,
    /// Options of the region.
    pub(crate) options: RegionOptions,
    /// SST files visible to this compaction.
    pub(crate) ssts: SstVersionRef,
    /// Compaction time window, if one has been determined.
    pub(crate) compaction_time_window: Option<Duration>,
}
75
76impl From<VersionRef> for CompactionVersion {
77 fn from(value: VersionRef) -> Self {
78 Self {
79 metadata: value.metadata.clone(),
80 options: value.options.clone(),
81 ssts: value.ssts.clone(),
82 compaction_time_window: value.compaction_time_window,
83 }
84 }
85}
86
/// All region state carried through a compaction job.
#[derive(Clone)]
pub struct CompactionRegion {
    /// Id of the region being compacted.
    pub region_id: RegionId,
    /// Options of the region.
    pub region_options: RegionOptions,

    /// Engine configuration shared by the job.
    pub(crate) engine_config: Arc<MitoConfig>,
    /// Metadata of the region.
    pub(crate) region_metadata: RegionMetadataRef,
    /// Cache manager used while reading/writing SSTs.
    pub(crate) cache_manager: CacheManagerRef,
    /// Access layer used to read inputs and write merged SSTs.
    pub access_layer: AccessLayerRef,
    /// Manifest context used to persist the resulting region edit.
    pub(crate) manifest_ctx: Arc<ManifestContext>,
    /// Version snapshot the compaction operates on.
    pub(crate) current_version: CompactionVersion,
    /// Optional purger for removing SST files; absent in some test setups.
    pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
    /// Optional TTL of the region's data.
    pub(crate) ttl: Option<TimeToLive>,

    /// Max number of merge tasks run concurrently; `merge_ssts` clamps it to at least 1.
    pub max_parallelism: usize,
}
110
/// Parameters for [`open_compaction_region`].
#[derive(Debug, Clone)]
pub struct OpenCompactionRegionRequest {
    /// Id of the region to open.
    pub region_id: RegionId,
    /// Table directory containing the region's data.
    pub table_dir: String,
    /// Path layout used to derive the region directory under `table_dir`.
    pub path_type: PathType,
    /// Options of the region.
    pub region_options: RegionOptions,
    /// Max parallelism for the compaction job.
    pub max_parallelism: usize,
}
120
121pub async fn open_compaction_region(
124 req: &OpenCompactionRegionRequest,
125 mito_config: &MitoConfig,
126 object_store_manager: ObjectStoreManagerRef,
127 ttl_provider: Either<TimeToLive, SchemaMetadataManagerRef>,
128) -> Result<CompactionRegion> {
129 let object_store = {
130 let name = &req.region_options.storage;
131 if let Some(name) = name {
132 object_store_manager
133 .find(name)
134 .with_context(|| ObjectStoreNotFoundSnafu {
135 object_store: name.clone(),
136 })?
137 } else {
138 object_store_manager.default_object_store()
139 }
140 };
141
142 let access_layer = {
143 let puffin_manager_factory = PuffinManagerFactory::new(
144 &mito_config.index.aux_path,
145 mito_config.index.staging_size.as_bytes(),
146 Some(mito_config.index.write_buffer_size.as_bytes() as _),
147 mito_config.index.staging_ttl,
148 )
149 .await?;
150 let intermediate_manager =
151 IntermediateManager::init_fs(mito_config.index.aux_path.clone()).await?;
152
153 Arc::new(AccessLayer::new(
154 &req.table_dir,
155 req.path_type,
156 object_store.clone(),
157 puffin_manager_factory,
158 intermediate_manager,
159 ))
160 };
161
162 let manifest_manager = {
163 let region_dir = region_dir_from_table_dir(&req.table_dir, req.region_id, req.path_type);
164 let region_manifest_options =
165 RegionManifestOptions::new(mito_config, ®ion_dir, object_store);
166
167 RegionManifestManager::open(region_manifest_options, &Default::default())
168 .await?
169 .with_context(|| EmptyRegionDirSnafu {
170 region_id: req.region_id,
171 region_dir: region_dir_from_table_dir(&req.table_dir, req.region_id, req.path_type),
172 })?
173 };
174
175 let manifest = manifest_manager.manifest();
176 let region_metadata = manifest.metadata.clone();
177 let manifest_ctx = Arc::new(ManifestContext::new(
178 manifest_manager,
179 RegionRoleState::Leader(RegionLeaderState::Writable),
180 ));
181
182 let file_purger = {
183 let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_purges));
184 Arc::new(LocalFilePurger::new(
185 purge_scheduler.clone(),
186 access_layer.clone(),
187 None,
188 ))
189 };
190
191 let current_version = {
192 let mut ssts = SstVersion::new();
193 ssts.add_files(file_purger.clone(), manifest.files.values().cloned());
194 CompactionVersion {
195 metadata: region_metadata.clone(),
196 options: req.region_options.clone(),
197 ssts: Arc::new(ssts),
198 compaction_time_window: manifest.compaction_time_window,
199 }
200 };
201
202 let ttl = match ttl_provider {
203 Either::Left(ttl) => ttl,
205 Either::Right(schema_metadata_manager) => {
207 let (_, ttl) = find_dynamic_options(
208 req.region_id.table_id(),
209 &req.region_options,
210 &schema_metadata_manager,
211 )
212 .await
213 .unwrap_or_else(|e| {
214 warn!(e; "Failed to get ttl for region: {}", region_metadata.region_id);
215 (
216 crate::region::options::CompactionOptions::default(),
217 TimeToLive::default(),
218 )
219 });
220 ttl
221 }
222 };
223
224 Ok(CompactionRegion {
225 region_id: req.region_id,
226 region_options: req.region_options.clone(),
227 engine_config: Arc::new(mito_config.clone()),
228 region_metadata: region_metadata.clone(),
229 cache_manager: Arc::new(CacheManager::default()),
230 access_layer,
231 manifest_ctx,
232 current_version,
233 file_purger: Some(file_purger),
234 ttl: Some(ttl),
235 max_parallelism: req.max_parallelism,
236 })
237}
238
239impl CompactionRegion {
240 pub fn file_purger(&self) -> Option<Arc<LocalFilePurger>> {
242 self.file_purger.clone()
243 }
244
245 pub async fn stop_purger_scheduler(&self) -> Result<()> {
247 if let Some(file_purger) = &self.file_purger {
248 file_purger.stop_scheduler().await
249 } else {
250 Ok(())
251 }
252 }
253}
254
/// Result of merging SST files during compaction.
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
pub struct MergeOutput {
    /// Newly written SST files to add to the manifest.
    pub files_to_add: Vec<FileMeta>,
    /// Compacted inputs and expired SSTs to remove from the manifest.
    pub files_to_remove: Vec<FileMeta>,
    /// Compaction time window in seconds, if known.
    pub compaction_time_window: Option<i64>,
}
262
263impl MergeOutput {
264 pub fn is_empty(&self) -> bool {
265 self.files_to_add.is_empty() && self.files_to_remove.is_empty()
266 }
267
268 pub fn input_file_size(&self) -> u64 {
269 self.files_to_remove.iter().map(|f| f.file_size).sum()
270 }
271
272 pub fn output_file_size(&self) -> u64 {
273 self.files_to_add.iter().map(|f| f.file_size).sum()
274 }
275}
276
/// The two phases of a compaction job: merging SSTs and committing the result.
#[async_trait::async_trait]
pub trait Compactor: Send + Sync + 'static {
    /// Merges the SST files selected by the picker and returns what to add and
    /// remove from the region.
    async fn merge_ssts(
        &self,
        compaction_region: &CompactionRegion,
        picker_output: PickerOutput,
    ) -> Result<MergeOutput>;

    /// Persists the merge result to the region manifest and returns the edit.
    async fn update_manifest(
        &self,
        compaction_region: &CompactionRegion,
        merge_output: MergeOutput,
    ) -> Result<RegionEdit>;
}
294
/// Abstraction over the actual SST merge step, so tests can substitute mocks.
#[async_trait::async_trait]
pub trait SstMerger: Send + Sync + 'static {
    /// Merges the inputs of a single [CompactionOutput] into new SST files and
    /// returns the metadata of every file written.
    async fn merge_single_output(
        &self,
        compaction_region: CompactionRegion,
        output: CompactionOutput,
        write_opts: WriteOptions,
    ) -> Result<Vec<FileMeta>>;
}
308
/// Default [SstMerger] that reads input SSTs and writes merged output through
/// the region's access layer.
#[derive(Clone)]
pub struct DefaultSstMerger;
312
313#[async_trait::async_trait]
314impl SstMerger for DefaultSstMerger {
315 async fn merge_single_output(
316 &self,
317 compaction_region: CompactionRegion,
318 output: CompactionOutput,
319 write_opts: WriteOptions,
320 ) -> Result<Vec<FileMeta>> {
321 let region_id = compaction_region.region_id;
322 let storage = compaction_region.region_options.storage.clone();
323 let index_options = compaction_region
324 .current_version
325 .options
326 .index_options
327 .clone();
328 let append_mode = compaction_region.current_version.options.append_mode;
329 let merge_mode = compaction_region.current_version.options.merge_mode();
330 let flat_format = compaction_region
331 .region_options
332 .sst_format
333 .map(|format| format == FormatType::Flat)
334 .unwrap_or(compaction_region.engine_config.default_flat_format);
335
336 let index_config = compaction_region.engine_config.index.clone();
337 let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
338 let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
339 let bloom_filter_index_config = compaction_region.engine_config.bloom_filter_index.clone();
340 #[cfg(feature = "vector_index")]
341 let vector_index_config = compaction_region.engine_config.vector_index.clone();
342
343 let input_file_names = output
344 .inputs
345 .iter()
346 .map(|f| f.file_id().to_string())
347 .join(",");
348 let max_sequence = output
349 .inputs
350 .iter()
351 .map(|f| f.meta_ref().sequence)
352 .max()
353 .flatten();
354 let builder = CompactionSstReaderBuilder {
355 metadata: compaction_region.region_metadata.clone(),
356 sst_layer: compaction_region.access_layer.clone(),
357 cache: compaction_region.cache_manager.clone(),
358 inputs: &output.inputs,
359 append_mode,
360 filter_deleted: output.filter_deleted,
361 time_range: output.output_time_range,
362 merge_mode,
363 };
364 let source = builder.build_flat_sst_reader().await?;
365 let mut metrics = Metrics::new(WriteType::Compaction);
366 let region_metadata = compaction_region.region_metadata.clone();
367 let sst_infos = compaction_region
368 .access_layer
369 .write_sst(
370 SstWriteRequest {
371 op_type: OperationType::Compact,
372 metadata: region_metadata.clone(),
373 source,
374 cache_manager: compaction_region.cache_manager.clone(),
375 storage,
376 max_sequence: max_sequence.map(NonZero::get),
377 sst_write_format: if flat_format {
378 FormatType::Flat
379 } else {
380 FormatType::PrimaryKey
381 },
382 index_options,
383 index_config,
384 inverted_index_config,
385 fulltext_index_config,
386 bloom_filter_index_config,
387 #[cfg(feature = "vector_index")]
388 vector_index_config,
389 },
390 &write_opts,
391 &mut metrics,
392 )
393 .await?;
394 let partition_expr = match ®ion_metadata.partition_expr {
396 None => None,
397 Some(json_str) if json_str.is_empty() => None,
398 Some(json_str) => PartitionExpr::from_json_str(json_str).with_context(|_| {
399 InvalidPartitionExprSnafu {
400 expr: json_str.clone(),
401 }
402 })?,
403 };
404
405 let output_files = sst_infos
406 .into_iter()
407 .map(|sst_info| {
408 let pk_range = sst_info
409 .file_metadata
410 .as_ref()
411 .and_then(|meta| extract_primary_key_range(meta, ®ion_metadata));
412 let (primary_key_min, primary_key_max) = match pk_range {
413 Some((min, max)) => (Some(min), Some(max)),
414 None => (None, None),
415 };
416
417 FileMeta {
418 region_id,
419 file_id: sst_info.file_id,
420 time_range: sst_info.time_range,
421 level: output.output_level,
422 file_size: sst_info.file_size,
423 max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
424 available_indexes: sst_info.index_metadata.build_available_indexes(),
425 indexes: sst_info.index_metadata.build_indexes(),
426 index_file_size: sst_info.index_metadata.file_size,
427 index_version: 0,
428 num_rows: sst_info.num_rows as u64,
429 num_row_groups: sst_info.num_row_groups,
430 sequence: max_sequence,
431 partition_expr: partition_expr.clone(),
432 num_series: sst_info.num_series,
433 primary_key_min,
434 primary_key_max,
435 }
436 })
437 .collect::<Vec<_>>();
438 let output_file_names = output_files.iter().map(|f| f.file_id.to_string()).join(",");
439 info!(
440 "Region {} compaction inputs: [{}], outputs: [{}], flat_format: {}, metrics: {:?}",
441 region_id, input_file_names, output_file_names, flat_format, metrics
442 );
443 metrics.observe();
444 Ok(output_files)
445 }
446}
447
/// Default [Compactor], generic over the [SstMerger] so tests can inject mocks.
pub struct DefaultCompactor<M = DefaultSstMerger> {
    /// Performs the actual SST merge for each compaction output.
    merger: M,
    /// Handle used to cancel in-flight merge tasks.
    cancel_handle: Arc<CancellationHandle>,
}
456
#[cfg(test)]
impl<M: SstMerger> DefaultCompactor<M> {
    /// Test-only constructor that installs a custom merger with a fresh,
    /// never-cancelled handle.
    pub fn with_merger(merger: M) -> Self {
        Self {
            merger,
            cancel_handle: Arc::new(CancellationHandle::default()),
        }
    }
}
466
impl DefaultCompactor {
    /// Creates a compactor using the default merger and an externally owned
    /// cancellation handle.
    pub fn with_cancel_handle(cancel_handle: Arc<CancellationHandle>) -> Self {
        Self {
            merger: DefaultSstMerger,
            cancel_handle,
        }
    }
}
475
#[async_trait::async_trait]
impl<M: SstMerger> Compactor for DefaultCompactor<M>
where
    M: Clone,
{
    /// Runs all merge tasks in chunks of `max_parallelism`, collecting outputs
    /// from successful tasks and tolerating per-task merge failures.
    /// Cancellation via `cancel_handle` aborts the still-running tasks and
    /// returns whatever was completed so far.
    async fn merge_ssts(
        &self,
        compaction_region: &CompactionRegion,
        mut picker_output: PickerOutput,
    ) -> Result<MergeOutput> {
        // At least one task must run even if max_parallelism is 0.
        let internal_parallelism = compaction_region.max_parallelism.max(1);
        let compaction_time_window = picker_output.time_window_size;
        let region_id = compaction_region.region_id;

        // One (inputs, future) pair per picker output. The inputs are kept so
        // that only files belonging to successful merges get removed.
        let mut tasks: Vec<(Vec<FileMeta>, _)> = Vec::with_capacity(picker_output.outputs.len());

        for output in picker_output.outputs.drain(..) {
            let inputs_to_remove: Vec<_> =
                output.inputs.iter().map(|f| f.meta_ref().clone()).collect();
            let write_opts = WriteOptions {
                write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
                max_file_size: picker_output.max_file_size,
                ..Default::default()
            };
            let merger = self.merger.clone();
            let compaction_region = compaction_region.clone();
            let fut = async move {
                merger
                    .merge_single_output(compaction_region, output, write_opts)
                    .await
            };
            tasks.push((inputs_to_remove, fut));
        }

        let mut output_files = Vec::with_capacity(tasks.len());
        let mut compacted_inputs = Vec::with_capacity(
            tasks.iter().map(|(inputs, _)| inputs.len()).sum::<usize>()
                + picker_output.expired_ssts.len(),
        );

        // Execute in chunks so at most `internal_parallelism` tasks run at once.
        // Note: tasks are popped from the back, so chunks run in LIFO order.
        while !tasks.is_empty() {
            let mut chunk: Vec<(Vec<FileMeta>, _)> = Vec::with_capacity(internal_parallelism);
            for _ in 0..internal_parallelism {
                if let Some(task) = tasks.pop() {
                    chunk.push(task);
                }
            }
            let mut spawned: Vec<_> = chunk
                .into_iter()
                .map(|(inputs, fut)| {
                    let handle = common_runtime::spawn_compact(fut);
                    (inputs, handle)
                })
                .collect();

            while let Some((inputs, handle)) = spawned.pop() {
                let abort_handle = handle.abort_handle();
                // Triple-nested result: cancellation / join error / merge error.
                match CancellableFuture::new(handle, self.cancel_handle.clone()).await {
                    Ok(Ok(Ok(files))) => {
                        // Merge succeeded: record outputs and mark inputs removable.
                        output_files.extend(files);
                        compacted_inputs.extend(inputs);
                    }
                    Ok(Ok(Err(e))) => {
                        // Merge failed: skip this output; its inputs stay in the
                        // region (best-effort partial compaction).
                        warn!(
                            e; "Failed to merge compaction output for region: {}, inputs: [{}]",
                            region_id,
                            inputs.iter().map(|f| f.file_id.to_string()).join(",")
                        );
                    }
                    Ok(Err(e)) => {
                        // Task join error (e.g. panic or abort).
                        // NOTE(review): the message says "skipping" but this
                        // branch returns an error; when not cancelled, the
                        // remaining spawned handles are also left running
                        // (not aborted) — confirm both are intended.
                        warn!(
                            "Region {} compaction task join error for inputs: [{}], skipping: {}",
                            region_id,
                            inputs.iter().map(|f| f.file_id.to_string()).join(","),
                            e
                        );
                        if self.cancel_handle.is_cancelled() {
                            abort_handle.abort();
                            for (_, handle) in spawned {
                                handle.abort();
                            }
                        }
                        return Err(e).context(error::JoinSnafu);
                    }
                    Err(_) => {
                        // Cancelled while awaiting: abort this task and every
                        // other spawned task in the chunk, then stop collecting.
                        debug!(
                            "Compaction merge cancelled for region: {}, aborting remaining {} spawned tasks",
                            region_id,
                            spawned.len(),
                        );
                        abort_handle.abort();
                        for (_, handle) in spawned {
                            handle.abort();
                        }
                        break;
                    }
                }
            }

            // Stop scheduling further chunks once cancellation is observed.
            if self.cancel_handle.is_cancelled() {
                info!("Compaction merge cancelled for region: {}", region_id);
                break;
            }
        }

        // Expired SSTs are always removed, regardless of merge outcomes.
        compacted_inputs.extend(
            picker_output
                .expired_ssts
                .iter()
                .map(|f| f.meta_ref().clone()),
        );

        Ok(MergeOutput {
            files_to_add: output_files,
            files_to_remove: compacted_inputs,
            compaction_time_window: Some(compaction_time_window),
        })
    }

    /// Converts the merge output into a [RegionEdit] and commits it through the
    /// manifest context.
    async fn update_manifest(
        &self,
        compaction_region: &CompactionRegion,
        merge_output: MergeOutput,
    ) -> Result<RegionEdit> {
        let edit = RegionEdit {
            files_to_add: merge_output.files_to_add,
            files_to_remove: merge_output.files_to_remove,
            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
            // Time window is stored in seconds in MergeOutput.
            compaction_time_window: merge_output
                .compaction_time_window
                .map(|seconds| Duration::from_secs(seconds as u64)),
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };

        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
        compaction_region
            .manifest_ctx
            .update_manifest(RegionLeaderState::Writable, action_list, false)
            .await?;

        Ok(edit)
    }
}
629
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    use store_api::storage::{FileId, RegionId};
    use tokio::time::sleep;

    use super::{DefaultCompactor, *};
    use crate::cache::CacheManager;
    use crate::compaction::picker::PickerOutput;
    use crate::error::Result;
    use crate::sst::file::FileHandle;
    use crate::sst::file_purger::NoopFilePurger;
    use crate::sst::version::SstVersion;
    use crate::test_util::memtable_util::metadata_for_test;
    use crate::test_util::scheduler_util::SchedulerEnv;

    /// Builds a minimal [FileMeta] with a random file id for use as test input.
    fn dummy_file_meta() -> FileMeta {
        FileMeta {
            region_id: RegionId::new(1, 1),
            file_id: FileId::random(),
            file_size: 100,
            ..Default::default()
        }
    }

    /// Wraps a [FileMeta] in a [FileHandle] backed by a no-op purger.
    fn new_file_handle(meta: FileMeta) -> FileHandle {
        FileHandle::new(meta, Arc::new(NoopFilePurger))
    }

    /// Builds a [CompactionRegion] against the scheduler test environment,
    /// with default options, no purger and no TTL.
    async fn new_test_compaction_region() -> CompactionRegion {
        let env = SchedulerEnv::new().await;
        let metadata = metadata_for_test();
        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
        CompactionRegion {
            region_id: RegionId::new(1, 1),
            region_options: RegionOptions::default(),
            engine_config: Arc::new(MitoConfig::default()),
            region_metadata: metadata.clone(),
            cache_manager: Arc::new(CacheManager::default()),
            access_layer: env.access_layer.clone(),
            manifest_ctx,
            current_version: CompactionVersion {
                metadata,
                options: RegionOptions::default(),
                ssts: Arc::new(SstVersion::new()),
                compaction_time_window: None,
            },
            file_purger: None,
            ttl: None,
            max_parallelism: 1,
        }
    }

    /// [SstMerger] stub that replays pre-canned results, one per call.
    /// Results are indexed by call order, not by output identity — note that
    /// `merge_ssts` pops tasks from the back, so calls run in reverse output
    /// order.
    #[derive(Clone)]
    struct MockMerger {
        // Pre-configured per-call results; index i answers the i-th call.
        results: Arc<Mutex<Vec<Result<Vec<FileMeta>>>>>,
        // Number of merge_single_output calls made so far.
        call_idx: Arc<AtomicUsize>,
    }

    impl MockMerger {
        fn new(results: Vec<Result<Vec<FileMeta>>>) -> Self {
            Self {
                results: Arc::new(Mutex::new(results)),
                call_idx: Arc::new(AtomicUsize::new(0)),
            }
        }
    }

    #[async_trait::async_trait]
    impl SstMerger for MockMerger {
        async fn merge_single_output(
            &self,
            _compaction_region: CompactionRegion,
            _output: CompactionOutput,
            _write_opts: WriteOptions,
        ) -> Result<Vec<FileMeta>> {
            let idx = self.call_idx.fetch_add(1, Ordering::SeqCst);
            match self.results.lock().unwrap().get(idx) {
                Some(Ok(files)) => Ok(files.clone()),
                Some(Err(_)) => error::InvalidMetaSnafu {
                    reason: format!("simulated failure at index {idx}"),
                }
                .fail(),
                None => panic!("MockMerger: no result configured for call index {idx}"),
            }
        }
    }

    /// One of three merge tasks fails: outputs of the two successful tasks are
    /// added, and only their inputs are removed.
    #[tokio::test]
    async fn test_partial_merge_failure_collects_only_successful_outputs() {
        common_telemetry::init_default_ut_logging();

        let compaction_region = new_test_compaction_region().await;

        let input_meta_0 = dummy_file_meta();
        let input_meta_1 = dummy_file_meta();
        let input_meta_2 = dummy_file_meta();

        let output_meta_0 = vec![dummy_file_meta()];
        let output_meta_2 = vec![dummy_file_meta(), dummy_file_meta()];

        let merger = MockMerger::new(vec![
            Ok(output_meta_0.clone()),
            Err(error::InvalidMetaSnafu {
                reason: "boom".to_string(),
            }
            .build()),
            Ok(output_meta_2.clone()),
        ]);
        let compactor = DefaultCompactor::with_merger(merger);

        let picker_output = PickerOutput {
            outputs: vec![
                CompactionOutput {
                    output_level: 1,
                    inputs: vec![new_file_handle(input_meta_0.clone())],
                    filter_deleted: false,
                    output_time_range: None,
                },
                CompactionOutput {
                    output_level: 1,
                    inputs: vec![new_file_handle(input_meta_1.clone())],
                    filter_deleted: false,
                    output_time_range: None,
                },
                CompactionOutput {
                    output_level: 1,
                    inputs: vec![new_file_handle(input_meta_2.clone())],
                    filter_deleted: false,
                    output_time_range: None,
                },
            ],
            expired_ssts: vec![],
            time_window_size: 3600,
            max_file_size: None,
        };

        let merge_output = compactor
            .merge_ssts(&compaction_region, picker_output)
            .await
            .unwrap();

        // Two successful calls produced 1 + 2 output files.
        assert_eq!(merge_output.files_to_add.len(), 3);
        assert_eq!(merge_output.files_to_remove.len(), 2);

        let removed_ids: Vec<_> = merge_output
            .files_to_remove
            .iter()
            .map(|f| f.file_id)
            .collect();
        assert!(removed_ids.contains(&input_meta_0.file_id));
        assert!(removed_ids.contains(&input_meta_2.file_id));
        // The failed task's input must NOT be scheduled for removal.
        assert!(!removed_ids.contains(&input_meta_1.file_id));
    }

    /// Happy path: a single successful merge adds its output and removes its input.
    #[tokio::test]
    async fn test_all_outputs_succeed() {
        common_telemetry::init_default_ut_logging();

        let compaction_region = new_test_compaction_region().await;
        let input_meta = dummy_file_meta();
        let output_meta = vec![dummy_file_meta()];

        let merger = MockMerger::new(vec![Ok(output_meta.clone())]);
        let compactor = DefaultCompactor::with_merger(merger);

        let picker_output = PickerOutput {
            outputs: vec![CompactionOutput {
                output_level: 1,
                inputs: vec![new_file_handle(input_meta.clone())],
                filter_deleted: false,
                output_time_range: None,
            }],
            expired_ssts: vec![],
            time_window_size: 3600,
            max_file_size: None,
        };

        let merge_output = compactor
            .merge_ssts(&compaction_region, picker_output)
            .await
            .unwrap();

        assert_eq!(merge_output.files_to_add.len(), 1);
        assert_eq!(merge_output.files_to_add[0].file_id, output_meta[0].file_id);
        assert_eq!(merge_output.files_to_remove.len(), 1);
        assert_eq!(merge_output.files_to_remove[0].file_id, input_meta.file_id);
    }

    /// Expired SSTs must be removed even when every merge task fails.
    #[tokio::test]
    async fn test_expired_ssts_always_removed() {
        common_telemetry::init_default_ut_logging();

        let compaction_region = new_test_compaction_region().await;
        let input_meta = dummy_file_meta();
        let expired_meta = dummy_file_meta();

        let merger = MockMerger::new(vec![Err(error::InvalidMetaSnafu {
            reason: "fail".to_string(),
        }
        .build())]);
        let compactor = DefaultCompactor::with_merger(merger);

        let picker_output = PickerOutput {
            outputs: vec![CompactionOutput {
                output_level: 1,
                inputs: vec![new_file_handle(input_meta.clone())],
                filter_deleted: false,
                output_time_range: None,
            }],
            expired_ssts: vec![new_file_handle(expired_meta.clone())],
            time_window_size: 3600,
            max_file_size: None,
        };

        let merge_output = compactor
            .merge_ssts(&compaction_region, picker_output)
            .await
            .unwrap();

        assert!(merge_output.files_to_add.is_empty());
        // Only the expired SST is removed; the failed merge keeps its input.
        assert_eq!(merge_output.files_to_remove.len(), 1);
        assert_eq!(
            merge_output.files_to_remove[0].file_id,
            expired_meta.file_id
        );
    }

    /// [SstMerger] stub that never completes, counting how many calls started.
    #[derive(Clone)]
    struct BlockingMerger {
        call_idx: Arc<AtomicUsize>,
    }

    #[async_trait::async_trait]
    impl SstMerger for BlockingMerger {
        async fn merge_single_output(
            &self,
            _compaction_region: CompactionRegion,
            _output: CompactionOutput,
            _write_opts: WriteOptions,
        ) -> Result<Vec<FileMeta>> {
            self.call_idx.fetch_add(1, Ordering::SeqCst);
            // Block forever so only cancellation can end the task.
            std::future::pending().await
        }
    }

    /// Cancelling mid-merge must abort in-flight tasks, skip unscheduled ones,
    /// and return an empty merge output.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_merge_ssts_cancels_spawned_tasks() {
        common_telemetry::init_default_ut_logging();

        let mut compaction_region = new_test_compaction_region().await;
        compaction_region.max_parallelism = 2;

        let cancel_handle = Arc::new(CancellationHandle::default());
        let call_idx = Arc::new(AtomicUsize::new(0));
        let compactor = DefaultCompactor {
            merger: BlockingMerger {
                call_idx: call_idx.clone(),
            },
            cancel_handle: cancel_handle.clone(),
        };

        let picker_output = PickerOutput {
            outputs: vec![
                CompactionOutput {
                    output_level: 1,
                    inputs: vec![new_file_handle(dummy_file_meta())],
                    filter_deleted: false,
                    output_time_range: None,
                },
                CompactionOutput {
                    output_level: 1,
                    inputs: vec![new_file_handle(dummy_file_meta())],
                    filter_deleted: false,
                    output_time_range: None,
                },
                CompactionOutput {
                    output_level: 1,
                    inputs: vec![new_file_handle(dummy_file_meta())],
                    filter_deleted: false,
                    output_time_range: None,
                },
            ],
            expired_ssts: vec![],
            time_window_size: 3600,
            max_file_size: None,
        };

        let task = tokio::spawn(async move {
            compactor
                .merge_ssts(&compaction_region, picker_output)
                .await
        });

        // Give the first chunk time to start before cancelling.
        sleep(Duration::from_millis(100)).await;
        cancel_handle.cancel();

        let merge_output = task
            .await
            .expect("merge_ssts should stop after cancellation")
            .unwrap();

        let started = call_idx.load(Ordering::SeqCst);

        assert!(merge_output.files_to_add.is_empty());
        assert!(merge_output.files_to_remove.is_empty());
        // Only the first chunk (parallelism = 2) started; the third never ran.
        assert_eq!(started, 2);
    }
}