// mito2/compaction/compactor.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::num::NonZero;
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::v1::region::compact_request;
20use common_meta::key::SchemaMetadataManagerRef;
21use common_telemetry::{info, warn};
22use common_time::TimeToLive;
23use either::Either;
24use itertools::Itertools;
25use object_store::manager::ObjectStoreManagerRef;
26use partition::expr::PartitionExpr;
27use serde::{Deserialize, Serialize};
28use snafu::{OptionExt, ResultExt};
29use store_api::metadata::RegionMetadataRef;
30use store_api::region_request::PathType;
31use store_api::storage::RegionId;
32
33use crate::access_layer::{AccessLayer, AccessLayerRef, OperationType, SstWriteRequest, WriteType};
34use crate::cache::{CacheManager, CacheManagerRef};
35use crate::compaction::picker::{PickerOutput, new_picker};
36use crate::compaction::{CompactionSstReaderBuilder, find_ttl};
37use crate::config::MitoConfig;
38use crate::error::{
39    EmptyRegionDirSnafu, InvalidPartitionExprSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result,
40};
41use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
42use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
43use crate::manifest::storage::manifest_compress_type;
44use crate::metrics;
45use crate::read::{FlatSource, Source};
46use crate::region::opener::new_manifest_dir;
47use crate::region::options::RegionOptions;
48use crate::region::version::VersionRef;
49use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState};
50use crate::schedule::scheduler::LocalScheduler;
51use crate::sst::file::FileMeta;
52use crate::sst::file_purger::LocalFilePurger;
53use crate::sst::index::intermediate::IntermediateManager;
54use crate::sst::index::puffin_manager::PuffinManagerFactory;
55use crate::sst::location::region_dir_from_table_dir;
56use crate::sst::parquet::WriteOptions;
57use crate::sst::version::{SstVersion, SstVersionRef};
58
/// Region version for compaction that does not hold memtables.
///
/// It keeps only what compaction reads from a region version: the metadata, the
/// options, the SST files and the inferred compaction time window. It can be built
/// from a full version through the `From<VersionRef>` impl below.
#[derive(Clone)]
pub struct CompactionVersion {
    /// Metadata of the region.
    ///
    /// Altering metadata isn't frequent, storing metadata in Arc to allow sharing
    /// metadata and reuse metadata when creating a new `Version`.
    pub(crate) metadata: RegionMetadataRef,
    /// Options of the region.
    pub(crate) options: RegionOptions,
    /// SSTs of the region.
    pub(crate) ssts: SstVersionRef,
    /// Inferred compaction time window, if one has been inferred.
    pub(crate) compaction_time_window: Option<Duration>,
}
74
75impl From<VersionRef> for CompactionVersion {
76    fn from(value: VersionRef) -> Self {
77        Self {
78            metadata: value.metadata.clone(),
79            options: value.options.clone(),
80            ssts: value.ssts.clone(),
81            compaction_time_window: value.compaction_time_window,
82        }
83    }
84}
85
/// CompactionRegion represents a region that needs to be compacted.
/// It's the subset of MitoRegion.
///
/// It bundles what a compactor needs to read input SSTs, write output SSTs and
/// commit the resulting edit to the manifest. `open_compaction_region` builds one
/// from the manifest found on storage.
#[derive(Clone)]
pub struct CompactionRegion {
    /// Id of the region to compact.
    pub region_id: RegionId,
    /// Options of the region (e.g. the storage name, compaction options and append mode).
    pub region_options: RegionOptions,

    /// Engine configuration, e.g. the SST write buffer size and index configs.
    pub(crate) engine_config: Arc<MitoConfig>,
    /// Metadata of the region, used to read the inputs and write the outputs.
    pub(crate) region_metadata: RegionMetadataRef,
    /// Cache manager handed to the SST readers and writers.
    pub(crate) cache_manager: CacheManagerRef,
    /// Access layer to get the table path and path type.
    pub access_layer: AccessLayerRef,
    /// Manifest context used to commit the compaction's region edit.
    pub(crate) manifest_ctx: Arc<ManifestContext>,
    /// Memtable-free version of the region that files are picked from.
    pub(crate) current_version: CompactionVersion,
    /// File purger of the region; `None` when no purger is attached.
    pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
    /// TTL of the region, if one was resolved.
    pub(crate) ttl: Option<TimeToLive>,

    /// Controls the parallelism of this compaction task. Default is 1.
    ///
    /// The parallel is inside this compaction task, not across different compaction tasks.
    /// It can be different windows of the same compaction task or something like this.
    /// `DefaultCompactor::merge_ssts` treats a value of 0 as 1.
    pub max_parallelism: usize,
}
109
/// OpenCompactionRegionRequest represents the request to open a compaction region.
///
/// It is the input of `open_compaction_region`.
#[derive(Debug, Clone)]
pub struct OpenCompactionRegionRequest {
    /// Id of the region to open.
    pub region_id: RegionId,
    /// Directory of the table the region belongs to; the region directory is derived from it.
    pub table_dir: String,
    /// Path type used with `table_dir` to derive the region directory and build the access layer.
    pub path_type: PathType,
    /// Options of the region; `storage` names the object store to use (default store if `None`).
    pub region_options: RegionOptions,
    /// Parallelism inside the compaction task, copied into `CompactionRegion::max_parallelism`.
    pub max_parallelism: usize,
}
119
120/// Open a compaction region from a compaction request.
121/// It's simple version of RegionOpener::open().
122pub async fn open_compaction_region(
123    req: &OpenCompactionRegionRequest,
124    mito_config: &MitoConfig,
125    object_store_manager: ObjectStoreManagerRef,
126    ttl_provider: Either<TimeToLive, SchemaMetadataManagerRef>,
127) -> Result<CompactionRegion> {
128    let object_store = {
129        let name = &req.region_options.storage;
130        if let Some(name) = name {
131            object_store_manager
132                .find(name)
133                .context(ObjectStoreNotFoundSnafu {
134                    object_store: name.to_string(),
135                })?
136        } else {
137            object_store_manager.default_object_store()
138        }
139    };
140
141    let access_layer = {
142        let puffin_manager_factory = PuffinManagerFactory::new(
143            &mito_config.index.aux_path,
144            mito_config.index.staging_size.as_bytes(),
145            Some(mito_config.index.write_buffer_size.as_bytes() as _),
146            mito_config.index.staging_ttl,
147        )
148        .await?;
149        let intermediate_manager =
150            IntermediateManager::init_fs(mito_config.index.aux_path.clone()).await?;
151
152        Arc::new(AccessLayer::new(
153            &req.table_dir,
154            req.path_type,
155            object_store.clone(),
156            puffin_manager_factory,
157            intermediate_manager,
158        ))
159    };
160
161    let manifest_manager = {
162        let region_manifest_options = RegionManifestOptions {
163            manifest_dir: new_manifest_dir(&region_dir_from_table_dir(
164                &req.table_dir,
165                req.region_id,
166                req.path_type,
167            )),
168            object_store: object_store.clone(),
169            compress_type: manifest_compress_type(mito_config.compress_manifest),
170            checkpoint_distance: mito_config.manifest_checkpoint_distance,
171            remove_file_options: RemoveFileOptions {
172                keep_count: mito_config.experimental_manifest_keep_removed_file_count,
173                keep_ttl: mito_config.experimental_manifest_keep_removed_file_ttl,
174            },
175        };
176
177        RegionManifestManager::open(
178            region_manifest_options,
179            Default::default(),
180            Default::default(),
181        )
182        .await?
183        .context(EmptyRegionDirSnafu {
184            region_id: req.region_id,
185            region_dir: &region_dir_from_table_dir(&req.table_dir, req.region_id, req.path_type),
186        })?
187    };
188
189    let manifest = manifest_manager.manifest();
190    let region_metadata = manifest.metadata.clone();
191    let manifest_ctx = Arc::new(ManifestContext::new(
192        manifest_manager,
193        RegionRoleState::Leader(RegionLeaderState::Writable),
194    ));
195
196    let file_purger = {
197        let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_purges));
198        Arc::new(LocalFilePurger::new(
199            purge_scheduler.clone(),
200            access_layer.clone(),
201            None,
202        ))
203    };
204
205    let current_version = {
206        let mut ssts = SstVersion::new();
207        ssts.add_files(file_purger.clone(), manifest.files.values().cloned());
208        CompactionVersion {
209            metadata: region_metadata.clone(),
210            options: req.region_options.clone(),
211            ssts: Arc::new(ssts),
212            compaction_time_window: manifest.compaction_time_window,
213        }
214    };
215
216    let ttl = match ttl_provider {
217        // Use the specified ttl.
218        Either::Left(ttl) => ttl,
219        // Get the ttl from the schema metadata manager.
220        Either::Right(schema_metadata_manager) => find_ttl(
221            req.region_id.table_id(),
222            current_version.options.ttl,
223            &schema_metadata_manager,
224        )
225        .await
226        .unwrap_or_else(|e| {
227            warn!(e; "Failed to get ttl for region: {}", region_metadata.region_id);
228            TimeToLive::default()
229        }),
230    };
231
232    Ok(CompactionRegion {
233        region_id: req.region_id,
234        region_options: req.region_options.clone(),
235        engine_config: Arc::new(mito_config.clone()),
236        region_metadata: region_metadata.clone(),
237        cache_manager: Arc::new(CacheManager::default()),
238        access_layer,
239        manifest_ctx,
240        current_version,
241        file_purger: Some(file_purger),
242        ttl: Some(ttl),
243        max_parallelism: req.max_parallelism,
244    })
245}
246
247impl CompactionRegion {
248    /// Get the file purger of the compaction region.
249    pub fn file_purger(&self) -> Option<Arc<LocalFilePurger>> {
250        self.file_purger.clone()
251    }
252
253    /// Stop the file purger scheduler of the compaction region.
254    pub async fn stop_purger_scheduler(&self) -> Result<()> {
255        if let Some(file_purger) = &self.file_purger {
256            file_purger.stop_scheduler().await
257        } else {
258            Ok(())
259        }
260    }
261}
262
/// [`MergeOutput`] represents the output of merging SST files.
///
/// It is returned by [`Compactor::merge_ssts`] and consumed by
/// [`Compactor::update_manifest`].
///
/// NOTE(review): it derives `Serialize`/`Deserialize`, presumably so it can be passed
/// between processes; keep field names and order stable — confirm with its users.
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
pub struct MergeOutput {
    /// Metadata of the new SST files to add to the region.
    pub files_to_add: Vec<FileMeta>,
    /// Metadata of the SST files to remove from the region
    /// (for `DefaultCompactor`: the merged inputs plus the expired files).
    pub files_to_remove: Vec<FileMeta>,
    /// Compaction time window in seconds, if any.
    pub compaction_time_window: Option<i64>,
}
270
271impl MergeOutput {
272    pub fn is_empty(&self) -> bool {
273        self.files_to_add.is_empty() && self.files_to_remove.is_empty()
274    }
275
276    pub fn input_file_size(&self) -> u64 {
277        self.files_to_remove.iter().map(|f| f.file_size).sum()
278    }
279
280    pub fn output_file_size(&self) -> u64 {
281        self.files_to_add.iter().map(|f| f.file_size).sum()
282    }
283}
284
/// Compactor is the trait that defines the compaction logic.
///
/// Compaction is split into merging SST files ([`Compactor::merge_ssts`]) and
/// committing the result ([`Compactor::update_manifest`]); `DefaultCompactor::compact`
/// runs these two steps in that order.
#[async_trait::async_trait]
pub trait Compactor: Send + Sync + 'static {
    /// Merge SST files for a region.
    ///
    /// Compacts the files selected in `picker_output` and reports which files to add
    /// and which to remove as a [`MergeOutput`].
    async fn merge_ssts(
        &self,
        compaction_region: &CompactionRegion,
        picker_output: PickerOutput,
    ) -> Result<MergeOutput>;

    /// Update the manifest after merging SST files.
    ///
    /// Returns the [`RegionEdit`] built from `merge_output`.
    async fn update_manifest(
        &self,
        compaction_region: &CompactionRegion,
        merge_output: MergeOutput,
    ) -> Result<RegionEdit>;

    /// Execute compaction for a region.
    ///
    /// `compact_request_options` controls how files are picked (in `DefaultCompactor`
    /// it is passed to the picker).
    async fn compact(
        &self,
        compaction_region: &CompactionRegion,
        compact_request_options: compact_request::Options,
    ) -> Result<()>;
}
309
/// DefaultCompactor is the default implementation of Compactor.
///
/// It holds no state; everything it needs comes from the [`CompactionRegion`]
/// passed to each call.
pub struct DefaultCompactor;
312
313#[async_trait::async_trait]
314impl Compactor for DefaultCompactor {
315    async fn merge_ssts(
316        &self,
317        compaction_region: &CompactionRegion,
318        mut picker_output: PickerOutput,
319    ) -> Result<MergeOutput> {
320        let mut futs = Vec::with_capacity(picker_output.outputs.len());
321        let mut compacted_inputs =
322            Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum());
323        let internal_parallelism = compaction_region.max_parallelism.max(1);
324
325        for output in picker_output.outputs.drain(..) {
326            compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone()));
327            let write_opts = WriteOptions {
328                write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
329                max_file_size: picker_output.max_file_size,
330                ..Default::default()
331            };
332
333            let region_metadata = compaction_region.region_metadata.clone();
334            let sst_layer = compaction_region.access_layer.clone();
335            let region_id = compaction_region.region_id;
336            let cache_manager = compaction_region.cache_manager.clone();
337            let storage = compaction_region.region_options.storage.clone();
338            let index_options = compaction_region
339                .current_version
340                .options
341                .index_options
342                .clone();
343            let append_mode = compaction_region.current_version.options.append_mode;
344            let merge_mode = compaction_region.current_version.options.merge_mode();
345            let flat_format = compaction_region
346                .engine_config
347                .enable_experimental_flat_format;
348            let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
349            let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
350            let bloom_filter_index_config =
351                compaction_region.engine_config.bloom_filter_index.clone();
352            let max_sequence = output
353                .inputs
354                .iter()
355                .map(|f| f.meta_ref().sequence)
356                .max()
357                .flatten();
358            let region_metadata_for_filemeta = region_metadata.clone();
359            futs.push(async move {
360                let input_file_names = output
361                    .inputs
362                    .iter()
363                    .map(|f| f.file_id().to_string())
364                    .join(",");
365                let builder = CompactionSstReaderBuilder {
366                    metadata: region_metadata.clone(),
367                    sst_layer: sst_layer.clone(),
368                    cache: cache_manager.clone(),
369                    inputs: &output.inputs,
370                    append_mode,
371                    filter_deleted: output.filter_deleted,
372                    time_range: output.output_time_range,
373                    merge_mode,
374                };
375                let source = if flat_format {
376                    let reader = builder.build_flat_sst_reader().await?;
377                    either::Right(FlatSource::Stream(reader))
378                } else {
379                    let reader = builder.build_sst_reader().await?;
380                    either::Left(Source::Reader(reader))
381                };
382                let (sst_infos, metrics) = sst_layer
383                    .write_sst(
384                        SstWriteRequest {
385                            op_type: OperationType::Compact,
386                            metadata: region_metadata,
387                            source,
388                            cache_manager,
389                            storage,
390                            max_sequence: max_sequence.map(NonZero::get),
391                            index_options,
392                            inverted_index_config,
393                            fulltext_index_config,
394                            bloom_filter_index_config,
395                        },
396                        &write_opts,
397                        WriteType::Compaction,
398                    )
399                    .await?;
400                // Convert partition expression once outside the map
401                let partition_expr = match &region_metadata_for_filemeta.partition_expr {
402                    None => None,
403                    Some(json_str) if json_str.is_empty() => None,
404                    Some(json_str) => {
405                        PartitionExpr::from_json_str(json_str).with_context(|_| {
406                            InvalidPartitionExprSnafu {
407                                expr: json_str.clone(),
408                            }
409                        })?
410                    }
411                };
412
413                let output_files = sst_infos
414                    .into_iter()
415                    .map(|sst_info| FileMeta {
416                        region_id,
417                        file_id: sst_info.file_id,
418                        time_range: sst_info.time_range,
419                        level: output.output_level,
420                        file_size: sst_info.file_size,
421                        available_indexes: sst_info.index_metadata.build_available_indexes(),
422                        index_file_size: sst_info.index_metadata.file_size,
423                        num_rows: sst_info.num_rows as u64,
424                        num_row_groups: sst_info.num_row_groups,
425                        sequence: max_sequence,
426                        partition_expr: partition_expr.clone(),
427                    })
428                    .collect::<Vec<_>>();
429                let output_file_names =
430                    output_files.iter().map(|f| f.file_id.to_string()).join(",");
431                info!(
432                    "Region {} compaction inputs: [{}], outputs: [{}], metrics: {:?}",
433                    region_id, input_file_names, output_file_names, metrics
434                );
435                metrics.observe();
436                Ok(output_files)
437            });
438        }
439        let mut output_files = Vec::with_capacity(futs.len());
440        while !futs.is_empty() {
441            let mut task_chunk = Vec::with_capacity(internal_parallelism);
442            for _ in 0..internal_parallelism {
443                if let Some(task) = futs.pop() {
444                    task_chunk.push(common_runtime::spawn_compact(task));
445                }
446            }
447            let metas = futures::future::try_join_all(task_chunk)
448                .await
449                .context(JoinSnafu)?
450                .into_iter()
451                .collect::<Result<Vec<Vec<_>>>>()?;
452            output_files.extend(metas.into_iter().flatten());
453        }
454
455        let mut inputs: Vec<_> = compacted_inputs.into_iter().collect();
456        inputs.extend(
457            picker_output
458                .expired_ssts
459                .iter()
460                .map(|f| f.meta_ref().clone()),
461        );
462
463        Ok(MergeOutput {
464            files_to_add: output_files,
465            files_to_remove: inputs,
466            compaction_time_window: Some(picker_output.time_window_size),
467        })
468    }
469
470    async fn update_manifest(
471        &self,
472        compaction_region: &CompactionRegion,
473        merge_output: MergeOutput,
474    ) -> Result<RegionEdit> {
475        // Write region edit to manifest.
476        let edit = RegionEdit {
477            files_to_add: merge_output.files_to_add,
478            files_to_remove: merge_output.files_to_remove,
479            // Use current timestamp as the edit timestamp.
480            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
481            compaction_time_window: merge_output
482                .compaction_time_window
483                .map(|seconds| Duration::from_secs(seconds as u64)),
484            flushed_entry_id: None,
485            flushed_sequence: None,
486            committed_sequence: None,
487        };
488
489        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
490        // TODO: We might leak files if we fail to update manifest. We can add a cleanup task to remove them later.
491        compaction_region
492            .manifest_ctx
493            .update_manifest(RegionLeaderState::Writable, action_list)
494            .await?;
495
496        Ok(edit)
497    }
498
499    // The default implementation of compact combines the merge_ssts and update_manifest functions.
500    // Note: It's local compaction and only used for testing purpose.
501    async fn compact(
502        &self,
503        compaction_region: &CompactionRegion,
504        compact_request_options: compact_request::Options,
505    ) -> Result<()> {
506        let picker_output = {
507            let picker_output = new_picker(
508                &compact_request_options,
509                &compaction_region.region_options.compaction,
510                compaction_region.region_options.append_mode,
511            )
512            .pick(compaction_region);
513
514            if let Some(picker_output) = picker_output {
515                picker_output
516            } else {
517                info!(
518                    "No files to compact for region_id: {}",
519                    compaction_region.region_id
520                );
521                return Ok(());
522            }
523        };
524
525        let merge_output = self.merge_ssts(compaction_region, picker_output).await?;
526        if merge_output.is_empty() {
527            info!(
528                "No files to compact for region_id: {}",
529                compaction_region.region_id
530            );
531            return Ok(());
532        }
533
534        metrics::COMPACTION_INPUT_BYTES.inc_by(merge_output.input_file_size() as f64);
535        metrics::COMPACTION_OUTPUT_BYTES.inc_by(merge_output.output_file_size() as f64);
536        self.update_manifest(compaction_region, merge_output)
537            .await?;
538
539        Ok(())
540    }
541}