Skip to main content

mito2/compaction/
compactor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::num::NonZero;
16use std::sync::Arc;
17use std::time::Duration;
18
19use common_base::Plugins;
20use common_base::cancellation::{CancellableFuture, CancellationHandle};
21use common_meta::key::SchemaMetadataManagerRef;
22use common_telemetry::{debug, info, warn};
23use common_time::TimeToLive;
24use either::Either;
25use itertools::Itertools;
26use object_store::manager::ObjectStoreManagerRef;
27use partition::expr::PartitionExpr;
28use serde::{Deserialize, Serialize};
29use snafu::{OptionExt, ResultExt};
30use store_api::ManifestVersion;
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_request::PathType;
33use store_api::storage::RegionId;
34
35use crate::access_layer::{
36    AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest, WriteType,
37};
38use crate::cache::{CacheManager, CacheManagerRef};
39use crate::compaction::picker::PickerOutput;
40use crate::compaction::{CompactionOutput, CompactionSstReaderBuilder, find_dynamic_options};
41use crate::config::MitoConfig;
42use crate::engine::region_hook::RegionHookRef;
43use crate::error;
44use crate::error::{
45    EmptyRegionDirSnafu, InvalidPartitionExprSnafu, ObjectStoreNotFoundSnafu, Result,
46};
47use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
48use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
49use crate::region::options::RegionOptions;
50use crate::region::version::VersionRef;
51use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState};
52use crate::schedule::scheduler::LocalScheduler;
53use crate::sst::FormatType;
54use crate::sst::file::FileMeta;
55use crate::sst::file_purger::LocalFilePurger;
56use crate::sst::index::intermediate::IntermediateManager;
57use crate::sst::index::puffin_manager::PuffinManagerFactory;
58use crate::sst::location::region_dir_from_table_dir;
59use crate::sst::parquet::metadata::extract_primary_key_range;
60use crate::sst::parquet::{SstInfo, WriteOptions};
61use crate::sst::version::{SstVersion, SstVersionRef};
62
/// Region version for compaction that does not hold memtables.
///
/// It only carries the metadata, options, SSTs and the compaction time
/// window of a region.
#[derive(Clone)]
pub struct CompactionVersion {
    /// Metadata of the region.
    ///
    /// Altering metadata isn't frequent, storing metadata in Arc to allow sharing
    /// metadata and reuse metadata when creating a new `Version`.
    pub(crate) metadata: RegionMetadataRef,
    /// Options of the region.
    pub(crate) options: RegionOptions,
    /// SSTs of the region.
    pub(crate) ssts: SstVersionRef,
    /// Inferred compaction time window.
    pub(crate) compaction_time_window: Option<Duration>,
}
78
79impl From<VersionRef> for CompactionVersion {
80    fn from(value: VersionRef) -> Self {
81        Self {
82            metadata: value.metadata.clone(),
83            options: value.options.clone(),
84            ssts: value.ssts.clone(),
85            compaction_time_window: value.compaction_time_window,
86        }
87    }
88}
89
/// CompactionRegion represents a region that needs to be compacted.
/// It's the subset of MitoRegion.
#[derive(Clone)]
pub struct CompactionRegion {
    /// Id of the region to compact.
    pub region_id: RegionId,
    /// Options of the region.
    pub region_options: RegionOptions,

    /// Engine configuration (write buffer size, index settings, default SST format).
    pub(crate) engine_config: Arc<MitoConfig>,
    /// Metadata of the region.
    pub(crate) region_metadata: RegionMetadataRef,
    /// Cache manager handed to the SST reader and writer during a merge.
    pub(crate) cache_manager: CacheManagerRef,
    /// Access layer to get the table path and path type.
    pub access_layer: AccessLayerRef,
    /// Manifest context used to persist the compaction result.
    pub(crate) manifest_ctx: Arc<ManifestContext>,
    /// Version of the region (without memtables) that the compaction works on.
    pub(crate) current_version: CompactionVersion,
    /// Purger of the region's files, if any.
    pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
    /// Time-to-live resolved for the region, if any.
    pub(crate) ttl: Option<TimeToLive>,

    /// Controls the parallelism of this compaction task. Default is 1.
    ///
    /// The parallel is inside this compaction task, not across different compaction tasks.
    /// It can be different windows of the same compaction task or something like this.
    pub max_parallelism: usize,

    /// Plugins of the region; the optional region hook is looked up from them.
    pub(crate) plugins: Plugins,
}
115
/// OpenCompactionRegionRequest represents the request to open a compaction region.
#[derive(Clone)]
pub struct OpenCompactionRegionRequest {
    /// Id of the region to open.
    pub region_id: RegionId,
    /// Table directory; the region directory is derived from it together with
    /// `region_id` and `path_type`.
    pub table_dir: String,
    /// Path type used to build the region directory and the access layer.
    pub path_type: PathType,
    /// Options of the region. Its `storage` entry, when set, names the object
    /// store to use; otherwise the default object store is used.
    pub region_options: RegionOptions,
    /// Parallelism to use inside the compaction task.
    pub max_parallelism: usize,
    /// Plugins for the compaction region, used to look up the [`RegionHook`](crate::engine::region_hook::RegionHook).
    pub plugins: Plugins,
}
127
128/// Open a compaction region from a compaction request.
129/// It's simple version of RegionOpener::open().
130pub async fn open_compaction_region(
131    req: &OpenCompactionRegionRequest,
132    mito_config: &MitoConfig,
133    object_store_manager: ObjectStoreManagerRef,
134    ttl_provider: Either<TimeToLive, SchemaMetadataManagerRef>,
135) -> Result<CompactionRegion> {
136    let object_store = {
137        let name = &req.region_options.storage;
138        if let Some(name) = name {
139            object_store_manager
140                .find(name)
141                .with_context(|| ObjectStoreNotFoundSnafu {
142                    object_store: name.clone(),
143                })?
144        } else {
145            object_store_manager.default_object_store()
146        }
147    };
148
149    let access_layer = {
150        let puffin_manager_factory = PuffinManagerFactory::new(
151            &mito_config.index.aux_path,
152            mito_config.index.staging_size.as_bytes(),
153            Some(mito_config.index.write_buffer_size.as_bytes() as _),
154            mito_config.index.staging_ttl,
155        )
156        .await?;
157        let intermediate_manager =
158            IntermediateManager::init_fs(mito_config.index.aux_path.clone()).await?;
159
160        Arc::new(AccessLayer::new(
161            &req.table_dir,
162            req.path_type,
163            object_store.clone(),
164            puffin_manager_factory,
165            intermediate_manager,
166        ))
167    };
168
169    let manifest_manager = {
170        let region_dir = region_dir_from_table_dir(&req.table_dir, req.region_id, req.path_type);
171        let region_manifest_options =
172            RegionManifestOptions::new(mito_config, &region_dir, object_store);
173
174        RegionManifestManager::open(region_manifest_options, &Default::default())
175            .await?
176            .with_context(|| EmptyRegionDirSnafu {
177                region_id: req.region_id,
178                region_dir: region_dir_from_table_dir(&req.table_dir, req.region_id, req.path_type),
179            })?
180    };
181
182    let manifest = manifest_manager.manifest();
183    let region_metadata = manifest.metadata.clone();
184    let hook: Option<RegionHookRef> = req.plugins.get();
185    let manifest_ctx = Arc::new(ManifestContext::new(
186        manifest_manager,
187        RegionRoleState::Leader(RegionLeaderState::Writable),
188        hook,
189    ));
190
191    let file_purger = {
192        let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_purges));
193        Arc::new(LocalFilePurger::new(
194            purge_scheduler.clone(),
195            access_layer.clone(),
196            None,
197        ))
198    };
199
200    let current_version = {
201        let mut ssts = SstVersion::new();
202        ssts.add_files(file_purger.clone(), manifest.files.values().cloned());
203        CompactionVersion {
204            metadata: region_metadata.clone(),
205            options: req.region_options.clone(),
206            ssts: Arc::new(ssts),
207            compaction_time_window: manifest.compaction_time_window,
208        }
209    };
210
211    let ttl = match ttl_provider {
212        // Use the specified ttl.
213        Either::Left(ttl) => ttl,
214        // Get the ttl from the schema metadata manager.
215        Either::Right(schema_metadata_manager) => {
216            let (_, ttl) =
217                find_dynamic_options(req.region_id, &req.region_options, &schema_metadata_manager)
218                    .await
219                    .unwrap_or_else(|e| {
220                        warn!(e; "Failed to get ttl for region: {}", region_metadata.region_id);
221                        (
222                            crate::region::options::CompactionOptions::default(),
223                            TimeToLive::default(),
224                        )
225                    });
226            ttl
227        }
228    };
229
230    Ok(CompactionRegion {
231        region_id: req.region_id,
232        region_options: req.region_options.clone(),
233        engine_config: Arc::new(mito_config.clone()),
234        region_metadata: region_metadata.clone(),
235        cache_manager: Arc::new(CacheManager::default()),
236        access_layer,
237        manifest_ctx,
238        current_version,
239        file_purger: Some(file_purger),
240        ttl: Some(ttl),
241        max_parallelism: req.max_parallelism,
242        plugins: req.plugins.clone(),
243    })
244}
245
246impl CompactionRegion {
247    /// Get the file purger of the compaction region.
248    pub fn file_purger(&self) -> Option<Arc<LocalFilePurger>> {
249        self.file_purger.clone()
250    }
251
252    /// Stop the file purger scheduler of the compaction region.
253    pub async fn stop_purger_scheduler(&self) -> Result<()> {
254        if let Some(file_purger) = &self.file_purger {
255            file_purger.stop_scheduler().await
256        } else {
257            Ok(())
258        }
259    }
260}
261
/// [`MergeOutput`] represents the output of merging SST files.
#[derive(Default, Debug, Serialize, Deserialize)]
pub struct MergeOutput {
    /// Files produced by the merge that should be added to the region.
    pub files_to_add: Vec<FileMeta>,
    /// Files that should be removed from the region.
    pub files_to_remove: Vec<FileMeta>,
    /// Compaction time window, in seconds.
    pub compaction_time_window: Option<i64>,
    /// Writer-reported info of the written SSTs. Not serialized.
    #[serde(skip)]
    pub sst_infos: Vec<SstInfo>,
}
271
272impl MergeOutput {
273    pub fn is_empty(&self) -> bool {
274        self.files_to_add.is_empty() && self.files_to_remove.is_empty()
275    }
276
277    pub fn input_file_size(&self) -> u64 {
278        self.files_to_remove.iter().map(|f| f.file_size).sum()
279    }
280
281    pub fn output_file_size(&self) -> u64 {
282        self.files_to_add.iter().map(|f| f.file_size).sum()
283    }
284}
285
/// Compactor is the trait that defines the compaction logic.
///
/// A compaction runs in two steps: [`Compactor::merge_ssts`] produces a
/// [`MergeOutput`], which [`Compactor::update_manifest`] then records in the
/// region manifest.
#[async_trait::async_trait]
pub trait Compactor: Send + Sync + 'static {
    /// Merge SST files for a region.
    ///
    /// `picker_output` lists the compaction outputs to produce. The returned
    /// [`MergeOutput`] describes the files to add and to remove.
    async fn merge_ssts(
        &self,
        compaction_region: &CompactionRegion,
        picker_output: PickerOutput,
    ) -> Result<MergeOutput>;

    /// Update the manifest after merging SST files.
    ///
    /// Returns the edit that was written and the resulting manifest version.
    async fn update_manifest(
        &self,
        compaction_region: &CompactionRegion,
        merge_output: MergeOutput,
    ) -> Result<(RegionEdit, ManifestVersion)>;
}
303
/// Trait for merging a single compaction output into SST files.
///
/// This is extracted from `DefaultCompactor` to allow injecting mock
/// implementations in tests.
#[async_trait::async_trait]
pub trait SstMerger: Send + Sync + 'static {
    /// Merges the inputs of `output` into new SST files written with
    /// `write_opts`.
    ///
    /// Returns the metadata of the written files together with the
    /// corresponding [`SstInfo`]s.
    async fn merge_single_output(
        &self,
        compaction_region: CompactionRegion,
        output: CompactionOutput,
        write_opts: WriteOptions,
    ) -> Result<(Vec<FileMeta>, Vec<SstInfo>)>;
}
317
/// The production [`SstMerger`] that reads, merges, and writes SST files.
///
/// It is a stateless unit struct; everything it needs comes from the
/// arguments of [`SstMerger::merge_single_output`].
#[derive(Clone)]
pub struct DefaultSstMerger;
321
322#[async_trait::async_trait]
323impl SstMerger for DefaultSstMerger {
324    async fn merge_single_output(
325        &self,
326        compaction_region: CompactionRegion,
327        output: CompactionOutput,
328        write_opts: WriteOptions,
329    ) -> Result<(Vec<FileMeta>, Vec<SstInfo>)> {
330        let region_id = compaction_region.region_id;
331        let storage = compaction_region.region_options.storage.clone();
332        let index_options = compaction_region
333            .current_version
334            .options
335            .index_options
336            .clone();
337        let append_mode = compaction_region.current_version.options.append_mode;
338        let merge_mode = compaction_region.current_version.options.merge_mode();
339        let flat_format = compaction_region
340            .region_options
341            .sst_format
342            .map(|format| format == FormatType::Flat)
343            .unwrap_or(compaction_region.engine_config.default_flat_format);
344
345        let index_config = compaction_region.engine_config.index.clone();
346        let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
347        let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
348        let bloom_filter_index_config = compaction_region.engine_config.bloom_filter_index.clone();
349        #[cfg(feature = "vector_index")]
350        let vector_index_config = compaction_region.engine_config.vector_index.clone();
351
352        let input_file_names = output
353            .inputs
354            .iter()
355            .map(|f| f.file_id().to_string())
356            .join(",");
357        let max_sequence = output
358            .inputs
359            .iter()
360            .map(|f| f.meta_ref().sequence)
361            .max()
362            .flatten();
363        let builder = CompactionSstReaderBuilder {
364            metadata: compaction_region.region_metadata.clone(),
365            sst_layer: compaction_region.access_layer.clone(),
366            cache: compaction_region.cache_manager.clone(),
367            inputs: &output.inputs,
368            append_mode,
369            filter_deleted: output.filter_deleted,
370            time_range: output.output_time_range,
371            merge_mode,
372        };
373        let source = builder.build_flat_sst_reader().await?;
374
375        let mut metrics = Metrics::new(WriteType::Compaction);
376        let region_metadata = compaction_region.region_metadata.clone();
377        let sst_infos = compaction_region
378            .access_layer
379            .write_sst(
380                SstWriteRequest {
381                    op_type: OperationType::Compact,
382                    metadata: region_metadata.clone(),
383                    source,
384                    cache_manager: compaction_region.cache_manager.clone(),
385                    storage,
386                    max_sequence: max_sequence.map(NonZero::get),
387                    sst_write_format: if flat_format {
388                        FormatType::Flat
389                    } else {
390                        FormatType::PrimaryKey
391                    },
392                    index_options,
393                    index_config,
394                    inverted_index_config,
395                    fulltext_index_config,
396                    bloom_filter_index_config,
397                    #[cfg(feature = "vector_index")]
398                    vector_index_config,
399                },
400                &write_opts,
401                &mut metrics,
402            )
403            .await?;
404        // Convert partition expression once outside the map
405        let partition_expr = match &region_metadata.partition_expr {
406            None => None,
407            Some(json_str) if json_str.is_empty() => None,
408            Some(json_str) => PartitionExpr::from_json_str(json_str).with_context(|_| {
409                InvalidPartitionExprSnafu {
410                    expr: json_str.clone(),
411                }
412            })?,
413        };
414
415        let output_files = sst_infos
416            .iter()
417            .map(|sst_info| {
418                let pk_range = sst_info
419                    .file_metadata
420                    .as_ref()
421                    .and_then(|meta| extract_primary_key_range(meta, &region_metadata));
422                let (primary_key_min, primary_key_max) = match pk_range {
423                    Some((min, max)) => (Some(min), Some(max)),
424                    None => (None, None),
425                };
426
427                FileMeta {
428                    region_id,
429                    file_id: sst_info.file_id,
430                    time_range: sst_info.time_range,
431                    level: output.output_level,
432                    file_size: sst_info.file_size,
433                    max_row_group_uncompressed_size: sst_info.max_row_group_uncompressed_size,
434                    available_indexes: sst_info.index_metadata.build_available_indexes(),
435                    indexes: sst_info.index_metadata.build_indexes(),
436                    index_file_size: sst_info.index_metadata.file_size,
437                    index_version: 0,
438                    num_rows: sst_info.num_rows as u64,
439                    num_row_groups: sst_info.num_row_groups,
440                    sequence: max_sequence,
441                    partition_expr: partition_expr.clone(),
442                    num_series: sst_info.num_series,
443                    primary_key_min,
444                    primary_key_max,
445                }
446            })
447            .collect::<Vec<_>>();
448        let output_file_names = output_files.iter().map(|f| f.file_id.to_string()).join(",");
449        info!(
450            "Region {} compaction inputs: [{}], outputs: [{}], flat_format: {}, metrics: {:?}",
451            region_id, input_file_names, output_file_names, flat_format, metrics
452        );
453        metrics.observe();
454        Ok((output_files, sst_infos.into_iter().collect()))
455    }
456}
457
/// DefaultCompactor is the default implementation of Compactor.
///
/// It is parameterized by an [`SstMerger`] to allow injecting mock
/// implementations in tests.
pub struct DefaultCompactor<M = DefaultSstMerger> {
    /// Merger that produces the SSTs of each compaction output.
    merger: M,
    /// Handle checked while merging to stop the compaction when it is cancelled.
    cancel_handle: Arc<CancellationHandle>,
}
466
#[cfg(test)]
impl<M: SstMerger> DefaultCompactor<M> {
    /// Creates a compactor that merges with `merger` and owns a fresh,
    /// default cancellation handle.
    pub fn with_merger(merger: M) -> Self {
        let cancel_handle = Arc::new(CancellationHandle::default());
        Self {
            merger,
            cancel_handle,
        }
    }
}
476
477impl DefaultCompactor {
478    pub fn with_cancel_handle(cancel_handle: Arc<CancellationHandle>) -> Self {
479        Self {
480            merger: DefaultSstMerger,
481            cancel_handle,
482        }
483    }
484}
485
486#[async_trait::async_trait]
487impl<M: SstMerger> Compactor for DefaultCompactor<M>
488where
489    M: Clone,
490{
491    async fn merge_ssts(
492        &self,
493        compaction_region: &CompactionRegion,
494        mut picker_output: PickerOutput,
495    ) -> Result<MergeOutput> {
496        let internal_parallelism = compaction_region.max_parallelism.max(1);
497        let compaction_time_window = picker_output.time_window_size;
498        let region_id = compaction_region.region_id;
499
500        // Build tasks along with their input file metas so we can track which
501        // inputs correspond to each task.
502        let mut tasks: Vec<(Vec<FileMeta>, _)> = Vec::with_capacity(picker_output.outputs.len());
503
504        for output in picker_output.outputs.drain(..) {
505            let inputs_to_remove: Vec<_> =
506                output.inputs.iter().map(|f| f.meta_ref().clone()).collect();
507            let write_opts = WriteOptions {
508                write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
509                max_file_size: picker_output.max_file_size,
510                ..Default::default()
511            };
512            let merger = self.merger.clone();
513            let compaction_region = compaction_region.clone();
514            let fut = async move {
515                merger
516                    .merge_single_output(compaction_region, output, write_opts)
517                    .await
518            };
519            tasks.push((inputs_to_remove, fut));
520        }
521
522        let hook: Option<RegionHookRef> = compaction_region.plugins.get();
523        let mut output_files = Vec::with_capacity(tasks.len());
524        let mut all_sst_infos: Vec<SstInfo> = Vec::new();
525        let mut compacted_inputs = Vec::with_capacity(
526            tasks.iter().map(|(inputs, _)| inputs.len()).sum::<usize>()
527                + picker_output.expired_ssts.len(),
528        );
529
530        while !tasks.is_empty() {
531            let mut chunk: Vec<(Vec<FileMeta>, _)> = Vec::with_capacity(internal_parallelism);
532            for _ in 0..internal_parallelism {
533                if let Some(task) = tasks.pop() {
534                    chunk.push(task);
535                }
536            }
537            let mut spawned: Vec<_> = chunk
538                .into_iter()
539                .map(|(inputs, fut)| {
540                    let handle = common_runtime::spawn_compact(fut);
541                    (inputs, handle)
542                })
543                .collect();
544
545            while let Some((inputs, handle)) = spawned.pop() {
546                let abort_handle = handle.abort_handle();
547                match CancellableFuture::new(handle, self.cancel_handle.clone()).await {
548                    Ok(Ok(Ok((files, infos)))) => {
549                        output_files.extend(files);
550                        if hook.is_some() {
551                            all_sst_infos.extend(infos);
552                        }
553                        compacted_inputs.extend(inputs);
554                    }
555                    Ok(Ok(Err(e))) => {
556                        warn!(
557                            e; "Failed to merge compaction output for region: {}, inputs: [{}]",
558                            region_id,
559                            inputs.iter().map(|f| f.file_id.to_string()).join(",")
560                        );
561                    }
562                    Ok(Err(e)) => {
563                        warn!(
564                            "Region {} compaction task join error for inputs: [{}], skipping: {}",
565                            region_id,
566                            inputs.iter().map(|f| f.file_id.to_string()).join(","),
567                            e
568                        );
569                        // If the cancel handle is cancelled,
570                        // cancel the remaining tasks before returns the error.
571                        if self.cancel_handle.is_cancelled() {
572                            abort_handle.abort();
573                            for (_, handle) in spawned {
574                                handle.abort();
575                            }
576                        }
577                        return Err(e).context(error::JoinSnafu);
578                    }
579                    Err(_) => {
580                        debug!(
581                            "Compaction merge cancelled for region: {}, aborting remaining {} spawned tasks",
582                            region_id,
583                            spawned.len(),
584                        );
585                        abort_handle.abort();
586                        for (_, handle) in spawned {
587                            handle.abort();
588                        }
589                        break;
590                    }
591                }
592            }
593
594            if self.cancel_handle.is_cancelled() {
595                info!("Compaction merge cancelled for region: {}", region_id);
596                break;
597            }
598        }
599
600        // Include expired SSTs in removals — these don't depend on merge success.
601        compacted_inputs.extend(
602            picker_output
603                .expired_ssts
604                .iter()
605                .map(|f| f.meta_ref().clone()),
606        );
607
608        Ok(MergeOutput {
609            files_to_add: output_files,
610            files_to_remove: compacted_inputs,
611            compaction_time_window: Some(compaction_time_window),
612            sst_infos: all_sst_infos,
613        })
614    }
615
616    async fn update_manifest(
617        &self,
618        compaction_region: &CompactionRegion,
619        merge_output: MergeOutput,
620    ) -> Result<(RegionEdit, ManifestVersion)> {
621        // Write region edit to manifest.
622        let edit = RegionEdit {
623            files_to_add: merge_output.files_to_add,
624            files_to_remove: merge_output.files_to_remove,
625            // Use current timestamp as the edit timestamp.
626            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
627            compaction_time_window: merge_output
628                .compaction_time_window
629                .map(|seconds| Duration::from_secs(seconds as u64)),
630            flushed_entry_id: None,
631            flushed_sequence: None,
632            committed_sequence: None,
633        };
634
635        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
636        // TODO: We might leak files if we fail to update manifest. We can add a cleanup task to remove them later.
637        let manifest_version = compaction_region
638            .manifest_ctx
639            .update_manifest_for_compaction(action_list)
640            .await?;
641
642        Ok((edit, manifest_version))
643    }
644}
645
646#[cfg(test)]
647mod tests {
648    use std::sync::atomic::{AtomicUsize, Ordering};
649    use std::sync::{Arc, Mutex};
650    use std::time::Duration;
651
652    use store_api::storage::{FileId, RegionId};
653    use tokio::time::sleep;
654
655    use super::{DefaultCompactor, *};
656    use crate::cache::CacheManager;
657    use crate::compaction::picker::PickerOutput;
658    use crate::error::Result;
659    use crate::sst::file::FileHandle;
660    use crate::sst::file_purger::NoopFilePurger;
661    use crate::sst::version::SstVersion;
662    use crate::test_util::memtable_util::metadata_for_test;
663    use crate::test_util::scheduler_util::SchedulerEnv;
664
665    fn dummy_file_meta() -> FileMeta {
666        FileMeta {
667            region_id: RegionId::new(1, 1),
668            file_id: FileId::random(),
669            file_size: 100,
670            ..Default::default()
671        }
672    }
673
674    fn new_file_handle(meta: FileMeta) -> FileHandle {
675        FileHandle::new(meta, Arc::new(NoopFilePurger))
676    }
677
678    /// Build a minimal [`CompactionRegion`] suitable for tests where the
679    /// [`SstMerger`] is mocked and never touches the access layer.
680    async fn new_test_compaction_region() -> CompactionRegion {
681        let env = SchedulerEnv::new().await;
682        let metadata = metadata_for_test();
683        let manifest_ctx = env.mock_manifest_context(metadata.clone()).await;
684        CompactionRegion {
685            region_id: RegionId::new(1, 1),
686            region_options: RegionOptions::default(),
687            engine_config: Arc::new(MitoConfig::default()),
688            region_metadata: metadata.clone(),
689            cache_manager: Arc::new(CacheManager::default()),
690            access_layer: env.access_layer.clone(),
691            manifest_ctx,
692            current_version: CompactionVersion {
693                metadata,
694                options: RegionOptions::default(),
695                ssts: Arc::new(SstVersion::new()),
696                compaction_time_window: None,
697            },
698            file_purger: None,
699            ttl: None,
700            max_parallelism: 1,
701            plugins: Plugins::new(),
702        }
703    }
704
    /// An [`SstMerger`] that returns pre-configured results per call index.
    ///
    /// Call 0 gets `results[0]`, call 1 gets `results[1]`, etc.
    ///
    /// Clones share the same results and the same call counter.
    #[derive(Clone)]
    struct MockMerger {
        /// Outcome to report for each call, indexed by call order.
        results: Arc<Mutex<Vec<Result<Vec<FileMeta>>>>>,
        /// Number of calls made so far, across all clones.
        call_idx: Arc<AtomicUsize>,
    }
713
714    impl MockMerger {
715        fn new(results: Vec<Result<Vec<FileMeta>>>) -> Self {
716            Self {
717                results: Arc::new(Mutex::new(results)),
718                call_idx: Arc::new(AtomicUsize::new(0)),
719            }
720        }
721    }
722
723    #[async_trait::async_trait]
724    impl SstMerger for MockMerger {
725        async fn merge_single_output(
726            &self,
727            _compaction_region: CompactionRegion,
728            _output: CompactionOutput,
729            _write_opts: WriteOptions,
730        ) -> Result<(Vec<FileMeta>, Vec<SstInfo>)> {
731            let idx = self.call_idx.fetch_add(1, Ordering::SeqCst);
732            match self.results.lock().unwrap().get(idx) {
733                Some(Ok(files)) => Ok((files.clone(), Vec::new())),
734                Some(Err(_)) => error::InvalidMetaSnafu {
735                    reason: format!("simulated failure at index {idx}"),
736                }
737                .fail(),
738                None => panic!("MockMerger: no result configured for call index {idx}"),
739            }
740        }
741    }
742
743    #[tokio::test]
744    async fn test_partial_merge_failure_collects_only_successful_outputs() {
745        common_telemetry::init_default_ut_logging();
746
747        let compaction_region = new_test_compaction_region().await;
748
749        // Prepare 3 compaction outputs: output 0 and 2 succeed, output 1 fails.
750        let input_meta_0 = dummy_file_meta();
751        let input_meta_1 = dummy_file_meta();
752        let input_meta_2 = dummy_file_meta();
753
754        let output_meta_0 = vec![dummy_file_meta()];
755        let output_meta_2 = vec![dummy_file_meta(), dummy_file_meta()];
756
757        let merger = MockMerger::new(vec![
758            Ok(output_meta_0.clone()),
759            Err(error::InvalidMetaSnafu {
760                reason: "boom".to_string(),
761            }
762            .build()),
763            Ok(output_meta_2.clone()),
764        ]);
765        let compactor = DefaultCompactor::with_merger(merger);
766
767        let picker_output = PickerOutput {
768            outputs: vec![
769                CompactionOutput {
770                    output_level: 1,
771                    inputs: vec![new_file_handle(input_meta_0.clone())],
772                    filter_deleted: false,
773                    output_time_range: None,
774                },
775                CompactionOutput {
776                    output_level: 1,
777                    inputs: vec![new_file_handle(input_meta_1.clone())],
778                    filter_deleted: false,
779                    output_time_range: None,
780                },
781                CompactionOutput {
782                    output_level: 1,
783                    inputs: vec![new_file_handle(input_meta_2.clone())],
784                    filter_deleted: false,
785                    output_time_range: None,
786                },
787            ],
788            expired_ssts: vec![],
789            time_window_size: 3600,
790            max_file_size: None,
791        };
792
793        let merge_output = compactor
794            .merge_ssts(&compaction_region, picker_output)
795            .await
796            .unwrap();
797
798        // Outputs 0 and 2 succeeded (1 + 2 = 3 files added).
799        assert_eq!(merge_output.files_to_add.len(), 3);
800        // Only inputs from successful merges should be removed.
801        assert_eq!(merge_output.files_to_remove.len(), 2);
802
803        let removed_ids: Vec<_> = merge_output
804            .files_to_remove
805            .iter()
806            .map(|f| f.file_id)
807            .collect();
808        assert!(removed_ids.contains(&input_meta_0.file_id));
809        assert!(removed_ids.contains(&input_meta_2.file_id));
810        // The failed output's input must NOT be removed.
811        assert!(!removed_ids.contains(&input_meta_1.file_id));
812    }
813
814    #[tokio::test]
815    async fn test_all_outputs_succeed() {
816        common_telemetry::init_default_ut_logging();
817
818        let compaction_region = new_test_compaction_region().await;
819        let input_meta = dummy_file_meta();
820        let output_meta = vec![dummy_file_meta()];
821
822        let merger = MockMerger::new(vec![Ok(output_meta.clone())]);
823        let compactor = DefaultCompactor::with_merger(merger);
824
825        let picker_output = PickerOutput {
826            outputs: vec![CompactionOutput {
827                output_level: 1,
828                inputs: vec![new_file_handle(input_meta.clone())],
829                filter_deleted: false,
830                output_time_range: None,
831            }],
832            expired_ssts: vec![],
833            time_window_size: 3600,
834            max_file_size: None,
835        };
836
837        let merge_output = compactor
838            .merge_ssts(&compaction_region, picker_output)
839            .await
840            .unwrap();
841
842        assert_eq!(merge_output.files_to_add.len(), 1);
843        assert_eq!(merge_output.files_to_add[0].file_id, output_meta[0].file_id);
844        assert_eq!(merge_output.files_to_remove.len(), 1);
845        assert_eq!(merge_output.files_to_remove[0].file_id, input_meta.file_id);
846    }
847
848    #[tokio::test]
849    async fn test_expired_ssts_always_removed() {
850        common_telemetry::init_default_ut_logging();
851
852        let compaction_region = new_test_compaction_region().await;
853        let input_meta = dummy_file_meta();
854        let expired_meta = dummy_file_meta();
855
856        // The single merge output fails, but expired SSTs should still be removed.
857        let merger = MockMerger::new(vec![Err(error::InvalidMetaSnafu {
858            reason: "fail".to_string(),
859        }
860        .build())]);
861        let compactor = DefaultCompactor::with_merger(merger);
862
863        let picker_output = PickerOutput {
864            outputs: vec![CompactionOutput {
865                output_level: 1,
866                inputs: vec![new_file_handle(input_meta.clone())],
867                filter_deleted: false,
868                output_time_range: None,
869            }],
870            expired_ssts: vec![new_file_handle(expired_meta.clone())],
871            time_window_size: 3600,
872            max_file_size: None,
873        };
874
875        let merge_output = compactor
876            .merge_ssts(&compaction_region, picker_output)
877            .await
878            .unwrap();
879
880        // No files added (merge failed).
881        assert!(merge_output.files_to_add.is_empty());
882        // Only the expired SST should be in files_to_remove (not the failed merge's input).
883        assert_eq!(merge_output.files_to_remove.len(), 1);
884        assert_eq!(
885            merge_output.files_to_remove[0].file_id,
886            expired_meta.file_id
887        );
888    }
889
/// Test double for `SstMerger` that counts how often a merge is requested.
#[derive(Clone)]
struct BlockingMerger {
    /// Shared counter incremented on every `merge_single_output` call.
    call_idx: Arc<AtomicUsize>,
}
894
895    #[async_trait::async_trait]
896    impl SstMerger for BlockingMerger {
897        async fn merge_single_output(
898            &self,
899            _compaction_region: CompactionRegion,
900            _output: CompactionOutput,
901            _write_opts: WriteOptions,
902        ) -> Result<(Vec<FileMeta>, Vec<SstInfo>)> {
903            self.call_idx.fetch_add(1, Ordering::SeqCst);
904            std::future::pending().await
905        }
906    }
907
908    #[tokio::test(flavor = "multi_thread")]
909    async fn test_merge_ssts_cancels_spawned_tasks() {
910        common_telemetry::init_default_ut_logging();
911
912        let mut compaction_region = new_test_compaction_region().await;
913        compaction_region.max_parallelism = 2;
914
915        let cancel_handle = Arc::new(CancellationHandle::default());
916        let call_idx = Arc::new(AtomicUsize::new(0));
917        let compactor = DefaultCompactor {
918            merger: BlockingMerger {
919                call_idx: call_idx.clone(),
920            },
921            cancel_handle: cancel_handle.clone(),
922        };
923
924        let picker_output = PickerOutput {
925            outputs: vec![
926                CompactionOutput {
927                    output_level: 1,
928                    inputs: vec![new_file_handle(dummy_file_meta())],
929                    filter_deleted: false,
930                    output_time_range: None,
931                },
932                CompactionOutput {
933                    output_level: 1,
934                    inputs: vec![new_file_handle(dummy_file_meta())],
935                    filter_deleted: false,
936                    output_time_range: None,
937                },
938                CompactionOutput {
939                    output_level: 1,
940                    inputs: vec![new_file_handle(dummy_file_meta())],
941                    filter_deleted: false,
942                    output_time_range: None,
943                },
944            ],
945            expired_ssts: vec![],
946            time_window_size: 3600,
947            max_file_size: None,
948        };
949
950        let task = tokio::spawn(async move {
951            compactor
952                .merge_ssts(&compaction_region, picker_output)
953                .await
954        });
955
956        sleep(Duration::from_millis(100)).await;
957        cancel_handle.cancel();
958
959        let merge_output = task
960            .await
961            .expect("merge_ssts should stop after cancellation")
962            .unwrap();
963
964        let started = call_idx.load(Ordering::SeqCst);
965
966        assert!(merge_output.files_to_add.is_empty());
967        assert!(merge_output.files_to_remove.is_empty());
968        assert_eq!(started, 2);
969    }
970}