mito2/compaction/
compactor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::num::NonZero;
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::v1::region::compact_request;
20use common_meta::key::SchemaMetadataManagerRef;
21use common_telemetry::{info, warn};
22use common_time::TimeToLive;
23use either::Either;
24use itertools::Itertools;
25use object_store::manager::ObjectStoreManagerRef;
26use serde::{Deserialize, Serialize};
27use snafu::{OptionExt, ResultExt};
28use store_api::metadata::RegionMetadataRef;
29use store_api::region_request::PathType;
30use store_api::storage::RegionId;
31
32use crate::access_layer::{AccessLayer, AccessLayerRef, OperationType, SstWriteRequest, WriteType};
33use crate::cache::{CacheManager, CacheManagerRef};
34use crate::compaction::picker::{new_picker, PickerOutput};
35use crate::compaction::{find_ttl, CompactionSstReaderBuilder};
36use crate::config::MitoConfig;
37use crate::error::{EmptyRegionDirSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result};
38use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
39use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions, RemoveFileOptions};
40use crate::manifest::storage::manifest_compress_type;
41use crate::metrics;
42use crate::read::Source;
43use crate::region::opener::new_manifest_dir;
44use crate::region::options::RegionOptions;
45use crate::region::version::VersionRef;
46use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState};
47use crate::schedule::scheduler::LocalScheduler;
48use crate::sst::file::FileMeta;
49use crate::sst::file_purger::LocalFilePurger;
50use crate::sst::index::intermediate::IntermediateManager;
51use crate::sst::index::puffin_manager::PuffinManagerFactory;
52use crate::sst::location::region_dir_from_table_dir;
53use crate::sst::parquet::WriteOptions;
54use crate::sst::version::{SstVersion, SstVersionRef};
55
/// Region version for compaction that does not hold memtables.
///
/// It bundles the pieces of a region version that compaction reads: the
/// region metadata, the region options, the SST set and the compaction time
/// window.
#[derive(Clone)]
pub struct CompactionVersion {
    /// Metadata of the region.
    ///
    /// Altering metadata isn't frequent, so the metadata is stored behind a
    /// shared reference to allow sharing and reusing it when creating a new
    /// `Version`.
    pub(crate) metadata: RegionMetadataRef,
    /// Options of the region.
    pub(crate) options: RegionOptions,
    /// SSTs of the region.
    pub(crate) ssts: SstVersionRef,
    /// Inferred compaction time window, `None` if no window is known yet.
    pub(crate) compaction_time_window: Option<Duration>,
}
71
72impl From<VersionRef> for CompactionVersion {
73    fn from(value: VersionRef) -> Self {
74        Self {
75            metadata: value.metadata.clone(),
76            options: value.options.clone(),
77            ssts: value.ssts.clone(),
78            compaction_time_window: value.compaction_time_window,
79        }
80    }
81}
82
/// CompactionRegion represents a region that needs to be compacted.
/// It's the subset of MitoRegion.
#[derive(Clone)]
pub struct CompactionRegion {
    /// Id of the region to compact.
    pub region_id: RegionId,
    /// Options of the region. The compaction options and append mode stored
    /// here are used to choose the picker in `DefaultCompactor::compact`.
    pub region_options: RegionOptions,

    /// Engine configuration. Merging reads the SST write buffer size and the
    /// index configurations from it.
    pub(crate) engine_config: Arc<MitoConfig>,
    /// Metadata of the region, handed to the SST reader and writer.
    pub(crate) region_metadata: RegionMetadataRef,
    /// Cache manager handed to the SST reader and writer.
    pub(crate) cache_manager: CacheManagerRef,
    /// Access layer to get the table path and path type.
    pub access_layer: AccessLayerRef,
    /// Manifest context used to persist the region edit produced by compaction.
    pub(crate) manifest_ctx: Arc<ManifestContext>,
    /// Version of the region (without memtables) that compaction works on.
    pub(crate) current_version: CompactionVersion,
    /// Purger of the region's SST files, if any.
    pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
    /// Time-to-live of the region, if known.
    pub(crate) ttl: Option<TimeToLive>,

    /// Controls the parallelism of this compaction task. Default is 1.
    ///
    /// The parallel is inside this compaction task, not across different compaction tasks.
    /// It can be different windows of the same compaction task or something like this.
    /// `DefaultCompactor::merge_ssts` treats values below 1 as 1.
    pub max_parallelism: usize,
}
106
/// OpenCompactionRegionRequest represents the request to open a compaction region.
#[derive(Debug, Clone)]
pub struct OpenCompactionRegionRequest {
    /// Id of the region to open.
    pub region_id: RegionId,
    /// Table directory. Together with `region_id` and `path_type` it determines
    /// the region directory whose manifest is opened.
    pub table_dir: String,
    /// Path type used to derive the region directory and to build the access layer.
    pub path_type: PathType,
    /// Options of the region. Its `storage` field selects the object store;
    /// the default object store is used when it is `None`.
    pub region_options: RegionOptions,
    /// Parallelism copied into [`CompactionRegion::max_parallelism`].
    pub max_parallelism: usize,
}
116
117/// Open a compaction region from a compaction request.
118/// It's simple version of RegionOpener::open().
119pub async fn open_compaction_region(
120    req: &OpenCompactionRegionRequest,
121    mito_config: &MitoConfig,
122    object_store_manager: ObjectStoreManagerRef,
123    ttl_provider: Either<TimeToLive, SchemaMetadataManagerRef>,
124) -> Result<CompactionRegion> {
125    let object_store = {
126        let name = &req.region_options.storage;
127        if let Some(name) = name {
128            object_store_manager
129                .find(name)
130                .context(ObjectStoreNotFoundSnafu {
131                    object_store: name.to_string(),
132                })?
133        } else {
134            object_store_manager.default_object_store()
135        }
136    };
137
138    let access_layer = {
139        let puffin_manager_factory = PuffinManagerFactory::new(
140            &mito_config.index.aux_path,
141            mito_config.index.staging_size.as_bytes(),
142            Some(mito_config.index.write_buffer_size.as_bytes() as _),
143            mito_config.index.staging_ttl,
144        )
145        .await?;
146        let intermediate_manager =
147            IntermediateManager::init_fs(mito_config.index.aux_path.clone()).await?;
148
149        Arc::new(AccessLayer::new(
150            &req.table_dir,
151            req.path_type,
152            object_store.clone(),
153            puffin_manager_factory,
154            intermediate_manager,
155        ))
156    };
157
158    let manifest_manager = {
159        let region_manifest_options = RegionManifestOptions {
160            manifest_dir: new_manifest_dir(&region_dir_from_table_dir(
161                &req.table_dir,
162                req.region_id,
163                req.path_type,
164            )),
165            object_store: object_store.clone(),
166            compress_type: manifest_compress_type(mito_config.compress_manifest),
167            checkpoint_distance: mito_config.manifest_checkpoint_distance,
168            remove_file_options: RemoveFileOptions {
169                keep_count: mito_config.experimental_manifest_keep_removed_file_count,
170                keep_ttl: mito_config.experimental_manifest_keep_removed_file_ttl,
171            },
172        };
173
174        RegionManifestManager::open(
175            region_manifest_options,
176            Default::default(),
177            Default::default(),
178        )
179        .await?
180        .context(EmptyRegionDirSnafu {
181            region_id: req.region_id,
182            region_dir: &region_dir_from_table_dir(&req.table_dir, req.region_id, req.path_type),
183        })?
184    };
185
186    let manifest = manifest_manager.manifest();
187    let region_metadata = manifest.metadata.clone();
188    let manifest_ctx = Arc::new(ManifestContext::new(
189        manifest_manager,
190        RegionRoleState::Leader(RegionLeaderState::Writable),
191    ));
192
193    let file_purger = {
194        let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_purges));
195        Arc::new(LocalFilePurger::new(
196            purge_scheduler.clone(),
197            access_layer.clone(),
198            None,
199        ))
200    };
201
202    let current_version = {
203        let mut ssts = SstVersion::new();
204        ssts.add_files(file_purger.clone(), manifest.files.values().cloned());
205        CompactionVersion {
206            metadata: region_metadata.clone(),
207            options: req.region_options.clone(),
208            ssts: Arc::new(ssts),
209            compaction_time_window: manifest.compaction_time_window,
210        }
211    };
212
213    let ttl = match ttl_provider {
214        // Use the specified ttl.
215        Either::Left(ttl) => ttl,
216        // Get the ttl from the schema metadata manager.
217        Either::Right(schema_metadata_manager) => find_ttl(
218            req.region_id.table_id(),
219            current_version.options.ttl,
220            &schema_metadata_manager,
221        )
222        .await
223        .unwrap_or_else(|e| {
224            warn!(e; "Failed to get ttl for region: {}", region_metadata.region_id);
225            TimeToLive::default()
226        }),
227    };
228
229    Ok(CompactionRegion {
230        region_id: req.region_id,
231        region_options: req.region_options.clone(),
232        engine_config: Arc::new(mito_config.clone()),
233        region_metadata: region_metadata.clone(),
234        cache_manager: Arc::new(CacheManager::default()),
235        access_layer,
236        manifest_ctx,
237        current_version,
238        file_purger: Some(file_purger),
239        ttl: Some(ttl),
240        max_parallelism: req.max_parallelism,
241    })
242}
243
244impl CompactionRegion {
245    /// Get the file purger of the compaction region.
246    pub fn file_purger(&self) -> Option<Arc<LocalFilePurger>> {
247        self.file_purger.clone()
248    }
249
250    /// Stop the file purger scheduler of the compaction region.
251    pub async fn stop_purger_scheduler(&self) -> Result<()> {
252        if let Some(file_purger) = &self.file_purger {
253            file_purger.stop_scheduler().await
254        } else {
255            Ok(())
256        }
257    }
258}
259
/// [`MergeOutput`] represents the output of merging SST files.
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
pub struct MergeOutput {
    /// Files produced by the merge that should be added to the region.
    pub files_to_add: Vec<FileMeta>,
    /// Files that should be removed from the region after the merge.
    pub files_to_remove: Vec<FileMeta>,
    /// Compaction time window. `DefaultCompactor::update_manifest` interprets
    /// the value as a number of seconds.
    pub compaction_time_window: Option<i64>,
}
267
268impl MergeOutput {
269    pub fn is_empty(&self) -> bool {
270        self.files_to_add.is_empty() && self.files_to_remove.is_empty()
271    }
272
273    pub fn input_file_size(&self) -> u64 {
274        self.files_to_remove.iter().map(|f| f.file_size).sum()
275    }
276
277    pub fn output_file_size(&self) -> u64 {
278        self.files_to_add.iter().map(|f| f.file_size).sum()
279    }
280}
281
/// Compactor is the trait that defines the compaction logic.
///
/// A compaction consists of merging SST files ([`Compactor::merge_ssts`]) and
/// recording the result in the manifest ([`Compactor::update_manifest`]);
/// [`Compactor::compact`] runs a whole compaction for a region.
#[async_trait::async_trait]
pub trait Compactor: Send + Sync + 'static {
    /// Merge SST files for a region.
    ///
    /// `picker_output` describes which files to merge. The returned
    /// [`MergeOutput`] lists the files to add to and to remove from the region.
    async fn merge_ssts(
        &self,
        compaction_region: &CompactionRegion,
        picker_output: PickerOutput,
    ) -> Result<MergeOutput>;

    /// Update the manifest after merging SST files.
    ///
    /// Returns the [`RegionEdit`] built from `merge_output`.
    async fn update_manifest(
        &self,
        compaction_region: &CompactionRegion,
        merge_output: MergeOutput,
    ) -> Result<RegionEdit>;

    /// Execute compaction for a region.
    ///
    /// `compact_request_options` are the options carried by the compact request.
    async fn compact(
        &self,
        compaction_region: &CompactionRegion,
        compact_request_options: compact_request::Options,
    ) -> Result<()>;
}
306
/// DefaultCompactor is the default implementation of [`Compactor`].
///
/// It is a unit struct without fields; everything it needs is passed to its
/// methods through the [`CompactionRegion`] argument.
pub struct DefaultCompactor;
309
310#[async_trait::async_trait]
311impl Compactor for DefaultCompactor {
312    async fn merge_ssts(
313        &self,
314        compaction_region: &CompactionRegion,
315        mut picker_output: PickerOutput,
316    ) -> Result<MergeOutput> {
317        let mut futs = Vec::with_capacity(picker_output.outputs.len());
318        let mut compacted_inputs =
319            Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum());
320        let internal_parallelism = compaction_region.max_parallelism.max(1);
321
322        for output in picker_output.outputs.drain(..) {
323            compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone()));
324            let write_opts = WriteOptions {
325                write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
326                max_file_size: picker_output.max_file_size,
327                ..Default::default()
328            };
329
330            let region_metadata = compaction_region.region_metadata.clone();
331            let sst_layer = compaction_region.access_layer.clone();
332            let region_id = compaction_region.region_id;
333            let cache_manager = compaction_region.cache_manager.clone();
334            let storage = compaction_region.region_options.storage.clone();
335            let index_options = compaction_region
336                .current_version
337                .options
338                .index_options
339                .clone();
340            let append_mode = compaction_region.current_version.options.append_mode;
341            let merge_mode = compaction_region.current_version.options.merge_mode();
342            let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
343            let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
344            let bloom_filter_index_config =
345                compaction_region.engine_config.bloom_filter_index.clone();
346            let max_sequence = output
347                .inputs
348                .iter()
349                .map(|f| f.meta_ref().sequence)
350                .max()
351                .flatten();
352            futs.push(async move {
353                let input_file_names = output
354                    .inputs
355                    .iter()
356                    .map(|f| f.file_id().to_string())
357                    .join(",");
358                let reader = CompactionSstReaderBuilder {
359                    metadata: region_metadata.clone(),
360                    sst_layer: sst_layer.clone(),
361                    cache: cache_manager.clone(),
362                    inputs: &output.inputs,
363                    append_mode,
364                    filter_deleted: output.filter_deleted,
365                    time_range: output.output_time_range,
366                    merge_mode,
367                }
368                .build_sst_reader()
369                .await?;
370                let (sst_infos, metrics) = sst_layer
371                    .write_sst(
372                        SstWriteRequest {
373                            op_type: OperationType::Compact,
374                            metadata: region_metadata,
375                            source: Source::Reader(reader),
376                            cache_manager,
377                            storage,
378                            max_sequence: max_sequence.map(NonZero::get),
379                            index_options,
380                            inverted_index_config,
381                            fulltext_index_config,
382                            bloom_filter_index_config,
383                        },
384                        &write_opts,
385                        WriteType::Compaction,
386                    )
387                    .await?;
388                let output_files = sst_infos
389                    .into_iter()
390                    .map(|sst_info| FileMeta {
391                        region_id,
392                        file_id: sst_info.file_id,
393                        time_range: sst_info.time_range,
394                        level: output.output_level,
395                        file_size: sst_info.file_size,
396                        available_indexes: sst_info.index_metadata.build_available_indexes(),
397                        index_file_size: sst_info.index_metadata.file_size,
398                        num_rows: sst_info.num_rows as u64,
399                        num_row_groups: sst_info.num_row_groups,
400                        sequence: max_sequence,
401                    })
402                    .collect::<Vec<_>>();
403                let output_file_names =
404                    output_files.iter().map(|f| f.file_id.to_string()).join(",");
405                info!(
406                    "Region {} compaction inputs: [{}], outputs: [{}], metrics: {:?}",
407                    region_id, input_file_names, output_file_names, metrics
408                );
409                metrics.observe();
410                Ok(output_files)
411            });
412        }
413        let mut output_files = Vec::with_capacity(futs.len());
414        while !futs.is_empty() {
415            let mut task_chunk = Vec::with_capacity(internal_parallelism);
416            for _ in 0..internal_parallelism {
417                if let Some(task) = futs.pop() {
418                    task_chunk.push(common_runtime::spawn_compact(task));
419                }
420            }
421            let metas = futures::future::try_join_all(task_chunk)
422                .await
423                .context(JoinSnafu)?
424                .into_iter()
425                .collect::<Result<Vec<Vec<_>>>>()?;
426            output_files.extend(metas.into_iter().flatten());
427        }
428
429        let mut inputs: Vec<_> = compacted_inputs.into_iter().collect();
430        inputs.extend(
431            picker_output
432                .expired_ssts
433                .iter()
434                .map(|f| f.meta_ref().clone()),
435        );
436
437        Ok(MergeOutput {
438            files_to_add: output_files,
439            files_to_remove: inputs,
440            compaction_time_window: Some(picker_output.time_window_size),
441        })
442    }
443
444    async fn update_manifest(
445        &self,
446        compaction_region: &CompactionRegion,
447        merge_output: MergeOutput,
448    ) -> Result<RegionEdit> {
449        // Write region edit to manifest.
450        let edit = RegionEdit {
451            files_to_add: merge_output.files_to_add,
452            files_to_remove: merge_output.files_to_remove,
453            // Use current timestamp as the edit timestamp.
454            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
455            compaction_time_window: merge_output
456                .compaction_time_window
457                .map(|seconds| Duration::from_secs(seconds as u64)),
458            flushed_entry_id: None,
459            flushed_sequence: None,
460        };
461
462        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
463        // TODO: We might leak files if we fail to update manifest. We can add a cleanup task to remove them later.
464        compaction_region
465            .manifest_ctx
466            .update_manifest(RegionLeaderState::Writable, action_list)
467            .await?;
468
469        Ok(edit)
470    }
471
472    // The default implementation of compact combines the merge_ssts and update_manifest functions.
473    // Note: It's local compaction and only used for testing purpose.
474    async fn compact(
475        &self,
476        compaction_region: &CompactionRegion,
477        compact_request_options: compact_request::Options,
478    ) -> Result<()> {
479        let picker_output = {
480            let picker_output = new_picker(
481                &compact_request_options,
482                &compaction_region.region_options.compaction,
483                compaction_region.region_options.append_mode,
484            )
485            .pick(compaction_region);
486
487            if let Some(picker_output) = picker_output {
488                picker_output
489            } else {
490                info!(
491                    "No files to compact for region_id: {}",
492                    compaction_region.region_id
493                );
494                return Ok(());
495            }
496        };
497
498        let merge_output = self.merge_ssts(compaction_region, picker_output).await?;
499        if merge_output.is_empty() {
500            info!(
501                "No files to compact for region_id: {}",
502                compaction_region.region_id
503            );
504            return Ok(());
505        }
506
507        metrics::COMPACTION_INPUT_BYTES.inc_by(merge_output.input_file_size() as f64);
508        metrics::COMPACTION_OUTPUT_BYTES.inc_by(merge_output.output_file_size() as f64);
509        self.update_manifest(compaction_region, merge_output)
510            .await?;
511
512        Ok(())
513    }
514}