1use std::num::NonZero;
16use std::sync::Arc;
17use std::time::Duration;
18
19use common_base::cancellation::{CancellableFuture, CancellationHandle};
20use common_meta::key::SchemaMetadataManagerRef;
21use common_telemetry::{debug, info, warn};
22use common_time::TimeToLive;
23use either::Either;
24use itertools::Itertools;
25use object_store::manager::ObjectStoreManagerRef;
26use partition::expr::PartitionExpr;
27use serde::{Deserialize, Serialize};
28use snafu::{OptionExt, ResultExt};
29use store_api::metadata::RegionMetadataRef;
30use store_api::region_request::PathType;
31use store_api::storage::RegionId;
32
33use crate::access_layer::{
34 AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest, WriteType,
35};
36use crate::cache::{CacheManager, CacheManagerRef};
37use crate::compaction::picker::PickerOutput;
38use crate::compaction::{CompactionOutput, CompactionSstReaderBuilder, find_dynamic_options};
39use crate::config::MitoConfig;
40use crate::error;
41use crate::error::{
42 EmptyRegionDirSnafu, InvalidPartitionExprSnafu, ObjectStoreNotFoundSnafu, Result,
43};
44use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
45use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
46use crate::read::FlatSource;
47use crate::region::options::RegionOptions;
48use crate::region::version::VersionRef;
49use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState};
50use crate::schedule::scheduler::LocalScheduler;
51use crate::sst::FormatType;
52use crate::sst::file::FileMeta;
53use crate::sst::file_purger::LocalFilePurger;
54use crate::sst::index::intermediate::IntermediateManager;
55use crate::sst::index::puffin_manager::PuffinManagerFactory;
56use crate::sst::location::region_dir_from_table_dir;
57use crate::sst::parquet::WriteOptions;
58use crate::sst::version::{SstVersion, SstVersionRef};
59
/// Snapshot of the pieces of a region version that compaction needs:
/// metadata, options, the SST set and the inferred compaction time window.
#[derive(Clone)]
pub struct CompactionVersion {
    /// Metadata of the region at the time the snapshot was taken.
    pub(crate) metadata: RegionMetadataRef,
    /// Options of the region.
    pub(crate) options: RegionOptions,
    /// SSTs of the region.
    pub(crate) ssts: SstVersionRef,
    /// Inferred compaction time window, if one has been determined.
    pub(crate) compaction_time_window: Option<Duration>,
}
75
76impl From<VersionRef> for CompactionVersion {
77 fn from(value: VersionRef) -> Self {
78 Self {
79 metadata: value.metadata.clone(),
80 options: value.options.clone(),
81 ssts: value.ssts.clone(),
82 compaction_time_window: value.compaction_time_window,
83 }
84 }
85}
86
/// Everything a compaction task needs to operate on a region, decoupled from
/// the region engine's worker loop.
#[derive(Clone)]
pub struct CompactionRegion {
    pub region_id: RegionId,
    pub region_options: RegionOptions,

    pub(crate) engine_config: Arc<MitoConfig>,
    pub(crate) region_metadata: RegionMetadataRef,
    pub(crate) cache_manager: CacheManagerRef,
    pub access_layer: AccessLayerRef,
    pub(crate) manifest_ctx: Arc<ManifestContext>,
    pub(crate) current_version: CompactionVersion,
    // `None` when the caller manages file purging itself.
    pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
    pub(crate) ttl: Option<TimeToLive>,

    /// Upper bound on concurrently running merge tasks (clamped to >= 1 by
    /// `merge_ssts`).
    pub max_parallelism: usize,
}
110
/// Parameters for [`open_compaction_region`].
#[derive(Debug, Clone)]
pub struct OpenCompactionRegionRequest {
    pub region_id: RegionId,
    pub table_dir: String,
    pub path_type: PathType,
    pub region_options: RegionOptions,
    /// Maximum number of merge tasks to run concurrently.
    pub max_parallelism: usize,
}
120
121pub async fn open_compaction_region(
124 req: &OpenCompactionRegionRequest,
125 mito_config: &MitoConfig,
126 object_store_manager: ObjectStoreManagerRef,
127 ttl_provider: Either<TimeToLive, SchemaMetadataManagerRef>,
128) -> Result<CompactionRegion> {
129 let object_store = {
130 let name = &req.region_options.storage;
131 if let Some(name) = name {
132 object_store_manager
133 .find(name)
134 .with_context(|| ObjectStoreNotFoundSnafu {
135 object_store: name.clone(),
136 })?
137 } else {
138 object_store_manager.default_object_store()
139 }
140 };
141
142 let access_layer = {
143 let puffin_manager_factory = PuffinManagerFactory::new(
144 &mito_config.index.aux_path,
145 mito_config.index.staging_size.as_bytes(),
146 Some(mito_config.index.write_buffer_size.as_bytes() as _),
147 mito_config.index.staging_ttl,
148 )
149 .await?;
150 let intermediate_manager =
151 IntermediateManager::init_fs(mito_config.index.aux_path.clone()).await?;
152
153 Arc::new(AccessLayer::new(
154 &req.table_dir,
155 req.path_type,
156 object_store.clone(),
157 puffin_manager_factory,
158 intermediate_manager,
159 ))
160 };
161
162 let manifest_manager = {
163 let region_dir = region_dir_from_table_dir(&req.table_dir, req.region_id, req.path_type);
164 let region_manifest_options =
165 RegionManifestOptions::new(mito_config, ®ion_dir, object_store);
166
167 RegionManifestManager::open(region_manifest_options, &Default::default())
168 .await?
169 .with_context(|| EmptyRegionDirSnafu {
170 region_id: req.region_id,
171 region_dir: region_dir_from_table_dir(&req.table_dir, req.region_id, req.path_type),
172 })?
173 };
174
175 let manifest = manifest_manager.manifest();
176 let region_metadata = manifest.metadata.clone();
177 let manifest_ctx = Arc::new(ManifestContext::new(
178 manifest_manager,
179 RegionRoleState::Leader(RegionLeaderState::Writable),
180 ));
181
182 let file_purger = {
183 let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_purges));
184 Arc::new(LocalFilePurger::new(
185 purge_scheduler.clone(),
186 access_layer.clone(),
187 None,
188 ))
189 };
190
191 let current_version = {
192 let mut ssts = SstVersion::new();
193 ssts.add_files(file_purger.clone(), manifest.files.values().cloned());
194 CompactionVersion {
195 metadata: region_metadata.clone(),
196 options: req.region_options.clone(),
197 ssts: Arc::new(ssts),
198 compaction_time_window: manifest.compaction_time_window,
199 }
200 };
201
202 let ttl = match ttl_provider {
203 Either::Left(ttl) => ttl,
205 Either::Right(schema_metadata_manager) => {
207 let (_, ttl) = find_dynamic_options(
208 req.region_id.table_id(),
209 &req.region_options,
210 &schema_metadata_manager,
211 )
212 .await
213 .unwrap_or_else(|e| {
214 warn!(e; "Failed to get ttl for region: {}", region_metadata.region_id);
215 (
216 crate::region::options::CompactionOptions::default(),
217 TimeToLive::default(),
218 )
219 });
220 ttl
221 }
222 };
223
224 Ok(CompactionRegion {
225 region_id: req.region_id,
226 region_options: req.region_options.clone(),
227 engine_config: Arc::new(mito_config.clone()),
228 region_metadata: region_metadata.clone(),
229 cache_manager: Arc::new(CacheManager::default()),
230 access_layer,
231 manifest_ctx,
232 current_version,
233 file_purger: Some(file_purger),
234 ttl: Some(ttl),
235 max_parallelism: req.max_parallelism,
236 })
237}
238
239impl CompactionRegion {
240 pub fn file_purger(&self) -> Option<Arc<LocalFilePurger>> {
242 self.file_purger.clone()
243 }
244
245 pub async fn stop_purger_scheduler(&self) -> Result<()> {
247 if let Some(file_purger) = &self.file_purger {
248 file_purger.stop_scheduler().await
249 } else {
250 Ok(())
251 }
252 }
253}
254
/// Result of merging SSTs: the files to add to and remove from the region,
/// plus the compaction time window if one was determined.
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
pub struct MergeOutput {
    pub files_to_add: Vec<FileMeta>,
    pub files_to_remove: Vec<FileMeta>,
    /// Compaction time window in seconds.
    pub compaction_time_window: Option<i64>,
}
262
263impl MergeOutput {
264 pub fn is_empty(&self) -> bool {
265 self.files_to_add.is_empty() && self.files_to_remove.is_empty()
266 }
267
268 pub fn input_file_size(&self) -> u64 {
269 self.files_to_remove.iter().map(|f| f.file_size).sum()
270 }
271
272 pub fn output_file_size(&self) -> u64 {
273 self.files_to_add.iter().map(|f| f.file_size).sum()
274 }
275}
276
/// Compaction entry points: merge the picked SSTs, then persist the result to
/// the region manifest.
#[async_trait::async_trait]
pub trait Compactor: Send + Sync + 'static {
    /// Merges the SST files selected by the picker and returns the files to
    /// add and remove.
    async fn merge_ssts(
        &self,
        compaction_region: &CompactionRegion,
        picker_output: PickerOutput,
    ) -> Result<MergeOutput>;

    /// Applies `merge_output` to the region manifest and returns the edit
    /// that was committed.
    async fn update_manifest(
        &self,
        compaction_region: &CompactionRegion,
        merge_output: MergeOutput,
    ) -> Result<RegionEdit>;
}
294
/// Merges the inputs of a single [`CompactionOutput`] into new SST files.
///
/// Abstracted out of [`Compactor`] so tests can substitute the merge step.
#[async_trait::async_trait]
pub trait SstMerger: Send + Sync + 'static {
    /// Merges `output.inputs` and returns the metadata of the produced files.
    async fn merge_single_output(
        &self,
        compaction_region: CompactionRegion,
        output: CompactionOutput,
        write_opts: WriteOptions,
    ) -> Result<Vec<FileMeta>>;
}
308
/// Default [`SstMerger`] that reads the input SSTs through a merge reader and
/// rewrites them via the region's access layer.
#[derive(Clone)]
pub struct DefaultSstMerger;
312
313#[async_trait::async_trait]
314impl SstMerger for DefaultSstMerger {
315 async fn merge_single_output(
316 &self,
317 compaction_region: CompactionRegion,
318 output: CompactionOutput,
319 write_opts: WriteOptions,
320 ) -> Result<Vec<FileMeta>> {
321 let region_id = compaction_region.region_id;
322 let storage = compaction_region.region_options.storage.clone();
323 let index_options = compaction_region
324 .current_version
325 .options
326 .index_options
327 .clone();
328 let append_mode = compaction_region.current_version.options.append_mode;
329 let merge_mode = compaction_region.current_version.options.merge_mode();
330 let flat_format = compaction_region
331 .region_options
332 .sst_format
333 .map(|format| format == FormatType::Flat)
334 .unwrap_or(compaction_region.engine_config.default_flat_format);
335
336 let index_config = compaction_region.engine_config.index.clone();
337 let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
338 let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
339 let bloom_filter_index_config = compaction_region.engine_config.bloom_filter_index.clone();
340 #[cfg(feature = "vector_index")]
341 let vector_index_config = compaction_region.engine_config.vector_index.clone();
342
343 let input_file_names = output
344 .inputs
345 .iter()
346 .map(|f| f.file_id().to_string())
347 .join(",");
348 let max_sequence = output
349 .inputs
350 .iter()
351 .map(|f| f.meta_ref().sequence)
352 .max()
353 .flatten();
354 let builder = CompactionSstReaderBuilder {
355 metadata: compaction_region.region_metadata.clone(),
356 sst_layer: compaction_region.access_layer.clone(),
357 cache: compaction_region.cache_manager.clone(),
358 inputs: &output.inputs,
359 append_mode,
360 filter_deleted: output.filter_deleted,
361 time_range: output.output_time_range,
362 merge_mode,
363 };
364 let reader = builder.build_flat_sst_reader().await?;
365 let source = FlatSource::Stream(reader);
366 let mut metrics = Metrics::new(WriteType::Compaction);
367 let region_metadata = compaction_region.region_metadata.clone();
368 let sst_infos = compaction_region
369 .access_layer
370 .write_sst(
371 SstWriteRequest {
372 op_type: OperationType::Compact,
373 metadata: region_metadata.clone(),
374 source,
375 cache_manager: compaction_region.cache_manager.clone(),
376 storage,
377 max_sequence: max_sequence.map(NonZero::get),
378 sst_write_format: if flat_format {
379 FormatType::Flat
380 } else {
381 FormatType::PrimaryKey
382 },
383 index_options,
384 index_config,
385 inverted_index_config,
386 fulltext_index_config,
387 bloom_filter_index_config,
388 #[cfg(feature = "vector_index")]
389 vector_index_config,
390 },
391 &write_opts,
392 &mut metrics,
393 )
394 .await?;
395 let partition_expr = match ®ion_metadata.partition_expr {
397 None => None,
398 Some(json_str) if json_str.is_empty() => None,
399 Some(json_str) => PartitionExpr::from_json_str(json_str).with_context(|_| {
400 InvalidPartitionExprSnafu {
401 expr: json_str.clone(),
402 }
403 })?,
404 };
405
406 let output_files = sst_infos
407 .into_iter()
408 .map(|sst_info| FileMeta {
409 region_id,
410 file_id: sst_info.file_id,
411 time_range: sst_info.time_range,
412 level: output.output_level,
413 file_size: sst_info.file_size,
414 max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
415 available_indexes: sst_info.index_metadata.build_available_indexes(),
416 indexes: sst_info.index_metadata.build_indexes(),
417 index_file_size: sst_info.index_metadata.file_size,
418 index_version: 0,
419 num_rows: sst_info.num_rows as u64,
420 num_row_groups: sst_info.num_row_groups,
421 sequence: max_sequence,
422 partition_expr: partition_expr.clone(),
423 num_series: sst_info.num_series,
424 })
425 .collect::<Vec<_>>();
426 let output_file_names = output_files.iter().map(|f| f.file_id.to_string()).join(",");
427 info!(
428 "Region {} compaction inputs: [{}], outputs: [{}], flat_format: {}, metrics: {:?}",
429 region_id, input_file_names, output_file_names, flat_format, metrics
430 );
431 metrics.observe();
432 Ok(output_files)
433 }
434}
435
/// Default [`Compactor`] that drives merge tasks through an [`SstMerger`]
/// with bounded parallelism and cooperative cancellation.
pub struct DefaultCompactor<M = DefaultSstMerger> {
    merger: M,
    /// Handle used to cancel in-flight merge tasks.
    cancel_handle: Arc<CancellationHandle>,
}
444
#[cfg(test)]
impl<M: SstMerger> DefaultCompactor<M> {
    /// Creates a compactor with a custom merger and a fresh, un-cancelled
    /// cancellation handle. Test-only constructor.
    pub fn with_merger(merger: M) -> Self {
        let cancel_handle = Arc::new(CancellationHandle::default());
        Self {
            merger,
            cancel_handle,
        }
    }
}
454
455impl DefaultCompactor {
456 pub fn with_cancel_handle(cancel_handle: Arc<CancellationHandle>) -> Self {
457 Self {
458 merger: DefaultSstMerger,
459 cancel_handle,
460 }
461 }
462}
463
#[async_trait::async_trait]
impl<M: SstMerger> Compactor for DefaultCompactor<M>
where
    M: Clone,
{
    /// Runs the picked merge tasks with bounded parallelism, honoring
    /// cancellation, and collects the resulting additions/removals.
    ///
    /// A failed merge is skipped (its inputs stay in the region); a join
    /// error aborts the whole call; cancellation stops scheduling and aborts
    /// in-flight tasks. Expired SSTs are always removed.
    async fn merge_ssts(
        &self,
        compaction_region: &CompactionRegion,
        mut picker_output: PickerOutput,
    ) -> Result<MergeOutput> {
        // Run at least one task at a time even if max_parallelism is 0.
        let internal_parallelism = compaction_region.max_parallelism.max(1);
        let compaction_time_window = picker_output.time_window_size;
        let region_id = compaction_region.region_id;

        // Each entry pairs the input file metas with the future merging them.
        let mut tasks: Vec<(Vec<FileMeta>, _)> = Vec::with_capacity(picker_output.outputs.len());

        for output in picker_output.outputs.drain(..) {
            let inputs_to_remove: Vec<_> =
                output.inputs.iter().map(|f| f.meta_ref().clone()).collect();
            let write_opts = WriteOptions {
                write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
                max_file_size: picker_output.max_file_size,
                ..Default::default()
            };
            let merger = self.merger.clone();
            let compaction_region = compaction_region.clone();
            let fut = async move {
                merger
                    .merge_single_output(compaction_region, output, write_opts)
                    .await
            };
            tasks.push((inputs_to_remove, fut));
        }

        let mut output_files = Vec::with_capacity(tasks.len());
        let mut compacted_inputs = Vec::with_capacity(
            tasks.iter().map(|(inputs, _)| inputs.len()).sum::<usize>()
                + picker_output.expired_ssts.len(),
        );

        // Process tasks in chunks of `internal_parallelism`. Note that
        // `pop()` takes from the back, so outputs run in reverse order.
        while !tasks.is_empty() {
            let mut chunk: Vec<(Vec<FileMeta>, _)> = Vec::with_capacity(internal_parallelism);
            for _ in 0..internal_parallelism {
                if let Some(task) = tasks.pop() {
                    chunk.push(task);
                }
            }
            // Spawn the whole chunk, then await each handle in turn.
            let mut spawned: Vec<_> = chunk
                .into_iter()
                .map(|(inputs, fut)| {
                    let handle = common_runtime::spawn_compact(fut);
                    (inputs, handle)
                })
                .collect();

            while let Some((inputs, handle)) = spawned.pop() {
                let abort_handle = handle.abort_handle();
                match CancellableFuture::new(handle, self.cancel_handle.clone()).await {
                    // Merge succeeded: record outputs and consumed inputs.
                    Ok(Ok(Ok(files))) => {
                        output_files.extend(files);
                        compacted_inputs.extend(inputs);
                    }
                    // Merge failed: skip this output; its inputs are kept.
                    Ok(Ok(Err(e))) => {
                        warn!(
                            e; "Failed to merge compaction output for region: {}, inputs: [{}]",
                            region_id,
                            inputs.iter().map(|f| f.file_id.to_string()).join(",")
                        );
                    }
                    // Join error: the task panicked or was aborted.
                    Ok(Err(e)) => {
                        warn!(
                            "Region {} compaction task join error for inputs: [{}], skipping: {}",
                            region_id,
                            inputs.iter().map(|f| f.file_id.to_string()).join(","),
                            e
                        );
                        if self.cancel_handle.is_cancelled() {
                            abort_handle.abort();
                            for (_, handle) in spawned {
                                handle.abort();
                            }
                        }
                        // NOTE(review): when not cancelled, the remaining
                        // spawned tasks are left running detached here —
                        // confirm this is intended.
                        return Err(e).context(error::JoinSnafu);
                    }
                    // Cancelled: abort everything still in flight in this
                    // chunk and stop collecting.
                    Err(_) => {
                        debug!(
                            "Compaction merge cancelled for region: {}, aborting remaining {} spawned tasks",
                            region_id,
                            spawned.len(),
                        );
                        abort_handle.abort();
                        for (_, handle) in spawned {
                            handle.abort();
                        }
                        break;
                    }
                }
            }

            if self.cancel_handle.is_cancelled() {
                info!("Compaction merge cancelled for region: {}", region_id);
                break;
            }
        }

        // Expired SSTs are removed regardless of merge success.
        compacted_inputs.extend(
            picker_output
                .expired_ssts
                .iter()
                .map(|f| f.meta_ref().clone()),
        );

        Ok(MergeOutput {
            files_to_add: output_files,
            files_to_remove: compacted_inputs,
            compaction_time_window: Some(compaction_time_window),
        })
    }

    /// Builds a [`RegionEdit`] from `merge_output` and commits it to the
    /// region manifest.
    async fn update_manifest(
        &self,
        compaction_region: &CompactionRegion,
        merge_output: MergeOutput,
    ) -> Result<RegionEdit> {
        let edit = RegionEdit {
            files_to_add: merge_output.files_to_add,
            files_to_remove: merge_output.files_to_remove,
            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
            // The window is stored in seconds; convert to a Duration.
            compaction_time_window: merge_output
                .compaction_time_window
                .map(|seconds| Duration::from_secs(seconds as u64)),
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };

        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
        compaction_region
            .manifest_ctx
            .update_manifest(RegionLeaderState::Writable, action_list, false)
            .await?;

        Ok(edit)
    }
}
617
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    use store_api::storage::{FileId, RegionId};
    use tokio::time::sleep;

    use super::{DefaultCompactor, *};
    use crate::cache::CacheManager;
    use crate::compaction::picker::PickerOutput;
    use crate::error::Result;
    use crate::sst::file::FileHandle;
    use crate::sst::file_purger::NoopFilePurger;
    use crate::sst::version::SstVersion;
    use crate::test_util::memtable_util::metadata_for_test;
    use crate::test_util::scheduler_util::SchedulerEnv;

    /// Builds a `FileMeta` with a random file id and a fixed 100-byte size.
    fn dummy_file_meta() -> FileMeta {
        FileMeta {
            region_id: RegionId::new(1, 1),
            file_id: FileId::random(),
            file_size: 100,
            ..Default::default()
        }
    }

    /// Wraps a meta in a `FileHandle` whose purger does nothing.
    fn new_file_handle(meta: FileMeta) -> FileHandle {
        FileHandle::new(meta, Arc::new(NoopFilePurger))
    }

    /// Builds a minimal `CompactionRegion` backed by the scheduler test env,
    /// with no purger, no TTL and `max_parallelism = 1`.
    async fn new_test_compaction_region() -> CompactionRegion {
        let env = SchedulerEnv::new().await;
        let metadata = metadata_for_test();
        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
        CompactionRegion {
            region_id: RegionId::new(1, 1),
            region_options: RegionOptions::default(),
            engine_config: Arc::new(MitoConfig::default()),
            region_metadata: metadata.clone(),
            cache_manager: Arc::new(CacheManager::default()),
            access_layer: env.access_layer.clone(),
            manifest_ctx,
            current_version: CompactionVersion {
                metadata,
                options: RegionOptions::default(),
                ssts: Arc::new(SstVersion::new()),
                compaction_time_window: None,
            },
            file_purger: None,
            ttl: None,
            max_parallelism: 1,
        }
    }

    /// `SstMerger` stub that replays a scripted list of results, one per call.
    #[derive(Clone)]
    struct MockMerger {
        // Scripted per-call results, indexed by call order.
        results: Arc<Mutex<Vec<Result<Vec<FileMeta>>>>>,
        // Counts calls; also serves as the index into `results`.
        call_idx: Arc<AtomicUsize>,
    }

    impl MockMerger {
        fn new(results: Vec<Result<Vec<FileMeta>>>) -> Self {
            Self {
                results: Arc::new(Mutex::new(results)),
                call_idx: Arc::new(AtomicUsize::new(0)),
            }
        }
    }

    #[async_trait::async_trait]
    impl SstMerger for MockMerger {
        async fn merge_single_output(
            &self,
            _compaction_region: CompactionRegion,
            _output: CompactionOutput,
            _write_opts: WriteOptions,
        ) -> Result<Vec<FileMeta>> {
            let idx = self.call_idx.fetch_add(1, Ordering::SeqCst);
            match self.results.lock().unwrap().get(idx) {
                Some(Ok(files)) => Ok(files.clone()),
                Some(Err(_)) => error::InvalidMetaSnafu {
                    reason: format!("simulated failure at index {idx}"),
                }
                .fail(),
                None => panic!("MockMerger: no result configured for call index {idx}"),
            }
        }
    }

    // One of three merge tasks fails; its inputs must not be removed while
    // the successful tasks' outputs and inputs are still collected.
    #[tokio::test]
    async fn test_partial_merge_failure_collects_only_successful_outputs() {
        common_telemetry::init_default_ut_logging();

        let compaction_region = new_test_compaction_region().await;

        let input_meta_0 = dummy_file_meta();
        let input_meta_1 = dummy_file_meta();
        let input_meta_2 = dummy_file_meta();

        let output_meta_0 = vec![dummy_file_meta()];
        let output_meta_2 = vec![dummy_file_meta(), dummy_file_meta()];

        let merger = MockMerger::new(vec![
            Ok(output_meta_0.clone()),
            Err(error::InvalidMetaSnafu {
                reason: "boom".to_string(),
            }
            .build()),
            Ok(output_meta_2.clone()),
        ]);
        let compactor = DefaultCompactor::with_merger(merger);

        let picker_output = PickerOutput {
            outputs: vec![
                CompactionOutput {
                    output_level: 1,
                    inputs: vec![new_file_handle(input_meta_0.clone())],
                    filter_deleted: false,
                    output_time_range: None,
                },
                CompactionOutput {
                    output_level: 1,
                    inputs: vec![new_file_handle(input_meta_1.clone())],
                    filter_deleted: false,
                    output_time_range: None,
                },
                CompactionOutput {
                    output_level: 1,
                    inputs: vec![new_file_handle(input_meta_2.clone())],
                    filter_deleted: false,
                    output_time_range: None,
                },
            ],
            expired_ssts: vec![],
            time_window_size: 3600,
            max_file_size: None,
        };

        let merge_output = compactor
            .merge_ssts(&compaction_region, picker_output)
            .await
            .unwrap();

        // 1 + 2 files from the two successful merges.
        assert_eq!(merge_output.files_to_add.len(), 3);
        assert_eq!(merge_output.files_to_remove.len(), 2);

        let removed_ids: Vec<_> = merge_output
            .files_to_remove
            .iter()
            .map(|f| f.file_id)
            .collect();
        assert!(removed_ids.contains(&input_meta_0.file_id));
        assert!(removed_ids.contains(&input_meta_2.file_id));
        // The failed task's input must stay in the region.
        assert!(!removed_ids.contains(&input_meta_1.file_id));
    }

    // Happy path: one output, one success, input removed and output added.
    #[tokio::test]
    async fn test_all_outputs_succeed() {
        common_telemetry::init_default_ut_logging();

        let compaction_region = new_test_compaction_region().await;
        let input_meta = dummy_file_meta();
        let output_meta = vec![dummy_file_meta()];

        let merger = MockMerger::new(vec![Ok(output_meta.clone())]);
        let compactor = DefaultCompactor::with_merger(merger);

        let picker_output = PickerOutput {
            outputs: vec![CompactionOutput {
                output_level: 1,
                inputs: vec![new_file_handle(input_meta.clone())],
                filter_deleted: false,
                output_time_range: None,
            }],
            expired_ssts: vec![],
            time_window_size: 3600,
            max_file_size: None,
        };

        let merge_output = compactor
            .merge_ssts(&compaction_region, picker_output)
            .await
            .unwrap();

        assert_eq!(merge_output.files_to_add.len(), 1);
        assert_eq!(merge_output.files_to_add[0].file_id, output_meta[0].file_id);
        assert_eq!(merge_output.files_to_remove.len(), 1);
        assert_eq!(merge_output.files_to_remove[0].file_id, input_meta.file_id);
    }

    // Even when every merge fails, expired SSTs must still be removed.
    #[tokio::test]
    async fn test_expired_ssts_always_removed() {
        common_telemetry::init_default_ut_logging();

        let compaction_region = new_test_compaction_region().await;
        let input_meta = dummy_file_meta();
        let expired_meta = dummy_file_meta();

        let merger = MockMerger::new(vec![Err(error::InvalidMetaSnafu {
            reason: "fail".to_string(),
        }
        .build())]);
        let compactor = DefaultCompactor::with_merger(merger);

        let picker_output = PickerOutput {
            outputs: vec![CompactionOutput {
                output_level: 1,
                inputs: vec![new_file_handle(input_meta.clone())],
                filter_deleted: false,
                output_time_range: None,
            }],
            expired_ssts: vec![new_file_handle(expired_meta.clone())],
            time_window_size: 3600,
            max_file_size: None,
        };

        let merge_output = compactor
            .merge_ssts(&compaction_region, picker_output)
            .await
            .unwrap();

        assert!(merge_output.files_to_add.is_empty());
        assert_eq!(merge_output.files_to_remove.len(), 1);
        assert_eq!(
            merge_output.files_to_remove[0].file_id,
            expired_meta.file_id
        );
    }

    /// `SstMerger` stub that counts invocations and then never completes,
    /// used to exercise cancellation.
    #[derive(Clone)]
    struct BlockingMerger {
        call_idx: Arc<AtomicUsize>,
    }

    #[async_trait::async_trait]
    impl SstMerger for BlockingMerger {
        async fn merge_single_output(
            &self,
            _compaction_region: CompactionRegion,
            _output: CompactionOutput,
            _write_opts: WriteOptions,
        ) -> Result<Vec<FileMeta>> {
            self.call_idx.fetch_add(1, Ordering::SeqCst);
            std::future::pending().await
        }
    }

    // Cancelling mid-merge should abort in-flight tasks, stop scheduling new
    // ones, and return an empty merge output.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_merge_ssts_cancels_spawned_tasks() {
        common_telemetry::init_default_ut_logging();

        let mut compaction_region = new_test_compaction_region().await;
        compaction_region.max_parallelism = 2;

        let cancel_handle = Arc::new(CancellationHandle::default());
        let call_idx = Arc::new(AtomicUsize::new(0));
        let compactor = DefaultCompactor {
            merger: BlockingMerger {
                call_idx: call_idx.clone(),
            },
            cancel_handle: cancel_handle.clone(),
        };

        let picker_output = PickerOutput {
            outputs: vec![
                CompactionOutput {
                    output_level: 1,
                    inputs: vec![new_file_handle(dummy_file_meta())],
                    filter_deleted: false,
                    output_time_range: None,
                },
                CompactionOutput {
                    output_level: 1,
                    inputs: vec![new_file_handle(dummy_file_meta())],
                    filter_deleted: false,
                    output_time_range: None,
                },
                CompactionOutput {
                    output_level: 1,
                    inputs: vec![new_file_handle(dummy_file_meta())],
                    filter_deleted: false,
                    output_time_range: None,
                },
            ],
            expired_ssts: vec![],
            time_window_size: 3600,
            max_file_size: None,
        };

        let task = tokio::spawn(async move {
            compactor
                .merge_ssts(&compaction_region, picker_output)
                .await
        });

        // Give the first chunk of tasks time to start before cancelling.
        sleep(Duration::from_millis(100)).await;
        cancel_handle.cancel();

        let merge_output = task
            .await
            .expect("merge_ssts should stop after cancellation")
            .unwrap();

        let started = call_idx.load(Ordering::SeqCst);

        assert!(merge_output.files_to_add.is_empty());
        assert!(merge_output.files_to_remove.is_empty());
        // With parallelism 2 only the first chunk (2 of 3 tasks) ever starts.
        assert_eq!(started, 2);
    }
}