mito2/compaction/
compactor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::num::NonZero;
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::v1::region::compact_request;
20use common_meta::key::SchemaMetadataManagerRef;
21use common_telemetry::{info, warn};
22use common_time::TimeToLive;
23use either::Either;
24use itertools::Itertools;
25use object_store::manager::ObjectStoreManagerRef;
26use partition::expr::PartitionExpr;
27use serde::{Deserialize, Serialize};
28use snafu::{OptionExt, ResultExt};
29use store_api::metadata::RegionMetadataRef;
30use store_api::region_request::PathType;
31use store_api::storage::RegionId;
32
33use crate::access_layer::{
34    AccessLayer, AccessLayerRef, Metrics, OperationType, SstWriteRequest, WriteType,
35};
36use crate::cache::{CacheManager, CacheManagerRef};
37use crate::compaction::picker::{PickerOutput, new_picker};
38use crate::compaction::{CompactionOutput, CompactionSstReaderBuilder, find_ttl};
39use crate::config::MitoConfig;
40use crate::error::{
41    EmptyRegionDirSnafu, InvalidPartitionExprSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result,
42};
43use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
44use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
45use crate::manifest::storage::manifest_compress_type;
46use crate::metrics;
47use crate::read::{FlatSource, Source};
48use crate::region::opener::new_manifest_dir;
49use crate::region::options::RegionOptions;
50use crate::region::version::VersionRef;
51use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState};
52use crate::schedule::scheduler::LocalScheduler;
53use crate::sst::FormatType;
54use crate::sst::file::FileMeta;
55use crate::sst::file_purger::LocalFilePurger;
56use crate::sst::index::intermediate::IntermediateManager;
57use crate::sst::index::puffin_manager::PuffinManagerFactory;
58use crate::sst::location::region_dir_from_table_dir;
59use crate::sst::parquet::WriteOptions;
60use crate::sst::version::{SstVersion, SstVersionRef};
61
62/// Region version for compaction that does not hold memtables.
63#[derive(Clone)]
64pub struct CompactionVersion {
65    /// Metadata of the region.
66    ///
67    /// Altering metadata isn't frequent, storing metadata in Arc to allow sharing
68    /// metadata and reuse metadata when creating a new `Version`.
69    pub(crate) metadata: RegionMetadataRef,
70    /// Options of the region.
71    pub(crate) options: RegionOptions,
72    /// SSTs of the region.
73    pub(crate) ssts: SstVersionRef,
74    /// Inferred compaction time window.
75    pub(crate) compaction_time_window: Option<Duration>,
76}
77
78impl From<VersionRef> for CompactionVersion {
79    fn from(value: VersionRef) -> Self {
80        Self {
81            metadata: value.metadata.clone(),
82            options: value.options.clone(),
83            ssts: value.ssts.clone(),
84            compaction_time_window: value.compaction_time_window,
85        }
86    }
87}
88
/// CompactionRegion represents a region that needs to be compacted.
/// It's a subset of MitoRegion.
///
/// It bundles what a compaction task reads and writes: region options and metadata, the
/// engine configuration, a cache manager, the access layer, the manifest context used to
/// commit the resulting edit, an SST-only [`CompactionVersion`], an optional file purger
/// and an optional TTL.
#[derive(Clone)]
pub struct CompactionRegion {
    /// Id of the region being compacted.
    pub region_id: RegionId,
    /// Options of the region (e.g. storage, compaction, append mode, SST format).
    pub region_options: RegionOptions,

    /// Engine configuration (e.g. SST write buffer size, index configs).
    pub(crate) engine_config: Arc<MitoConfig>,
    /// Metadata of the region.
    pub(crate) region_metadata: RegionMetadataRef,
    /// Cache manager handed to the SST readers and writers used by compaction.
    pub(crate) cache_manager: CacheManagerRef,
    /// Access layer to get the table path and path type.
    pub access_layer: AccessLayerRef,
    /// Manifest context used to commit the compaction's region edit.
    pub(crate) manifest_ctx: Arc<ManifestContext>,
    /// SST-only view of the region's current version.
    pub(crate) current_version: CompactionVersion,
    /// Purger for removed files; `None` when the region has none.
    pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
    /// TTL of the region, if one was resolved.
    pub(crate) ttl: Option<TimeToLive>,

    /// Controls the parallelism of this compaction task. `DefaultCompactor::merge_ssts`
    /// treats values below 1 as 1.
    ///
    /// The parallelism applies within this compaction task, not across different compaction
    /// tasks; e.g. different windows of the same compaction task may be merged concurrently.
    pub max_parallelism: usize,
}
112
/// OpenCompactionRegionRequest represents the request to open a compaction region.
///
/// Consumed by [`open_compaction_region`].
#[derive(Debug, Clone)]
pub struct OpenCompactionRegionRequest {
    /// Id of the region to open.
    pub region_id: RegionId,
    /// Table directory; the region directory is derived from it together with
    /// `region_id` and `path_type`.
    pub table_dir: String,
    /// Path type used for the access layer and to derive the region directory.
    pub path_type: PathType,
    /// Region options; `storage` names the object store to use (the default one when `None`).
    pub region_options: RegionOptions,
    /// Copied into `CompactionRegion::max_parallelism`.
    pub max_parallelism: usize,
}
122
123/// Open a compaction region from a compaction request.
124/// It's simple version of RegionOpener::open().
125pub async fn open_compaction_region(
126    req: &OpenCompactionRegionRequest,
127    mito_config: &MitoConfig,
128    object_store_manager: ObjectStoreManagerRef,
129    ttl_provider: Either<TimeToLive, SchemaMetadataManagerRef>,
130) -> Result<CompactionRegion> {
131    let object_store = {
132        let name = &req.region_options.storage;
133        if let Some(name) = name {
134            object_store_manager
135                .find(name)
136                .with_context(|| ObjectStoreNotFoundSnafu {
137                    object_store: name.clone(),
138                })?
139        } else {
140            object_store_manager.default_object_store()
141        }
142    };
143
144    let access_layer = {
145        let puffin_manager_factory = PuffinManagerFactory::new(
146            &mito_config.index.aux_path,
147            mito_config.index.staging_size.as_bytes(),
148            Some(mito_config.index.write_buffer_size.as_bytes() as _),
149            mito_config.index.staging_ttl,
150        )
151        .await?;
152        let intermediate_manager =
153            IntermediateManager::init_fs(mito_config.index.aux_path.clone()).await?;
154
155        Arc::new(AccessLayer::new(
156            &req.table_dir,
157            req.path_type,
158            object_store.clone(),
159            puffin_manager_factory,
160            intermediate_manager,
161        ))
162    };
163
164    let manifest_manager = {
165        let region_manifest_options = RegionManifestOptions {
166            manifest_dir: new_manifest_dir(&region_dir_from_table_dir(
167                &req.table_dir,
168                req.region_id,
169                req.path_type,
170            )),
171            object_store: object_store.clone(),
172            compress_type: manifest_compress_type(mito_config.compress_manifest),
173            checkpoint_distance: mito_config.manifest_checkpoint_distance,
174            remove_file_options: RemoveFileOptions {
175                keep_count: mito_config.experimental_manifest_keep_removed_file_count,
176                keep_ttl: mito_config.experimental_manifest_keep_removed_file_ttl,
177            },
178        };
179
180        RegionManifestManager::open(
181            region_manifest_options,
182            Default::default(),
183            Default::default(),
184        )
185        .await?
186        .context(EmptyRegionDirSnafu {
187            region_id: req.region_id,
188            region_dir: &region_dir_from_table_dir(&req.table_dir, req.region_id, req.path_type),
189        })?
190    };
191
192    let manifest = manifest_manager.manifest();
193    let region_metadata = manifest.metadata.clone();
194    let manifest_ctx = Arc::new(ManifestContext::new(
195        manifest_manager,
196        RegionRoleState::Leader(RegionLeaderState::Writable),
197    ));
198
199    let file_purger = {
200        let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_purges));
201        Arc::new(LocalFilePurger::new(
202            purge_scheduler.clone(),
203            access_layer.clone(),
204            None,
205        ))
206    };
207
208    let current_version = {
209        let mut ssts = SstVersion::new();
210        ssts.add_files(file_purger.clone(), manifest.files.values().cloned());
211        CompactionVersion {
212            metadata: region_metadata.clone(),
213            options: req.region_options.clone(),
214            ssts: Arc::new(ssts),
215            compaction_time_window: manifest.compaction_time_window,
216        }
217    };
218
219    let ttl = match ttl_provider {
220        // Use the specified ttl.
221        Either::Left(ttl) => ttl,
222        // Get the ttl from the schema metadata manager.
223        Either::Right(schema_metadata_manager) => find_ttl(
224            req.region_id.table_id(),
225            current_version.options.ttl,
226            &schema_metadata_manager,
227        )
228        .await
229        .unwrap_or_else(|e| {
230            warn!(e; "Failed to get ttl for region: {}", region_metadata.region_id);
231            TimeToLive::default()
232        }),
233    };
234
235    Ok(CompactionRegion {
236        region_id: req.region_id,
237        region_options: req.region_options.clone(),
238        engine_config: Arc::new(mito_config.clone()),
239        region_metadata: region_metadata.clone(),
240        cache_manager: Arc::new(CacheManager::default()),
241        access_layer,
242        manifest_ctx,
243        current_version,
244        file_purger: Some(file_purger),
245        ttl: Some(ttl),
246        max_parallelism: req.max_parallelism,
247    })
248}
249
250impl CompactionRegion {
251    /// Get the file purger of the compaction region.
252    pub fn file_purger(&self) -> Option<Arc<LocalFilePurger>> {
253        self.file_purger.clone()
254    }
255
256    /// Stop the file purger scheduler of the compaction region.
257    pub async fn stop_purger_scheduler(&self) -> Result<()> {
258        if let Some(file_purger) = &self.file_purger {
259            file_purger.stop_scheduler().await
260        } else {
261            Ok(())
262        }
263    }
264}
265
266/// `[MergeOutput]` represents the output of merging SST files.
267#[derive(Default, Clone, Debug, Serialize, Deserialize)]
268pub struct MergeOutput {
269    pub files_to_add: Vec<FileMeta>,
270    pub files_to_remove: Vec<FileMeta>,
271    pub compaction_time_window: Option<i64>,
272}
273
274impl MergeOutput {
275    pub fn is_empty(&self) -> bool {
276        self.files_to_add.is_empty() && self.files_to_remove.is_empty()
277    }
278
279    pub fn input_file_size(&self) -> u64 {
280        self.files_to_remove.iter().map(|f| f.file_size).sum()
281    }
282
283    pub fn output_file_size(&self) -> u64 {
284        self.files_to_add.iter().map(|f| f.file_size).sum()
285    }
286}
287
/// Compactor is the trait that defines the compaction logic.
///
/// Implementors must be `Send + Sync + 'static`. The work is split into a merge step
/// ([`Compactor::merge_ssts`]) and a manifest-commit step ([`Compactor::update_manifest`]).
/// [`Compactor::compact`] runs a whole compaction for a region.
#[async_trait::async_trait]
pub trait Compactor: Send + Sync + 'static {
    /// Merge SST files for a region.
    ///
    /// Takes the files selected by a picker (`picker_output`) and returns a [`MergeOutput`]
    /// listing the SST files to add and to remove.
    async fn merge_ssts(
        &self,
        compaction_region: &CompactionRegion,
        picker_output: PickerOutput,
    ) -> Result<MergeOutput>;

    /// Update the manifest after merging SST files.
    ///
    /// Records `merge_output` in the region manifest and returns the resulting [`RegionEdit`].
    async fn update_manifest(
        &self,
        compaction_region: &CompactionRegion,
        merge_output: MergeOutput,
    ) -> Result<RegionEdit>;

    /// Execute compaction for a region.
    ///
    /// `compact_request_options` carries the options of the compact request.
    async fn compact(
        &self,
        compaction_region: &CompactionRegion,
        compact_request_options: compact_request::Options,
    ) -> Result<()>;
}
312
313/// DefaultCompactor is the default implementation of Compactor.
314pub struct DefaultCompactor;
315
316impl DefaultCompactor {
317    /// Merge a single compaction output into SST files.
318    async fn merge_single_output(
319        compaction_region: CompactionRegion,
320        output: CompactionOutput,
321        write_opts: WriteOptions,
322    ) -> Result<Vec<FileMeta>> {
323        let region_id = compaction_region.region_id;
324        let storage = compaction_region.region_options.storage.clone();
325        let index_options = compaction_region
326            .current_version
327            .options
328            .index_options
329            .clone();
330        let append_mode = compaction_region.current_version.options.append_mode;
331        let merge_mode = compaction_region.current_version.options.merge_mode();
332        let flat_format = compaction_region
333            .region_options
334            .sst_format
335            .map(|format| format == FormatType::Flat)
336            .unwrap_or(
337                compaction_region
338                    .engine_config
339                    .default_experimental_flat_format,
340            );
341
342        let index_config = compaction_region.engine_config.index.clone();
343        let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
344        let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
345        let bloom_filter_index_config = compaction_region.engine_config.bloom_filter_index.clone();
346
347        let input_file_names = output
348            .inputs
349            .iter()
350            .map(|f| f.file_id().to_string())
351            .join(",");
352        let max_sequence = output
353            .inputs
354            .iter()
355            .map(|f| f.meta_ref().sequence)
356            .max()
357            .flatten();
358        let builder = CompactionSstReaderBuilder {
359            metadata: compaction_region.region_metadata.clone(),
360            sst_layer: compaction_region.access_layer.clone(),
361            cache: compaction_region.cache_manager.clone(),
362            inputs: &output.inputs,
363            append_mode,
364            filter_deleted: output.filter_deleted,
365            time_range: output.output_time_range,
366            merge_mode,
367        };
368        let source = if flat_format {
369            let reader = builder.build_flat_sst_reader().await?;
370            Either::Right(FlatSource::Stream(reader))
371        } else {
372            let reader = builder.build_sst_reader().await?;
373            Either::Left(Source::Reader(reader))
374        };
375        let mut metrics = Metrics::new(WriteType::Compaction);
376        let region_metadata = compaction_region.region_metadata.clone();
377        let sst_infos = compaction_region
378            .access_layer
379            .write_sst(
380                SstWriteRequest {
381                    op_type: OperationType::Compact,
382                    metadata: region_metadata.clone(),
383                    source,
384                    cache_manager: compaction_region.cache_manager.clone(),
385                    storage,
386                    max_sequence: max_sequence.map(NonZero::get),
387                    index_options,
388                    index_config,
389                    inverted_index_config,
390                    fulltext_index_config,
391                    bloom_filter_index_config,
392                },
393                &write_opts,
394                &mut metrics,
395            )
396            .await?;
397        // Convert partition expression once outside the map
398        let partition_expr = match &region_metadata.partition_expr {
399            None => None,
400            Some(json_str) if json_str.is_empty() => None,
401            Some(json_str) => PartitionExpr::from_json_str(json_str).with_context(|_| {
402                InvalidPartitionExprSnafu {
403                    expr: json_str.clone(),
404                }
405            })?,
406        };
407
408        let output_files = sst_infos
409            .into_iter()
410            .map(|sst_info| FileMeta {
411                region_id,
412                file_id: sst_info.file_id,
413                time_range: sst_info.time_range,
414                level: output.output_level,
415                file_size: sst_info.file_size,
416                available_indexes: sst_info.index_metadata.build_available_indexes(),
417                index_file_size: sst_info.index_metadata.file_size,
418                index_file_id: None,
419                num_rows: sst_info.num_rows as u64,
420                num_row_groups: sst_info.num_row_groups,
421                sequence: max_sequence,
422                partition_expr: partition_expr.clone(),
423                num_series: sst_info.num_series,
424            })
425            .collect::<Vec<_>>();
426        let output_file_names = output_files.iter().map(|f| f.file_id.to_string()).join(",");
427        info!(
428            "Region {} compaction inputs: [{}], outputs: [{}], flat_format: {}, metrics: {:?}",
429            region_id, input_file_names, output_file_names, flat_format, metrics
430        );
431        metrics.observe();
432        Ok(output_files)
433    }
434}
435
436#[async_trait::async_trait]
437impl Compactor for DefaultCompactor {
438    async fn merge_ssts(
439        &self,
440        compaction_region: &CompactionRegion,
441        mut picker_output: PickerOutput,
442    ) -> Result<MergeOutput> {
443        let mut futs = Vec::with_capacity(picker_output.outputs.len());
444        let mut compacted_inputs =
445            Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum());
446        let internal_parallelism = compaction_region.max_parallelism.max(1);
447        let compaction_time_window = picker_output.time_window_size;
448
449        for output in picker_output.outputs.drain(..) {
450            let inputs_to_remove: Vec<_> =
451                output.inputs.iter().map(|f| f.meta_ref().clone()).collect();
452            compacted_inputs.extend(inputs_to_remove.iter().cloned());
453            let write_opts = WriteOptions {
454                write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
455                max_file_size: picker_output.max_file_size,
456                ..Default::default()
457            };
458            futs.push(Self::merge_single_output(
459                compaction_region.clone(),
460                output,
461                write_opts,
462            ));
463        }
464        let mut output_files = Vec::with_capacity(futs.len());
465        while !futs.is_empty() {
466            let mut task_chunk = Vec::with_capacity(internal_parallelism);
467            for _ in 0..internal_parallelism {
468                if let Some(task) = futs.pop() {
469                    task_chunk.push(common_runtime::spawn_compact(task));
470                }
471            }
472            let metas = futures::future::try_join_all(task_chunk)
473                .await
474                .context(JoinSnafu)?
475                .into_iter()
476                .collect::<Result<Vec<Vec<_>>>>()?;
477            output_files.extend(metas.into_iter().flatten());
478        }
479
480        // In case of remote compaction, we still allow the region edit after merge to
481        // clean expired ssts.
482        let mut inputs: Vec<_> = compacted_inputs.into_iter().collect();
483        inputs.extend(
484            picker_output
485                .expired_ssts
486                .iter()
487                .map(|f| f.meta_ref().clone()),
488        );
489
490        Ok(MergeOutput {
491            files_to_add: output_files,
492            files_to_remove: inputs,
493            compaction_time_window: Some(compaction_time_window),
494        })
495    }
496
497    async fn update_manifest(
498        &self,
499        compaction_region: &CompactionRegion,
500        merge_output: MergeOutput,
501    ) -> Result<RegionEdit> {
502        // Write region edit to manifest.
503        let edit = RegionEdit {
504            files_to_add: merge_output.files_to_add,
505            files_to_remove: merge_output.files_to_remove,
506            // Use current timestamp as the edit timestamp.
507            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
508            compaction_time_window: merge_output
509                .compaction_time_window
510                .map(|seconds| Duration::from_secs(seconds as u64)),
511            flushed_entry_id: None,
512            flushed_sequence: None,
513            committed_sequence: None,
514        };
515
516        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
517        // TODO: We might leak files if we fail to update manifest. We can add a cleanup task to remove them later.
518        compaction_region
519            .manifest_ctx
520            .update_manifest(RegionLeaderState::Writable, action_list)
521            .await?;
522
523        Ok(edit)
524    }
525
526    // The default implementation of compact combines the merge_ssts and update_manifest functions.
527    // Note: It's local compaction and only used for testing purpose.
528    async fn compact(
529        &self,
530        compaction_region: &CompactionRegion,
531        compact_request_options: compact_request::Options,
532    ) -> Result<()> {
533        let picker_output = {
534            let picker_output = new_picker(
535                &compact_request_options,
536                &compaction_region.region_options.compaction,
537                compaction_region.region_options.append_mode,
538                None,
539            )
540            .pick(compaction_region);
541
542            if let Some(picker_output) = picker_output {
543                picker_output
544            } else {
545                info!(
546                    "No files to compact for region_id: {}",
547                    compaction_region.region_id
548                );
549                return Ok(());
550            }
551        };
552
553        let merge_output = self.merge_ssts(compaction_region, picker_output).await?;
554        if merge_output.is_empty() {
555            info!(
556                "No files to compact for region_id: {}",
557                compaction_region.region_id
558            );
559            return Ok(());
560        }
561
562        metrics::COMPACTION_INPUT_BYTES.inc_by(merge_output.input_file_size() as f64);
563        metrics::COMPACTION_OUTPUT_BYTES.inc_by(merge_output.output_file_size() as f64);
564        self.update_manifest(compaction_region, merge_output)
565            .await?;
566
567        Ok(())
568    }
569}