mito2/compaction/
compactor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::num::NonZero;
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::v1::region::compact_request;
20use common_meta::key::SchemaMetadataManagerRef;
21use common_telemetry::{info, warn};
22use common_time::TimeToLive;
23use itertools::Itertools;
24use object_store::manager::ObjectStoreManagerRef;
25use serde::{Deserialize, Serialize};
26use snafu::{OptionExt, ResultExt};
27use store_api::metadata::RegionMetadataRef;
28use store_api::storage::RegionId;
29
30use crate::access_layer::{AccessLayer, AccessLayerRef, OperationType, SstWriteRequest};
31use crate::cache::{CacheManager, CacheManagerRef};
32use crate::compaction::picker::{new_picker, PickerOutput};
33use crate::compaction::{find_ttl, CompactionSstReaderBuilder};
34use crate::config::MitoConfig;
35use crate::error::{EmptyRegionDirSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result};
36use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
37use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
38use crate::manifest::storage::manifest_compress_type;
39use crate::metrics;
40use crate::read::Source;
41use crate::region::opener::new_manifest_dir;
42use crate::region::options::RegionOptions;
43use crate::region::version::VersionRef;
44use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState};
45use crate::schedule::scheduler::LocalScheduler;
46use crate::sst::file::FileMeta;
47use crate::sst::file_purger::LocalFilePurger;
48use crate::sst::index::intermediate::IntermediateManager;
49use crate::sst::index::puffin_manager::PuffinManagerFactory;
50use crate::sst::parquet::WriteOptions;
51use crate::sst::version::{SstVersion, SstVersionRef};
52
/// Region version for compaction that does not hold memtables.
///
/// It can be built from a live region `Version` (see the `From<VersionRef>`
/// impl) or assembled directly from a region manifest by
/// `open_compaction_region`.
#[derive(Clone)]
pub struct CompactionVersion {
    /// Metadata of the region.
    ///
    /// Altering metadata is infrequent, so the metadata is stored in an `Arc`.
    /// This lets it be shared and reused when creating a new `Version`.
    pub(crate) metadata: RegionMetadataRef,
    /// Options of the region.
    pub(crate) options: RegionOptions,
    /// SSTs of the region.
    pub(crate) ssts: SstVersionRef,
    /// Inferred compaction time window.
    pub(crate) compaction_time_window: Option<Duration>,
}
68
69impl From<VersionRef> for CompactionVersion {
70    fn from(value: VersionRef) -> Self {
71        Self {
72            metadata: value.metadata.clone(),
73            options: value.options.clone(),
74            ssts: value.ssts.clone(),
75            compaction_time_window: value.compaction_time_window,
76        }
77    }
78}
79
/// CompactionRegion represents a region that needs to be compacted.
///
/// It holds the subset of `MitoRegion` state that compaction needs.
#[derive(Clone)]
pub struct CompactionRegion {
    /// Id of the region being compacted.
    pub region_id: RegionId,
    /// Options of the region, e.g. the storage name, compaction options and
    /// append mode.
    pub region_options: RegionOptions,
    /// Directory of the region. The access layer and the manifest directory
    /// are derived from it.
    pub region_dir: String,

    /// Engine configuration (SST write buffer size, index configs, ...).
    pub(crate) engine_config: Arc<MitoConfig>,
    /// Metadata of the region.
    pub(crate) region_metadata: RegionMetadataRef,
    /// Cache manager passed to SST readers and writers.
    pub(crate) cache_manager: CacheManagerRef,
    /// Access layer used to read input SSTs and write output SSTs.
    pub(crate) access_layer: AccessLayerRef,
    /// Manifest context used to persist compaction edits.
    pub(crate) manifest_ctx: Arc<ManifestContext>,
    /// Region version (options and SSTs) without memtables.
    pub(crate) current_version: CompactionVersion,
    /// File purger of the region, if any.
    pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
    /// TTL of the region, if known.
    pub(crate) ttl: Option<TimeToLive>,

    /// Controls the parallelism of this compaction task. Default is 1.
    ///
    /// The parallelism applies inside this compaction task, not across different
    /// compaction tasks. For example, it can cover different windows of the same
    /// compaction task. `DefaultCompactor` treats a value of 0 as 1.
    pub max_parallelism: usize,
}
103
/// OpenCompactionRegionRequest represents the request to open a compaction region.
///
/// It is consumed by `open_compaction_region`.
#[derive(Debug, Clone)]
pub struct OpenCompactionRegionRequest {
    /// Id of the region to open.
    pub region_id: RegionId,
    /// Directory of the region. The manifest directory is derived from it.
    pub region_dir: String,
    /// Options of the region, including the optional storage name to use.
    pub region_options: RegionOptions,
    /// Parallelism copied into `CompactionRegion::max_parallelism`.
    pub max_parallelism: usize,
}
112
113/// Open a compaction region from a compaction request.
114/// It's simple version of RegionOpener::open().
115pub async fn open_compaction_region(
116    req: &OpenCompactionRegionRequest,
117    mito_config: &MitoConfig,
118    object_store_manager: ObjectStoreManagerRef,
119    schema_metadata_manager: SchemaMetadataManagerRef,
120) -> Result<CompactionRegion> {
121    let object_store = {
122        let name = &req.region_options.storage;
123        if let Some(name) = name {
124            object_store_manager
125                .find(name)
126                .context(ObjectStoreNotFoundSnafu {
127                    object_store: name.to_string(),
128                })?
129        } else {
130            object_store_manager.default_object_store()
131        }
132    };
133
134    let access_layer = {
135        let puffin_manager_factory = PuffinManagerFactory::new(
136            &mito_config.index.aux_path,
137            mito_config.index.staging_size.as_bytes(),
138            Some(mito_config.index.write_buffer_size.as_bytes() as _),
139            mito_config.index.staging_ttl,
140        )
141        .await?;
142        let intermediate_manager =
143            IntermediateManager::init_fs(mito_config.index.aux_path.clone()).await?;
144
145        Arc::new(AccessLayer::new(
146            req.region_dir.as_str(),
147            object_store.clone(),
148            puffin_manager_factory,
149            intermediate_manager,
150        ))
151    };
152
153    let manifest_manager = {
154        let region_manifest_options = RegionManifestOptions {
155            manifest_dir: new_manifest_dir(req.region_dir.as_str()),
156            object_store: object_store.clone(),
157            compress_type: manifest_compress_type(mito_config.compress_manifest),
158            checkpoint_distance: mito_config.manifest_checkpoint_distance,
159        };
160
161        RegionManifestManager::open(
162            region_manifest_options,
163            Default::default(),
164            Default::default(),
165        )
166        .await?
167        .context(EmptyRegionDirSnafu {
168            region_id: req.region_id,
169            region_dir: req.region_dir.as_str(),
170        })?
171    };
172
173    let manifest = manifest_manager.manifest();
174    let region_metadata = manifest.metadata.clone();
175    let manifest_ctx = Arc::new(ManifestContext::new(
176        manifest_manager,
177        RegionRoleState::Leader(RegionLeaderState::Writable),
178    ));
179
180    let file_purger = {
181        let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_purges));
182        Arc::new(LocalFilePurger::new(
183            purge_scheduler.clone(),
184            access_layer.clone(),
185            None,
186        ))
187    };
188
189    let current_version = {
190        let mut ssts = SstVersion::new();
191        ssts.add_files(file_purger.clone(), manifest.files.values().cloned());
192        CompactionVersion {
193            metadata: region_metadata.clone(),
194            options: req.region_options.clone(),
195            ssts: Arc::new(ssts),
196            compaction_time_window: manifest.compaction_time_window,
197        }
198    };
199
200    let ttl = find_ttl(
201        req.region_id.table_id(),
202        current_version.options.ttl,
203        &schema_metadata_manager,
204    )
205    .await
206    .unwrap_or_else(|e| {
207        warn!(e; "Failed to get ttl for region: {}", region_metadata.region_id);
208        TimeToLive::default()
209    });
210    Ok(CompactionRegion {
211        region_id: req.region_id,
212        region_options: req.region_options.clone(),
213        region_dir: req.region_dir.clone(),
214        engine_config: Arc::new(mito_config.clone()),
215        region_metadata: region_metadata.clone(),
216        cache_manager: Arc::new(CacheManager::default()),
217        access_layer,
218        manifest_ctx,
219        current_version,
220        file_purger: Some(file_purger),
221        ttl: Some(ttl),
222        max_parallelism: req.max_parallelism,
223    })
224}
225
226impl CompactionRegion {
227    pub fn file_purger(&self) -> Option<Arc<LocalFilePurger>> {
228        self.file_purger.clone()
229    }
230}
231
/// [`MergeOutput`] represents the output of merging SST files.
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
pub struct MergeOutput {
    /// Newly written SST files.
    pub files_to_add: Vec<FileMeta>,
    /// Files to drop from the region, such as compacted inputs and expired SSTs.
    pub files_to_remove: Vec<FileMeta>,
    /// Compaction time window, in seconds, if any.
    pub compaction_time_window: Option<i64>,
}
239
240impl MergeOutput {
241    pub fn is_empty(&self) -> bool {
242        self.files_to_add.is_empty() && self.files_to_remove.is_empty()
243    }
244
245    pub fn input_file_size(&self) -> u64 {
246        self.files_to_remove.iter().map(|f| f.file_size).sum()
247    }
248
249    pub fn output_file_size(&self) -> u64 {
250        self.files_to_add.iter().map(|f| f.file_size).sum()
251    }
252}
253
/// Compactor is the trait that defines the compaction logic.
///
/// A compaction is split into [`Compactor::merge_ssts`], which writes new SSTs,
/// and [`Compactor::update_manifest`], which records the result.
/// [`Compactor::compact`] executes a compaction for a region.
#[async_trait::async_trait]
pub trait Compactor: Send + Sync + 'static {
    /// Merge SST files for a region.
    ///
    /// Takes the files chosen by a picker (`picker_output`) and returns the
    /// files to add to and remove from the region.
    async fn merge_ssts(
        &self,
        compaction_region: &CompactionRegion,
        picker_output: PickerOutput,
    ) -> Result<MergeOutput>;

    /// Update the manifest after merging SST files.
    ///
    /// Returns the `RegionEdit` that was written.
    async fn update_manifest(
        &self,
        compaction_region: &CompactionRegion,
        merge_output: MergeOutput,
    ) -> Result<RegionEdit>;

    /// Execute compaction for a region.
    async fn compact(
        &self,
        compaction_region: &CompactionRegion,
        compact_request_options: compact_request::Options,
    ) -> Result<()>;
}
278
/// DefaultCompactor is the default implementation of Compactor.
///
/// It is a stateless unit struct; all per-region state comes from the
/// `CompactionRegion` passed to each method.
pub struct DefaultCompactor;
281
282#[async_trait::async_trait]
283impl Compactor for DefaultCompactor {
284    async fn merge_ssts(
285        &self,
286        compaction_region: &CompactionRegion,
287        mut picker_output: PickerOutput,
288    ) -> Result<MergeOutput> {
289        let mut futs = Vec::with_capacity(picker_output.outputs.len());
290        let mut compacted_inputs =
291            Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum());
292        let internal_parallelism = compaction_region.max_parallelism.max(1);
293
294        for output in picker_output.outputs.drain(..) {
295            compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone()));
296            let write_opts = WriteOptions {
297                write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
298                max_file_size: picker_output.max_file_size,
299                ..Default::default()
300            };
301
302            let region_metadata = compaction_region.region_metadata.clone();
303            let sst_layer = compaction_region.access_layer.clone();
304            let region_id = compaction_region.region_id;
305            let cache_manager = compaction_region.cache_manager.clone();
306            let storage = compaction_region.region_options.storage.clone();
307            let index_options = compaction_region
308                .current_version
309                .options
310                .index_options
311                .clone();
312            let append_mode = compaction_region.current_version.options.append_mode;
313            let merge_mode = compaction_region.current_version.options.merge_mode();
314            let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
315            let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
316            let bloom_filter_index_config =
317                compaction_region.engine_config.bloom_filter_index.clone();
318            let max_sequence = output
319                .inputs
320                .iter()
321                .map(|f| f.meta_ref().sequence)
322                .max()
323                .flatten();
324            futs.push(async move {
325                let input_file_names = output
326                    .inputs
327                    .iter()
328                    .map(|f| f.file_id().to_string())
329                    .join(",");
330                let reader = CompactionSstReaderBuilder {
331                    metadata: region_metadata.clone(),
332                    sst_layer: sst_layer.clone(),
333                    cache: cache_manager.clone(),
334                    inputs: &output.inputs,
335                    append_mode,
336                    filter_deleted: output.filter_deleted,
337                    time_range: output.output_time_range,
338                    merge_mode,
339                }
340                .build_sst_reader()
341                .await?;
342                let output_files = sst_layer
343                    .write_sst(
344                        SstWriteRequest {
345                            op_type: OperationType::Compact,
346                            metadata: region_metadata,
347                            source: Source::Reader(reader),
348                            cache_manager,
349                            storage,
350                            max_sequence: max_sequence.map(NonZero::get),
351                            index_options,
352                            inverted_index_config,
353                            fulltext_index_config,
354                            bloom_filter_index_config,
355                        },
356                        &write_opts,
357                    )
358                    .await?
359                    .into_iter()
360                    .map(|sst_info| FileMeta {
361                        region_id,
362                        file_id: sst_info.file_id,
363                        time_range: sst_info.time_range,
364                        level: output.output_level,
365                        file_size: sst_info.file_size,
366                        available_indexes: sst_info.index_metadata.build_available_indexes(),
367                        index_file_size: sst_info.index_metadata.file_size,
368                        num_rows: sst_info.num_rows as u64,
369                        num_row_groups: sst_info.num_row_groups,
370                        sequence: max_sequence,
371                    })
372                    .collect::<Vec<_>>();
373                let output_file_names =
374                    output_files.iter().map(|f| f.file_id.to_string()).join(",");
375                info!(
376                    "Region {} compaction inputs: [{}], outputs: [{}]",
377                    region_id, input_file_names, output_file_names
378                );
379                Ok(output_files)
380            });
381        }
382        let mut output_files = Vec::with_capacity(futs.len());
383        while !futs.is_empty() {
384            let mut task_chunk = Vec::with_capacity(internal_parallelism);
385            for _ in 0..internal_parallelism {
386                if let Some(task) = futs.pop() {
387                    task_chunk.push(common_runtime::spawn_compact(task));
388                }
389            }
390            let metas = futures::future::try_join_all(task_chunk)
391                .await
392                .context(JoinSnafu)?
393                .into_iter()
394                .collect::<Result<Vec<Vec<_>>>>()?;
395            output_files.extend(metas.into_iter().flatten());
396        }
397
398        let mut inputs: Vec<_> = compacted_inputs.into_iter().collect();
399        inputs.extend(
400            picker_output
401                .expired_ssts
402                .iter()
403                .map(|f| f.meta_ref().clone()),
404        );
405
406        Ok(MergeOutput {
407            files_to_add: output_files,
408            files_to_remove: inputs,
409            compaction_time_window: Some(picker_output.time_window_size),
410        })
411    }
412
413    async fn update_manifest(
414        &self,
415        compaction_region: &CompactionRegion,
416        merge_output: MergeOutput,
417    ) -> Result<RegionEdit> {
418        // Write region edit to manifest.
419        let edit = RegionEdit {
420            files_to_add: merge_output.files_to_add,
421            files_to_remove: merge_output.files_to_remove,
422            compaction_time_window: merge_output
423                .compaction_time_window
424                .map(|seconds| Duration::from_secs(seconds as u64)),
425            flushed_entry_id: None,
426            flushed_sequence: None,
427        };
428
429        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
430        // TODO: We might leak files if we fail to update manifest. We can add a cleanup task to remove them later.
431        compaction_region
432            .manifest_ctx
433            .update_manifest(RegionLeaderState::Writable, action_list)
434            .await?;
435
436        Ok(edit)
437    }
438
439    // The default implementation of compact combines the merge_ssts and update_manifest functions.
440    // Note: It's local compaction and only used for testing purpose.
441    async fn compact(
442        &self,
443        compaction_region: &CompactionRegion,
444        compact_request_options: compact_request::Options,
445    ) -> Result<()> {
446        let picker_output = {
447            let picker_output = new_picker(
448                &compact_request_options,
449                &compaction_region.region_options.compaction,
450                compaction_region.region_options.append_mode,
451            )
452            .pick(compaction_region);
453
454            if let Some(picker_output) = picker_output {
455                picker_output
456            } else {
457                info!(
458                    "No files to compact for region_id: {}",
459                    compaction_region.region_id
460                );
461                return Ok(());
462            }
463        };
464
465        let merge_output = self.merge_ssts(compaction_region, picker_output).await?;
466        if merge_output.is_empty() {
467            info!(
468                "No files to compact for region_id: {}",
469                compaction_region.region_id
470            );
471            return Ok(());
472        }
473
474        metrics::COMPACTION_INPUT_BYTES.inc_by(merge_output.input_file_size() as f64);
475        metrics::COMPACTION_OUTPUT_BYTES.inc_by(merge_output.output_file_size() as f64);
476        self.update_manifest(compaction_region, merge_output)
477            .await?;
478
479        Ok(())
480    }
481}