mito2/compaction/compactor.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::num::NonZero;
use std::sync::Arc;
use std::time::Duration;

use api::v1::region::compact_request;
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{info, warn};
use common_time::TimeToLive;
use either::Either;
use itertools::Itertools;
use object_store::manager::ObjectStoreManagerRef;
use partition::expr::PartitionExpr;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::region_request::PathType;
use store_api::storage::RegionId;

use crate::access_layer::{AccessLayer, AccessLayerRef, OperationType, SstWriteRequest, WriteType};
use crate::cache::{CacheManager, CacheManagerRef};
use crate::compaction::picker::{PickerOutput, new_picker};
use crate::compaction::{CompactionSstReaderBuilder, find_ttl};
use crate::config::MitoConfig;
use crate::error::{
    EmptyRegionDirSnafu, InvalidPartitionExprSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result,
};
use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
use crate::manifest::storage::manifest_compress_type;
use crate::metrics;
use crate::read::{FlatSource, Source};
use crate::region::opener::new_manifest_dir;
use crate::region::options::RegionOptions;
use crate::region::version::VersionRef;
use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState};
use crate::schedule::scheduler::LocalScheduler;
use crate::sst::FormatType;
use crate::sst::file::FileMeta;
use crate::sst::file_purger::LocalFilePurger;
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::location::region_dir_from_table_dir;
use crate::sst::parquet::WriteOptions;
use crate::sst::version::{SstVersion, SstVersionRef};

/// Region version for compaction that does not hold memtables.
#[derive(Clone)]
pub struct CompactionVersion {
    /// Metadata of the region.
    ///
    /// Altering metadata isn't frequent, so we store it in an `Arc` to allow
    /// sharing and reusing it when creating a new `Version`.
    pub(crate) metadata: RegionMetadataRef,
    /// Options of the region.
    pub(crate) options: RegionOptions,
    /// SSTs of the region.
    pub(crate) ssts: SstVersionRef,
    /// Inferred compaction time window.
    pub(crate) compaction_time_window: Option<Duration>,
}

impl From<VersionRef> for CompactionVersion {
    fn from(value: VersionRef) -> Self {
        Self {
            metadata: value.metadata.clone(),
            options: value.options.clone(),
            ssts: value.ssts.clone(),
            compaction_time_window: value.compaction_time_window,
        }
    }
}

/// CompactionRegion represents a region that needs to be compacted.
/// It's a subset of MitoRegion.
#[derive(Clone)]
pub struct CompactionRegion {
    pub region_id: RegionId,
    pub region_options: RegionOptions,

    pub(crate) engine_config: Arc<MitoConfig>,
    pub(crate) region_metadata: RegionMetadataRef,
    pub(crate) cache_manager: CacheManagerRef,
    /// Access layer to get the table path and path type.
    pub access_layer: AccessLayerRef,
    pub(crate) manifest_ctx: Arc<ManifestContext>,
    pub(crate) current_version: CompactionVersion,
    pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
    pub(crate) ttl: Option<TimeToLive>,

    /// Controls the parallelism of this compaction task. Default is 1.
    ///
    /// The parallelism applies within this compaction task, not across different
    /// compaction tasks. For example, different time windows of the same
    /// compaction task may be merged in parallel.
    pub max_parallelism: usize,
}

/// OpenCompactionRegionRequest represents the request to open a compaction region.
#[derive(Debug, Clone)]
pub struct OpenCompactionRegionRequest {
    pub region_id: RegionId,
    pub table_dir: String,
    pub path_type: PathType,
    pub region_options: RegionOptions,
    pub max_parallelism: usize,
}

/// Open a compaction region from a compaction request.
/// It's a simplified version of RegionOpener::open().
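///
/// # Example
///
/// A rough usage sketch (not compiled here). It assumes a `RegionId`, a `PathType`,
/// `RegionOptions`, a `MitoConfig`, and an `ObjectStoreManagerRef` are already in
/// scope; the `table_dir` value is only a placeholder.
///
/// ```ignore
/// let req = OpenCompactionRegionRequest {
///     region_id,
///     table_dir: "data/my_table/".to_string(),
///     path_type,
///     region_options,
///     max_parallelism: 4,
/// };
/// // Use a fixed TTL instead of looking it up via the schema metadata manager.
/// let compaction_region = open_compaction_region(
///     &req,
///     &mito_config,
///     object_store_manager.clone(),
///     Either::Left(TimeToLive::default()),
/// )
/// .await?;
/// ```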
pub async fn open_compaction_region(
    req: &OpenCompactionRegionRequest,
    mito_config: &MitoConfig,
    object_store_manager: ObjectStoreManagerRef,
    ttl_provider: Either<TimeToLive, SchemaMetadataManagerRef>,
) -> Result<CompactionRegion> {
    let object_store = {
        let name = &req.region_options.storage;
        if let Some(name) = name {
            object_store_manager
                .find(name)
                .with_context(|| ObjectStoreNotFoundSnafu {
                    object_store: name.clone(),
                })?
        } else {
            object_store_manager.default_object_store()
        }
    };

    let access_layer = {
        let puffin_manager_factory = PuffinManagerFactory::new(
            &mito_config.index.aux_path,
            mito_config.index.staging_size.as_bytes(),
            Some(mito_config.index.write_buffer_size.as_bytes() as _),
            mito_config.index.staging_ttl,
        )
        .await?;
        let intermediate_manager =
            IntermediateManager::init_fs(mito_config.index.aux_path.clone()).await?;

        Arc::new(AccessLayer::new(
            &req.table_dir,
            req.path_type,
            object_store.clone(),
            puffin_manager_factory,
            intermediate_manager,
        ))
    };

    let manifest_manager = {
        let region_manifest_options = RegionManifestOptions {
            manifest_dir: new_manifest_dir(&region_dir_from_table_dir(
                &req.table_dir,
                req.region_id,
                req.path_type,
            )),
            object_store: object_store.clone(),
            compress_type: manifest_compress_type(mito_config.compress_manifest),
            checkpoint_distance: mito_config.manifest_checkpoint_distance,
            remove_file_options: RemoveFileOptions {
                keep_count: mito_config.experimental_manifest_keep_removed_file_count,
                keep_ttl: mito_config.experimental_manifest_keep_removed_file_ttl,
            },
        };

        RegionManifestManager::open(
            region_manifest_options,
            Default::default(),
            Default::default(),
        )
        .await?
        .context(EmptyRegionDirSnafu {
            region_id: req.region_id,
            region_dir: &region_dir_from_table_dir(&req.table_dir, req.region_id, req.path_type),
        })?
    };

    let manifest = manifest_manager.manifest();
    let region_metadata = manifest.metadata.clone();
    let manifest_ctx = Arc::new(ManifestContext::new(
        manifest_manager,
        RegionRoleState::Leader(RegionLeaderState::Writable),
    ));

    let file_purger = {
        let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_purges));
        Arc::new(LocalFilePurger::new(
            purge_scheduler.clone(),
            access_layer.clone(),
            None,
        ))
    };

    let current_version = {
        let mut ssts = SstVersion::new();
        ssts.add_files(file_purger.clone(), manifest.files.values().cloned());
        CompactionVersion {
            metadata: region_metadata.clone(),
            options: req.region_options.clone(),
            ssts: Arc::new(ssts),
            compaction_time_window: manifest.compaction_time_window,
        }
    };

    let ttl = match ttl_provider {
        // Use the specified ttl.
        Either::Left(ttl) => ttl,
        // Get the ttl from the schema metadata manager.
        Either::Right(schema_metadata_manager) => find_ttl(
            req.region_id.table_id(),
            current_version.options.ttl,
            &schema_metadata_manager,
        )
        .await
        .unwrap_or_else(|e| {
            warn!(e; "Failed to get ttl for region: {}", region_metadata.region_id);
            TimeToLive::default()
        }),
    };

    Ok(CompactionRegion {
        region_id: req.region_id,
        region_options: req.region_options.clone(),
        engine_config: Arc::new(mito_config.clone()),
        region_metadata: region_metadata.clone(),
        cache_manager: Arc::new(CacheManager::default()),
        access_layer,
        manifest_ctx,
        current_version,
        file_purger: Some(file_purger),
        ttl: Some(ttl),
        max_parallelism: req.max_parallelism,
    })
}

impl CompactionRegion {
    /// Get the file purger of the compaction region.
    pub fn file_purger(&self) -> Option<Arc<LocalFilePurger>> {
        self.file_purger.clone()
    }

    /// Stop the file purger scheduler of the compaction region.
    pub async fn stop_purger_scheduler(&self) -> Result<()> {
        if let Some(file_purger) = &self.file_purger {
            file_purger.stop_scheduler().await
        } else {
            Ok(())
        }
    }
}

/// [`MergeOutput`] represents the output of merging SST files.
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
pub struct MergeOutput {
    pub files_to_add: Vec<FileMeta>,
    pub files_to_remove: Vec<FileMeta>,
    pub compaction_time_window: Option<i64>,
}

impl MergeOutput {
    pub fn is_empty(&self) -> bool {
        self.files_to_add.is_empty() && self.files_to_remove.is_empty()
    }

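    /// Total size in bytes of the files to remove (the compaction inputs); together
    /// with [`Self::output_file_size`], this feeds the compaction byte metrics
    /// reported by `DefaultCompactor::compact`.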
    pub fn input_file_size(&self) -> u64 {
        self.files_to_remove.iter().map(|f| f.file_size).sum()
    }

    pub fn output_file_size(&self) -> u64 {
        self.files_to_add.iter().map(|f| f.file_size).sum()
    }
}

/// Compactor is the trait that defines the compaction logic.
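///
/// [`Compactor::merge_ssts`] rewrites the SST files picked for compaction,
/// [`Compactor::update_manifest`] records the result as a [`RegionEdit`] in the
/// region manifest, and [`Compactor::compact`] drives a full compaction (in
/// [`DefaultCompactor`], by chaining the two steps).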
#[async_trait::async_trait]
pub trait Compactor: Send + Sync + 'static {
    /// Merge SST files for a region.
    async fn merge_ssts(
        &self,
        compaction_region: &CompactionRegion,
        picker_output: PickerOutput,
    ) -> Result<MergeOutput>;

    /// Update the manifest after merging SST files.
    async fn update_manifest(
        &self,
        compaction_region: &CompactionRegion,
        merge_output: MergeOutput,
    ) -> Result<RegionEdit>;

    /// Execute compaction for a region.
    async fn compact(
        &self,
        compaction_region: &CompactionRegion,
        compact_request_options: compact_request::Options,
    ) -> Result<()>;
}

/// DefaultCompactor is the default implementation of Compactor.
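///
/// # Example
///
/// An illustrative sketch (not compiled here); `compaction_region` is assumed to
/// come from [`open_compaction_region`], and `options` is assumed to be a
/// `compact_request::Options` value taken from the incoming compaction request.
///
/// ```ignore
/// let compactor = DefaultCompactor;
/// // Picks files, merges them, and updates the region manifest.
/// compactor.compact(&compaction_region, options).await?;
/// ```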
pub struct DefaultCompactor;

#[async_trait::async_trait]
impl Compactor for DefaultCompactor {
    async fn merge_ssts(
        &self,
        compaction_region: &CompactionRegion,
        mut picker_output: PickerOutput,
    ) -> Result<MergeOutput> {
        let mut futs = Vec::with_capacity(picker_output.outputs.len());
        let mut compacted_inputs =
            Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum());
        let internal_parallelism = compaction_region.max_parallelism.max(1);

        for output in picker_output.outputs.drain(..) {
            compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone()));
            let write_opts = WriteOptions {
                write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
                max_file_size: picker_output.max_file_size,
                ..Default::default()
            };

            let region_metadata = compaction_region.region_metadata.clone();
            let sst_layer = compaction_region.access_layer.clone();
            let region_id = compaction_region.region_id;
            let cache_manager = compaction_region.cache_manager.clone();
            let storage = compaction_region.region_options.storage.clone();
            let index_options = compaction_region
                .current_version
                .options
                .index_options
                .clone();
            let append_mode = compaction_region.current_version.options.append_mode;
            let merge_mode = compaction_region.current_version.options.merge_mode();
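            // Decide the output SST format: an explicit region-level `sst_format`
            // option wins; otherwise fall back to the engine-wide experimental
            // flat-format default.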
            let flat_format = compaction_region
                .region_options
                .sst_format
                .map(|format| format == FormatType::Flat)
                .unwrap_or(
                    compaction_region
                        .engine_config
                        .default_experimental_flat_format,
                );
            let index_config = compaction_region.engine_config.index.clone();
            let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
            let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
            let bloom_filter_index_config =
                compaction_region.engine_config.bloom_filter_index.clone();
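            // The merged output inherits the largest sequence number among its inputs.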
            let max_sequence = output
                .inputs
                .iter()
                .map(|f| f.meta_ref().sequence)
                .max()
                .flatten();
            let region_metadata_for_filemeta = region_metadata.clone();
            futs.push(async move {
                let input_file_names = output
                    .inputs
                    .iter()
                    .map(|f| f.file_id().to_string())
                    .join(",");
                let builder = CompactionSstReaderBuilder {
                    metadata: region_metadata.clone(),
                    sst_layer: sst_layer.clone(),
                    cache: cache_manager.clone(),
                    inputs: &output.inputs,
                    append_mode,
                    filter_deleted: output.filter_deleted,
                    time_range: output.output_time_range,
                    merge_mode,
                };
                let source = if flat_format {
                    let reader = builder.build_flat_sst_reader().await?;
                    either::Right(FlatSource::Stream(reader))
                } else {
                    let reader = builder.build_sst_reader().await?;
                    either::Left(Source::Reader(reader))
                };
                let (sst_infos, metrics) = sst_layer
                    .write_sst(
                        SstWriteRequest {
                            op_type: OperationType::Compact,
                            metadata: region_metadata,
                            source,
                            cache_manager,
                            storage,
                            max_sequence: max_sequence.map(NonZero::get),
                            index_options,
                            index_config,
                            inverted_index_config,
                            fulltext_index_config,
                            bloom_filter_index_config,
                        },
                        &write_opts,
                        WriteType::Compaction,
                    )
                    .await?;
                // Convert partition expression once outside the map
                let partition_expr = match &region_metadata_for_filemeta.partition_expr {
                    None => None,
                    Some(json_str) if json_str.is_empty() => None,
                    Some(json_str) => {
                        PartitionExpr::from_json_str(json_str).with_context(|_| {
                            InvalidPartitionExprSnafu {
                                expr: json_str.clone(),
                            }
                        })?
                    }
                };

                let output_files = sst_infos
                    .into_iter()
                    .map(|sst_info| FileMeta {
                        region_id,
                        file_id: sst_info.file_id,
                        time_range: sst_info.time_range,
                        level: output.output_level,
                        file_size: sst_info.file_size,
                        available_indexes: sst_info.index_metadata.build_available_indexes(),
                        index_file_size: sst_info.index_metadata.file_size,
                        num_rows: sst_info.num_rows as u64,
                        num_row_groups: sst_info.num_row_groups,
                        sequence: max_sequence,
                        partition_expr: partition_expr.clone(),
                    })
                    .collect::<Vec<_>>();
                let output_file_names =
                    output_files.iter().map(|f| f.file_id.to_string()).join(",");
                info!(
                    "Region {} compaction inputs: [{}], outputs: [{}], flat_format: {}, metrics: {:?}",
                    region_id, input_file_names, output_file_names, flat_format, metrics
                );
                metrics.observe();
                Ok(output_files)
            });
        }
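        // Spawn the merge tasks on the compaction runtime in chunks of at most
        // `internal_parallelism`; each chunk is awaited before the next is spawned.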
        let mut output_files = Vec::with_capacity(futs.len());
        while !futs.is_empty() {
            let mut task_chunk = Vec::with_capacity(internal_parallelism);
            for _ in 0..internal_parallelism {
                if let Some(task) = futs.pop() {
                    task_chunk.push(common_runtime::spawn_compact(task));
                }
            }
            let metas = futures::future::try_join_all(task_chunk)
                .await
                .context(JoinSnafu)?
                .into_iter()
                .collect::<Result<Vec<Vec<_>>>>()?;
            output_files.extend(metas.into_iter().flatten());
        }

        let mut inputs: Vec<_> = compacted_inputs.into_iter().collect();
        inputs.extend(
            picker_output
                .expired_ssts
                .iter()
                .map(|f| f.meta_ref().clone()),
        );

        Ok(MergeOutput {
            files_to_add: output_files,
            files_to_remove: inputs,
            compaction_time_window: Some(picker_output.time_window_size),
        })
    }

    async fn update_manifest(
        &self,
        compaction_region: &CompactionRegion,
        merge_output: MergeOutput,
    ) -> Result<RegionEdit> {
        // Write region edit to manifest.
        let edit = RegionEdit {
            files_to_add: merge_output.files_to_add,
            files_to_remove: merge_output.files_to_remove,
            // Use current timestamp as the edit timestamp.
            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
            compaction_time_window: merge_output
                .compaction_time_window
                .map(|seconds| Duration::from_secs(seconds as u64)),
            flushed_entry_id: None,
            flushed_sequence: None,
            committed_sequence: None,
        };

        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
        // TODO: We might leak files if we fail to update manifest. We can add a cleanup task to remove them later.
        compaction_region
            .manifest_ctx
            .update_manifest(RegionLeaderState::Writable, action_list)
            .await?;

        Ok(edit)
    }

    // The default implementation of `compact` combines `merge_ssts` and `update_manifest`.
    // Note: this performs local compaction and is only used for testing purposes.
    async fn compact(
        &self,
        compaction_region: &CompactionRegion,
        compact_request_options: compact_request::Options,
    ) -> Result<()> {
        let picker_output = {
            let picker_output = new_picker(
                &compact_request_options,
                &compaction_region.region_options.compaction,
                compaction_region.region_options.append_mode,
            )
            .pick(compaction_region);

            if let Some(picker_output) = picker_output {
                picker_output
            } else {
                info!(
                    "No files to compact for region_id: {}",
                    compaction_region.region_id
                );
                return Ok(());
            }
        };

        let merge_output = self.merge_ssts(compaction_region, picker_output).await?;
        if merge_output.is_empty() {
            info!(
                "No files to compact for region_id: {}",
                compaction_region.region_id
            );
            return Ok(());
        }

        metrics::COMPACTION_INPUT_BYTES.inc_by(merge_output.input_file_size() as f64);
        metrics::COMPACTION_OUTPUT_BYTES.inc_by(merge_output.output_file_size() as f64);
        self.update_manifest(compaction_region, merge_output)
            .await?;

        Ok(())
    }
}