1use std::num::NonZero;
16use std::sync::Arc;
17use std::time::Duration;
18
19use common_base::Plugins;
20use common_base::cancellation::{CancellableFuture, CancellationHandle};
21use common_meta::key::SchemaMetadataManagerRef;
22use common_telemetry::{debug, info, warn};
23use common_time::TimeToLive;
24use either::Either;
25use itertools::Itertools;
26use object_store::manager::ObjectStoreManagerRef;
27use partition::expr::PartitionExpr;
28use serde::{Deserialize, Serialize};
29use snafu::{OptionExt, ResultExt};
30use store_api::ManifestVersion;
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_request::PathType;
33use store_api::storage::RegionId;
34
35use crate::access_layer::{
36 AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest, WriteType,
37};
38use crate::cache::{CacheManager, CacheManagerRef};
39use crate::compaction::picker::PickerOutput;
40use crate::compaction::{CompactionOutput, CompactionSstReaderBuilder, find_dynamic_options};
41use crate::config::MitoConfig;
42use crate::engine::region_hook::RegionHookRef;
43use crate::error;
44use crate::error::{
45 EmptyRegionDirSnafu, InvalidPartitionExprSnafu, ObjectStoreNotFoundSnafu, Result,
46};
47use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
48use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
49use crate::region::options::RegionOptions;
50use crate::region::version::VersionRef;
51use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState};
52use crate::schedule::scheduler::LocalScheduler;
53use crate::sst::FormatType;
54use crate::sst::file::FileMeta;
55use crate::sst::file_purger::LocalFilePurger;
56use crate::sst::index::intermediate::IntermediateManager;
57use crate::sst::index::puffin_manager::PuffinManagerFactory;
58use crate::sst::location::region_dir_from_table_dir;
59use crate::sst::parquet::metadata::extract_primary_key_range;
60use crate::sst::parquet::{SstInfo, WriteOptions};
61use crate::sst::version::{SstVersion, SstVersionRef};
62
/// The subset of a region version that compaction works with.
///
/// It can be built from a [`VersionRef`], in which case the `metadata`, `options`, `ssts` and
/// `compaction_time_window` fields are copied over unchanged.
#[derive(Clone)]
pub struct CompactionVersion {
    /// Metadata of the region at this version.
    pub(crate) metadata: RegionMetadataRef,
    /// Options of the region at this version.
    pub(crate) options: RegionOptions,
    /// SST files that belong to this version.
    pub(crate) ssts: SstVersionRef,
    /// Compaction time window recorded for this version, if any.
    pub(crate) compaction_time_window: Option<Duration>,
}
78
79impl From<VersionRef> for CompactionVersion {
80 fn from(value: VersionRef) -> Self {
81 Self {
82 metadata: value.metadata.clone(),
83 options: value.options.clone(),
84 ssts: value.ssts.clone(),
85 compaction_time_window: value.compaction_time_window,
86 }
87 }
88}
89
/// Everything a compactor needs to know about the region it is compacting.
///
/// Can be produced by [`open_compaction_region`]. The struct is `Clone`, and
/// [`DefaultCompactor`] hands an owned copy to every merge task it creates.
#[derive(Clone)]
pub struct CompactionRegion {
    /// Id of the region being compacted.
    pub region_id: RegionId,
    /// Options of the region.
    pub region_options: RegionOptions,

    /// Engine configuration.
    pub(crate) engine_config: Arc<MitoConfig>,
    /// Metadata of the region.
    pub(crate) region_metadata: RegionMetadataRef,
    /// Cache manager handed to the SST reader and writer.
    pub(crate) cache_manager: CacheManagerRef,
    /// Access layer used to read the input SSTs and write the merged ones.
    pub access_layer: AccessLayerRef,
    /// Manifest context through which the compaction result is recorded.
    pub(crate) manifest_ctx: Arc<ManifestContext>,
    /// Version of the region the compaction runs against.
    pub(crate) current_version: CompactionVersion,
    /// Optional file purger; when `None`, `stop_purger_scheduler` is a no-op.
    pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
    /// Time-to-live resolved for the region, if any.
    pub(crate) ttl: Option<TimeToLive>,

    /// Upper bound on the number of merge tasks spawned at once.
    /// [`DefaultCompactor`] treats values below 1 as 1.
    pub max_parallelism: usize,

    /// Plugins; [`DefaultCompactor`] looks up an optional [`RegionHookRef`] in them.
    pub(crate) plugins: Plugins,
}
115
/// Request describing the region that [`open_compaction_region`] should open.
#[derive(Clone)]
pub struct OpenCompactionRegionRequest {
    /// Id of the region to open.
    pub region_id: RegionId,
    /// Table directory; combined with `region_id` and `path_type` to derive the region
    /// directory.
    pub table_dir: String,
    /// Path type used when deriving the region directory and building the access layer.
    pub path_type: PathType,
    /// Options of the region; `storage` selects the object store to use.
    pub region_options: RegionOptions,
    /// Copied into [`CompactionRegion::max_parallelism`].
    pub max_parallelism: usize,
    /// Plugins copied into the opened region and searched for a [`RegionHookRef`].
    pub plugins: Plugins,
}
127
128pub async fn open_compaction_region(
131 req: &OpenCompactionRegionRequest,
132 mito_config: &MitoConfig,
133 object_store_manager: ObjectStoreManagerRef,
134 ttl_provider: Either<TimeToLive, SchemaMetadataManagerRef>,
135) -> Result<CompactionRegion> {
136 let object_store = {
137 let name = &req.region_options.storage;
138 if let Some(name) = name {
139 object_store_manager
140 .find(name)
141 .with_context(|| ObjectStoreNotFoundSnafu {
142 object_store: name.clone(),
143 })?
144 } else {
145 object_store_manager.default_object_store()
146 }
147 };
148
149 let access_layer = {
150 let puffin_manager_factory = PuffinManagerFactory::new(
151 &mito_config.index.aux_path,
152 mito_config.index.staging_size.as_bytes(),
153 Some(mito_config.index.write_buffer_size.as_bytes() as _),
154 mito_config.index.staging_ttl,
155 )
156 .await?;
157 let intermediate_manager =
158 IntermediateManager::init_fs(mito_config.index.aux_path.clone()).await?;
159
160 Arc::new(AccessLayer::new(
161 &req.table_dir,
162 req.path_type,
163 object_store.clone(),
164 puffin_manager_factory,
165 intermediate_manager,
166 ))
167 };
168
169 let manifest_manager = {
170 let region_dir = region_dir_from_table_dir(&req.table_dir, req.region_id, req.path_type);
171 let region_manifest_options =
172 RegionManifestOptions::new(mito_config, ®ion_dir, object_store);
173
174 RegionManifestManager::open(region_manifest_options, &Default::default())
175 .await?
176 .with_context(|| EmptyRegionDirSnafu {
177 region_id: req.region_id,
178 region_dir: region_dir_from_table_dir(&req.table_dir, req.region_id, req.path_type),
179 })?
180 };
181
182 let manifest = manifest_manager.manifest();
183 let region_metadata = manifest.metadata.clone();
184 let hook: Option<RegionHookRef> = req.plugins.get();
185 let manifest_ctx = Arc::new(ManifestContext::new(
186 manifest_manager,
187 RegionRoleState::Leader(RegionLeaderState::Writable),
188 hook,
189 ));
190
191 let file_purger = {
192 let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_purges));
193 Arc::new(LocalFilePurger::new(
194 purge_scheduler.clone(),
195 access_layer.clone(),
196 None,
197 ))
198 };
199
200 let current_version = {
201 let mut ssts = SstVersion::new();
202 ssts.add_files(file_purger.clone(), manifest.files.values().cloned());
203 CompactionVersion {
204 metadata: region_metadata.clone(),
205 options: req.region_options.clone(),
206 ssts: Arc::new(ssts),
207 compaction_time_window: manifest.compaction_time_window,
208 }
209 };
210
211 let ttl = match ttl_provider {
212 Either::Left(ttl) => ttl,
214 Either::Right(schema_metadata_manager) => {
216 let (_, ttl) =
217 find_dynamic_options(req.region_id, &req.region_options, &schema_metadata_manager)
218 .await
219 .unwrap_or_else(|e| {
220 warn!(e; "Failed to get ttl for region: {}", region_metadata.region_id);
221 (
222 crate::region::options::CompactionOptions::default(),
223 TimeToLive::default(),
224 )
225 });
226 ttl
227 }
228 };
229
230 Ok(CompactionRegion {
231 region_id: req.region_id,
232 region_options: req.region_options.clone(),
233 engine_config: Arc::new(mito_config.clone()),
234 region_metadata: region_metadata.clone(),
235 cache_manager: Arc::new(CacheManager::default()),
236 access_layer,
237 manifest_ctx,
238 current_version,
239 file_purger: Some(file_purger),
240 ttl: Some(ttl),
241 max_parallelism: req.max_parallelism,
242 plugins: req.plugins.clone(),
243 })
244}
245
246impl CompactionRegion {
247 pub fn file_purger(&self) -> Option<Arc<LocalFilePurger>> {
249 self.file_purger.clone()
250 }
251
252 pub async fn stop_purger_scheduler(&self) -> Result<()> {
254 if let Some(file_purger) = &self.file_purger {
255 file_purger.stop_scheduler().await
256 } else {
257 Ok(())
258 }
259 }
260}
261
/// Result of merging SST files, ready to be turned into a manifest edit.
#[derive(Default, Debug, Serialize, Deserialize)]
pub struct MergeOutput {
    /// Metadata of the files produced by the merge.
    pub files_to_add: Vec<FileMeta>,
    /// Metadata of the files to drop. [`DefaultCompactor`] puts the inputs of every
    /// successfully merged output here, followed by the expired SSTs.
    pub files_to_remove: Vec<FileMeta>,
    /// Compaction time window; `DefaultCompactor::update_manifest` interprets it as seconds.
    pub compaction_time_window: Option<i64>,
    /// SST write details for the added files. Never serialized (`#[serde(skip)]`);
    /// [`DefaultCompactor`] only fills it when a [`RegionHookRef`] plugin is present.
    #[serde(skip)]
    pub sst_infos: Vec<SstInfo>,
}
271
272impl MergeOutput {
273 pub fn is_empty(&self) -> bool {
274 self.files_to_add.is_empty() && self.files_to_remove.is_empty()
275 }
276
277 pub fn input_file_size(&self) -> u64 {
278 self.files_to_remove.iter().map(|f| f.file_size).sum()
279 }
280
281 pub fn output_file_size(&self) -> u64 {
282 self.files_to_add.iter().map(|f| f.file_size).sum()
283 }
284}
285
/// The two phases of a compaction: merging SST files, then recording the result in the
/// region manifest.
#[async_trait::async_trait]
pub trait Compactor: Send + Sync + 'static {
    /// Merges the SST files selected in `picker_output` for `compaction_region` and returns
    /// the files to add and remove as a [`MergeOutput`].
    async fn merge_ssts(
        &self,
        compaction_region: &CompactionRegion,
        picker_output: PickerOutput,
    ) -> Result<MergeOutput>;

    /// Records `merge_output` in the manifest of `compaction_region`.
    ///
    /// Returns the [`RegionEdit`] that was applied together with the resulting
    /// [`ManifestVersion`].
    async fn update_manifest(
        &self,
        compaction_region: &CompactionRegion,
        merge_output: MergeOutput,
    ) -> Result<(RegionEdit, ManifestVersion)>;
}
303
/// Merges the inputs of a single [`CompactionOutput`] into new SST files.
///
/// [`DefaultCompactor`] is generic over this trait, which lets tests substitute the real
/// merger with a mock.
#[async_trait::async_trait]
pub trait SstMerger: Send + Sync + 'static {
    /// Merges `output.inputs` using `write_opts` and returns the metadata of the files
    /// written together with their [`SstInfo`]s.
    ///
    /// `compaction_region` is taken by value; [`DefaultCompactor`] passes a clone per task.
    async fn merge_single_output(
        &self,
        compaction_region: CompactionRegion,
        output: CompactionOutput,
        write_opts: WriteOptions,
    ) -> Result<(Vec<FileMeta>, Vec<SstInfo>)>;
}
317
/// Stateless [`SstMerger`] that reads the input SSTs and writes the merged result through
/// the region's access layer.
#[derive(Clone)]
pub struct DefaultSstMerger;
321
322#[async_trait::async_trait]
323impl SstMerger for DefaultSstMerger {
324 async fn merge_single_output(
325 &self,
326 compaction_region: CompactionRegion,
327 output: CompactionOutput,
328 write_opts: WriteOptions,
329 ) -> Result<(Vec<FileMeta>, Vec<SstInfo>)> {
330 let region_id = compaction_region.region_id;
331 let storage = compaction_region.region_options.storage.clone();
332 let index_options = compaction_region
333 .current_version
334 .options
335 .index_options
336 .clone();
337 let append_mode = compaction_region.current_version.options.append_mode;
338 let merge_mode = compaction_region.current_version.options.merge_mode();
339 let flat_format = compaction_region
340 .region_options
341 .sst_format
342 .map(|format| format == FormatType::Flat)
343 .unwrap_or(compaction_region.engine_config.default_flat_format);
344
345 let index_config = compaction_region.engine_config.index.clone();
346 let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
347 let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
348 let bloom_filter_index_config = compaction_region.engine_config.bloom_filter_index.clone();
349 #[cfg(feature = "vector_index")]
350 let vector_index_config = compaction_region.engine_config.vector_index.clone();
351
352 let input_file_names = output
353 .inputs
354 .iter()
355 .map(|f| f.file_id().to_string())
356 .join(",");
357 let max_sequence = output
358 .inputs
359 .iter()
360 .map(|f| f.meta_ref().sequence)
361 .max()
362 .flatten();
363 let builder = CompactionSstReaderBuilder {
364 metadata: compaction_region.region_metadata.clone(),
365 sst_layer: compaction_region.access_layer.clone(),
366 cache: compaction_region.cache_manager.clone(),
367 inputs: &output.inputs,
368 append_mode,
369 filter_deleted: output.filter_deleted,
370 time_range: output.output_time_range,
371 merge_mode,
372 };
373 let source = builder.build_flat_sst_reader().await?;
374
375 let mut metrics = Metrics::new(WriteType::Compaction);
376 let region_metadata = compaction_region.region_metadata.clone();
377 let sst_infos = compaction_region
378 .access_layer
379 .write_sst(
380 SstWriteRequest {
381 op_type: OperationType::Compact,
382 metadata: region_metadata.clone(),
383 source,
384 cache_manager: compaction_region.cache_manager.clone(),
385 storage,
386 max_sequence: max_sequence.map(NonZero::get),
387 sst_write_format: if flat_format {
388 FormatType::Flat
389 } else {
390 FormatType::PrimaryKey
391 },
392 index_options,
393 index_config,
394 inverted_index_config,
395 fulltext_index_config,
396 bloom_filter_index_config,
397 #[cfg(feature = "vector_index")]
398 vector_index_config,
399 },
400 &write_opts,
401 &mut metrics,
402 )
403 .await?;
404 let partition_expr = match ®ion_metadata.partition_expr {
406 None => None,
407 Some(json_str) if json_str.is_empty() => None,
408 Some(json_str) => PartitionExpr::from_json_str(json_str).with_context(|_| {
409 InvalidPartitionExprSnafu {
410 expr: json_str.clone(),
411 }
412 })?,
413 };
414
415 let output_files = sst_infos
416 .iter()
417 .map(|sst_info| {
418 let pk_range = sst_info
419 .file_metadata
420 .as_ref()
421 .and_then(|meta| extract_primary_key_range(meta, ®ion_metadata));
422 let (primary_key_min, primary_key_max) = match pk_range {
423 Some((min, max)) => (Some(min), Some(max)),
424 None => (None, None),
425 };
426
427 FileMeta {
428 region_id,
429 file_id: sst_info.file_id,
430 time_range: sst_info.time_range,
431 level: output.output_level,
432 file_size: sst_info.file_size,
433 max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
434 available_indexes: sst_info.index_metadata.build_available_indexes(),
435 indexes: sst_info.index_metadata.build_indexes(),
436 index_file_size: sst_info.index_metadata.file_size,
437 index_version: 0,
438 num_rows: sst_info.num_rows as u64,
439 num_row_groups: sst_info.num_row_groups,
440 sequence: max_sequence,
441 partition_expr: partition_expr.clone(),
442 num_series: sst_info.num_series,
443 primary_key_min,
444 primary_key_max,
445 }
446 })
447 .collect::<Vec<_>>();
448 let output_file_names = output_files.iter().map(|f| f.file_id.to_string()).join(",");
449 info!(
450 "Region {} compaction inputs: [{}], outputs: [{}], flat_format: {}, metrics: {:?}",
451 region_id, input_file_names, output_file_names, flat_format, metrics
452 );
453 metrics.observe();
454 Ok((output_files, sst_infos.into_iter().collect()))
455 }
456}
457
/// [`Compactor`] that delegates the merge of each compaction output to an [`SstMerger`] and
/// stops waiting for merge tasks once its cancellation handle is cancelled.
///
/// `M` defaults to [`DefaultSstMerger`].
pub struct DefaultCompactor<M = DefaultSstMerger> {
    /// Merger cloned into every merge task.
    merger: M,
    /// Shared handle checked while awaiting merge tasks.
    cancel_handle: Arc<CancellationHandle>,
}
466
#[cfg(test)]
impl<M: SstMerger> DefaultCompactor<M> {
    /// Test-only constructor: pairs `merger` with a fresh, default cancellation handle.
    pub fn with_merger(merger: M) -> Self {
        let cancel_handle = Arc::new(CancellationHandle::default());
        Self {
            merger,
            cancel_handle,
        }
    }
}
476
477impl DefaultCompactor {
478 pub fn with_cancel_handle(cancel_handle: Arc<CancellationHandle>) -> Self {
479 Self {
480 merger: DefaultSstMerger,
481 cancel_handle,
482 }
483 }
484}
485
#[async_trait::async_trait]
impl<M: SstMerger> Compactor for DefaultCompactor<M>
where
    M: Clone,
{
    /// Merges every output in `picker_output`, running at most
    /// `compaction_region.max_parallelism` (at least 1) merge tasks at a time.
    ///
    /// An output whose merge fails is logged and skipped: its inputs are not added to
    /// `files_to_remove`, so they stay in the region. Expired SSTs are always appended to
    /// `files_to_remove`.
    ///
    /// When the cancellation handle fires, outstanding tasks are aborted and the results
    /// collected so far are returned as `Ok`.
    ///
    /// # Errors
    /// Returns a join error when a spawned merge task fails to join.
    async fn merge_ssts(
        &self,
        compaction_region: &CompactionRegion,
        mut picker_output: PickerOutput,
    ) -> Result<MergeOutput> {
        // Guard against a parallelism of 0, which would never make progress below.
        let internal_parallelism = compaction_region.max_parallelism.max(1);
        let compaction_time_window = picker_output.time_window_size;
        let region_id = compaction_region.region_id;

        // Each entry pairs the input file metas of an output with the (not yet started)
        // future that merges it.
        let mut tasks: Vec<(Vec<FileMeta>, _)> = Vec::with_capacity(picker_output.outputs.len());

        for output in picker_output.outputs.drain(..) {
            // Remember the inputs now; `output` is moved into the future below.
            let inputs_to_remove: Vec<_> =
                output.inputs.iter().map(|f| f.meta_ref().clone()).collect();
            let write_opts = WriteOptions {
                write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
                max_file_size: picker_output.max_file_size,
                ..Default::default()
            };
            // The future is spawned later, so it must own everything it uses.
            let merger = self.merger.clone();
            let compaction_region = compaction_region.clone();
            let fut = async move {
                merger
                    .merge_single_output(compaction_region, output, write_opts)
                    .await
            };
            tasks.push((inputs_to_remove, fut));
        }

        let hook: Option<RegionHookRef> = compaction_region.plugins.get();
        let mut output_files = Vec::with_capacity(tasks.len());
        let mut all_sst_infos: Vec<SstInfo> = Vec::new();
        let mut compacted_inputs = Vec::with_capacity(
            tasks.iter().map(|(inputs, _)| inputs.len()).sum::<usize>()
                + picker_output.expired_ssts.len(),
        );

        while !tasks.is_empty() {
            // Take up to `internal_parallelism` tasks from the back of the list.
            let mut chunk: Vec<(Vec<FileMeta>, _)> = Vec::with_capacity(internal_parallelism);
            for _ in 0..internal_parallelism {
                if let Some(task) = tasks.pop() {
                    chunk.push(task);
                }
            }
            // Start the whole chunk before awaiting any of it.
            let mut spawned: Vec<_> = chunk
                .into_iter()
                .map(|(inputs, fut)| {
                    let handle = common_runtime::spawn_compact(fut);
                    (inputs, handle)
                })
                .collect();

            while let Some((inputs, handle)) = spawned.pop() {
                // `handle` is consumed by the cancellable future, so keep a separate way
                // to abort the task.
                let abort_handle = handle.abort_handle();
                // Result layers, outermost first: cancellation, task join, merge result.
                match CancellableFuture::new(handle, self.cancel_handle.clone()).await {
                    Ok(Ok(Ok((files, infos)))) => {
                        output_files.extend(files);
                        // SST infos are only kept when a region hook plugin is present.
                        if hook.is_some() {
                            all_sst_infos.extend(infos);
                        }
                        compacted_inputs.extend(inputs);
                    }
                    Ok(Ok(Err(e))) => {
                        // The merge itself failed: keep going, leave its inputs in place.
                        warn!(
                            e; "Failed to merge compaction output for region: {}, inputs: [{}]",
                            region_id,
                            inputs.iter().map(|f| f.file_id.to_string()).join(",")
                        );
                    }
                    Ok(Err(e)) => {
                        // NOTE(review): the message says "skipping", but this arm returns the
                        // join error instead of continuing — confirm which one is intended.
                        warn!(
                            "Region {} compaction task join error for inputs: [{}], skipping: {}",
                            region_id,
                            inputs.iter().map(|f| f.file_id.to_string()).join(","),
                            e
                        );
                        if self.cancel_handle.is_cancelled() {
                            // NOTE(review): the remaining spawned tasks are aborted only when
                            // cancellation was requested; otherwise their handles are dropped
                            // on return — confirm that leaving them running is intended.
                            abort_handle.abort();
                            for (_, handle) in spawned {
                                handle.abort();
                            }
                        }
                        return Err(e).context(error::JoinSnafu);
                    }
                    Err(_) => {
                        // Cancelled while waiting: abort this task and the rest of the chunk.
                        debug!(
                            "Compaction merge cancelled for region: {}, aborting remaining {} spawned tasks",
                            region_id,
                            spawned.len(),
                        );
                        abort_handle.abort();
                        for (_, handle) in spawned {
                            handle.abort();
                        }
                        break;
                    }
                }
            }

            // Do not start another chunk once cancellation was requested.
            if self.cancel_handle.is_cancelled() {
                info!("Compaction merge cancelled for region: {}", region_id);
                break;
            }
        }

        // Expired SSTs are removed regardless of how the merges went.
        compacted_inputs.extend(
            picker_output
                .expired_ssts
                .iter()
                .map(|f| f.meta_ref().clone()),
        );

        Ok(MergeOutput {
            files_to_add: output_files,
            files_to_remove: compacted_inputs,
            compaction_time_window: Some(compaction_time_window),
            sst_infos: all_sst_infos,
        })
    }

    /// Turns `merge_output` into a [`RegionEdit`] stamped with the current UTC time and
    /// applies it through the region's manifest context.
    ///
    /// Returns the edit together with the manifest version reported by the update.
    async fn update_manifest(
        &self,
        compaction_region: &CompactionRegion,
        merge_output: MergeOutput,
    ) -> Result<(RegionEdit, ManifestVersion)> {
        let edit = RegionEdit {
            files_to_add: merge_output.files_to_add,
            files_to_remove: merge_output.files_to_remove,
            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
            // NOTE(review): `as u64` would turn a negative window into a huge duration;
            // presumably the window is never negative — confirm against the picker.
            compaction_time_window: merge_output
                .compaction_time_window
                .map(|seconds| Duration::from_secs(seconds as u64)),
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };

        // The edit is cloned because it is also returned to the caller.
        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
        let manifest_version = compaction_region
            .manifest_ctx
            .update_manifest_for_compaction(action_list)
            .await?;

        Ok((edit, manifest_version))
    }
}
645
/// Tests for `DefaultCompactor::merge_ssts` using mock [`SstMerger`] implementations.
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};
    use std::time::Duration;

    use store_api::storage::{FileId, RegionId};
    use tokio::time::sleep;

    use super::{DefaultCompactor, *};
    use crate::cache::CacheManager;
    use crate::compaction::picker::PickerOutput;
    use crate::error::Result;
    use crate::sst::file::FileHandle;
    use crate::sst::file_purger::NoopFilePurger;
    use crate::sst::version::SstVersion;
    use crate::test_util::memtable_util::metadata_for_test;
    use crate::test_util::scheduler_util::SchedulerEnv;

    /// Returns a file meta in region (1, 1) with a random file id and a size of 100.
    fn dummy_file_meta() -> FileMeta {
        FileMeta {
            region_id: RegionId::new(1, 1),
            file_id: FileId::random(),
            file_size: 100,
            ..Default::default()
        }
    }

    /// Wraps `meta` in a file handle backed by a no-op purger.
    fn new_file_handle(meta: FileMeta) -> FileHandle {
        FileHandle::new(meta, Arc::new(NoopFilePurger))
    }

    /// Builds a compaction region for region (1, 1) with default options, no file purger,
    /// no TTL and a parallelism of 1.
    async fn new_test_compaction_region() -> CompactionRegion {
        let env = SchedulerEnv::new().await;
        let metadata = metadata_for_test();
        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
        CompactionRegion {
            region_id: RegionId::new(1, 1),
            region_options: RegionOptions::default(),
            engine_config: Arc::new(MitoConfig::default()),
            region_metadata: metadata.clone(),
            cache_manager: Arc::new(CacheManager::default()),
            access_layer: env.access_layer.clone(),
            manifest_ctx,
            current_version: CompactionVersion {
                metadata,
                options: RegionOptions::default(),
                ssts: Arc::new(SstVersion::new()),
                compaction_time_window: None,
            },
            file_purger: None,
            ttl: None,
            max_parallelism: 1,
            plugins: Plugins::new(),
        }
    }

    /// Merger that replays a scripted list of results, one per call, in call order.
    ///
    /// The state is shared between clones, so the call counter is global to all merge tasks.
    #[derive(Clone)]
    struct MockMerger {
        /// Scripted results, indexed by call number.
        results: Arc<Mutex<Vec<Result<Vec<FileMeta>>>>>,
        /// Number of calls made so far.
        call_idx: Arc<AtomicUsize>,
    }

    impl MockMerger {
        /// Creates a merger whose n-th call yields `results[n]`.
        fn new(results: Vec<Result<Vec<FileMeta>>>) -> Self {
            Self {
                results: Arc::new(Mutex::new(results)),
                call_idx: Arc::new(AtomicUsize::new(0)),
            }
        }
    }

    #[async_trait::async_trait]
    impl SstMerger for MockMerger {
        /// Ignores its arguments and returns the scripted result for this call.
        ///
        /// A scripted `Err` is not returned as-is: a fresh `InvalidMeta` error naming the
        /// call index is produced instead. Panics when no result is scripted for the call.
        async fn merge_single_output(
            &self,
            _compaction_region: CompactionRegion,
            _output: CompactionOutput,
            _write_opts: WriteOptions,
        ) -> Result<(Vec<FileMeta>, Vec<SstInfo>)> {
            let idx = self.call_idx.fetch_add(1, Ordering::SeqCst);
            match self.results.lock().unwrap().get(idx) {
                Some(Ok(files)) => Ok((files.clone(), Vec::new())),
                Some(Err(_)) => error::InvalidMetaSnafu {
                    reason: format!("simulated failure at index {idx}"),
                }
                .fail(),
                None => panic!("MockMerger: no result configured for call index {idx}"),
            }
        }
    }

    /// One failing output out of three must not prevent the other two from being collected,
    /// and the inputs of the failed output must not be removed.
    #[tokio::test]
    async fn test_partial_merge_failure_collects_only_successful_outputs() {
        common_telemetry::init_default_ut_logging();

        let compaction_region = new_test_compaction_region().await;

        let input_meta_0 = dummy_file_meta();
        let input_meta_1 = dummy_file_meta();
        let input_meta_2 = dummy_file_meta();

        let output_meta_0 = vec![dummy_file_meta()];
        let output_meta_2 = vec![dummy_file_meta(), dummy_file_meta()];

        // `merge_ssts` pops tasks from the back, so with a parallelism of 1 the outputs are
        // merged in reverse order. The scripted failure is the second call, which is the
        // middle output either way.
        let merger = MockMerger::new(vec![
            Ok(output_meta_0.clone()),
            Err(error::InvalidMetaSnafu {
                reason: "boom".to_string(),
            }
            .build()),
            Ok(output_meta_2.clone()),
        ]);
        let compactor = DefaultCompactor::with_merger(merger);

        let picker_output = PickerOutput {
            outputs: vec![
                CompactionOutput {
                    output_level: 1,
                    inputs: vec![new_file_handle(input_meta_0.clone())],
                    filter_deleted: false,
                    output_time_range: None,
                },
                CompactionOutput {
                    output_level: 1,
                    inputs: vec![new_file_handle(input_meta_1.clone())],
                    filter_deleted: false,
                    output_time_range: None,
                },
                CompactionOutput {
                    output_level: 1,
                    inputs: vec![new_file_handle(input_meta_2.clone())],
                    filter_deleted: false,
                    output_time_range: None,
                },
            ],
            expired_ssts: vec![],
            time_window_size: 3600,
            max_file_size: None,
        };

        let merge_output = compactor
            .merge_ssts(&compaction_region, picker_output)
            .await
            .unwrap();

        // 1 + 2 files from the two successful merges.
        assert_eq!(merge_output.files_to_add.len(), 3);
        // Only the inputs of the two successful merges are removed.
        assert_eq!(merge_output.files_to_remove.len(), 2);

        let removed_ids: Vec<_> = merge_output
            .files_to_remove
            .iter()
            .map(|f| f.file_id)
            .collect();
        assert!(removed_ids.contains(&input_meta_0.file_id));
        assert!(removed_ids.contains(&input_meta_2.file_id));
        // The input of the failed merge stays in the region.
        assert!(!removed_ids.contains(&input_meta_1.file_id));
    }

    /// A single successful merge adds its output and removes its input.
    #[tokio::test]
    async fn test_all_outputs_succeed() {
        common_telemetry::init_default_ut_logging();

        let compaction_region = new_test_compaction_region().await;
        let input_meta = dummy_file_meta();
        let output_meta = vec![dummy_file_meta()];

        let merger = MockMerger::new(vec![Ok(output_meta.clone())]);
        let compactor = DefaultCompactor::with_merger(merger);

        let picker_output = PickerOutput {
            outputs: vec![CompactionOutput {
                output_level: 1,
                inputs: vec![new_file_handle(input_meta.clone())],
                filter_deleted: false,
                output_time_range: None,
            }],
            expired_ssts: vec![],
            time_window_size: 3600,
            max_file_size: None,
        };

        let merge_output = compactor
            .merge_ssts(&compaction_region, picker_output)
            .await
            .unwrap();

        assert_eq!(merge_output.files_to_add.len(), 1);
        assert_eq!(merge_output.files_to_add[0].file_id, output_meta[0].file_id);
        assert_eq!(merge_output.files_to_remove.len(), 1);
        assert_eq!(merge_output.files_to_remove[0].file_id, input_meta.file_id);
    }

    /// Expired SSTs are removed even when the only merge fails.
    #[tokio::test]
    async fn test_expired_ssts_always_removed() {
        common_telemetry::init_default_ut_logging();

        let compaction_region = new_test_compaction_region().await;
        let input_meta = dummy_file_meta();
        let expired_meta = dummy_file_meta();

        // The single merge is scripted to fail.
        let merger = MockMerger::new(vec![Err(error::InvalidMetaSnafu {
            reason: "fail".to_string(),
        }
        .build())]);
        let compactor = DefaultCompactor::with_merger(merger);

        let picker_output = PickerOutput {
            outputs: vec![CompactionOutput {
                output_level: 1,
                inputs: vec![new_file_handle(input_meta.clone())],
                filter_deleted: false,
                output_time_range: None,
            }],
            expired_ssts: vec![new_file_handle(expired_meta.clone())],
            time_window_size: 3600,
            max_file_size: None,
        };

        let merge_output = compactor
            .merge_ssts(&compaction_region, picker_output)
            .await
            .unwrap();

        // Nothing was merged successfully.
        assert!(merge_output.files_to_add.is_empty());
        // Only the expired file is removed; the failed merge's input is kept.
        assert_eq!(merge_output.files_to_remove.len(), 1);
        assert_eq!(
            merge_output.files_to_remove[0].file_id,
            expired_meta.file_id
        );
    }

    /// Merger that counts its calls and then never completes.
    #[derive(Clone)]
    struct BlockingMerger {
        /// Number of merges started so far, shared between clones.
        call_idx: Arc<AtomicUsize>,
    }

    #[async_trait::async_trait]
    impl SstMerger for BlockingMerger {
        /// Records the call and then waits forever.
        async fn merge_single_output(
            &self,
            _compaction_region: CompactionRegion,
            _output: CompactionOutput,
            _write_opts: WriteOptions,
        ) -> Result<(Vec<FileMeta>, Vec<SstInfo>)> {
            self.call_idx.fetch_add(1, Ordering::SeqCst);
            std::future::pending().await
        }
    }

    /// Cancelling while merges are blocked makes `merge_ssts` return an empty output without
    /// starting the tasks beyond the first chunk.
    #[tokio::test(flavor = "multi_thread")]
    async fn test_merge_ssts_cancels_spawned_tasks() {
        common_telemetry::init_default_ut_logging();

        let mut compaction_region = new_test_compaction_region().await;
        compaction_region.max_parallelism = 2;

        let cancel_handle = Arc::new(CancellationHandle::default());
        let call_idx = Arc::new(AtomicUsize::new(0));
        // Built directly so the test keeps its own reference to the cancellation handle.
        let compactor = DefaultCompactor {
            merger: BlockingMerger {
                call_idx: call_idx.clone(),
            },
            cancel_handle: cancel_handle.clone(),
        };

        // Three outputs with a parallelism of 2: the third belongs to a second chunk.
        let picker_output = PickerOutput {
            outputs: vec![
                CompactionOutput {
                    output_level: 1,
                    inputs: vec![new_file_handle(dummy_file_meta())],
                    filter_deleted: false,
                    output_time_range: None,
                },
                CompactionOutput {
                    output_level: 1,
                    inputs: vec![new_file_handle(dummy_file_meta())],
                    filter_deleted: false,
                    output_time_range: None,
                },
                CompactionOutput {
                    output_level: 1,
                    inputs: vec![new_file_handle(dummy_file_meta())],
                    filter_deleted: false,
                    output_time_range: None,
                },
            ],
            expired_ssts: vec![],
            time_window_size: 3600,
            max_file_size: None,
        };

        let task = tokio::spawn(async move {
            compactor
                .merge_ssts(&compaction_region, picker_output)
                .await
        });

        // Give the first chunk time to start before cancelling.
        sleep(Duration::from_millis(100)).await;
        cancel_handle.cancel();

        let merge_output = task
            .await
            .expect("merge_ssts should stop after cancellation")
            .unwrap();

        let started = call_idx.load(Ordering::SeqCst);

        assert!(merge_output.files_to_add.is_empty());
        assert!(merge_output.files_to_remove.is_empty());
        // Only the two tasks of the first chunk ever started.
        assert_eq!(started, 2);
    }
}