// mito2/compaction/compactor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::num::NonZero;
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::v1::region::compact_request;
20use common_meta::key::SchemaMetadataManagerRef;
21use common_telemetry::{info, warn};
22use common_time::TimeToLive;
23use either::Either;
24use itertools::Itertools;
25use object_store::manager::ObjectStoreManagerRef;
26use serde::{Deserialize, Serialize};
27use snafu::{OptionExt, ResultExt};
28use store_api::metadata::RegionMetadataRef;
29use store_api::region_request::PathType;
30use store_api::storage::RegionId;
31
32use crate::access_layer::{AccessLayer, AccessLayerRef, OperationType, SstWriteRequest, WriteType};
33use crate::cache::{CacheManager, CacheManagerRef};
34use crate::compaction::picker::{new_picker, PickerOutput};
35use crate::compaction::{find_ttl, CompactionSstReaderBuilder};
36use crate::config::MitoConfig;
37use crate::error::{EmptyRegionDirSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result};
38use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
39use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
40use crate::manifest::storage::manifest_compress_type;
41use crate::metrics;
42use crate::read::Source;
43use crate::region::opener::new_manifest_dir;
44use crate::region::options::RegionOptions;
45use crate::region::version::VersionRef;
46use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState};
47use crate::schedule::scheduler::LocalScheduler;
48use crate::sst::file::FileMeta;
49use crate::sst::file_purger::LocalFilePurger;
50use crate::sst::index::intermediate::IntermediateManager;
51use crate::sst::index::puffin_manager::PuffinManagerFactory;
52use crate::sst::location::region_dir_from_table_dir;
53use crate::sst::parquet::WriteOptions;
54use crate::sst::version::{SstVersion, SstVersionRef};
55
/// Region version for compaction that does not hold memtables.
///
/// Built from a full [`VersionRef`] via `From`, keeping only the pieces that
/// compaction actually reads (metadata, options, SSTs, time window).
#[derive(Clone)]
pub struct CompactionVersion {
    /// Metadata of the region.
    ///
    /// Altering metadata isn't frequent, storing metadata in Arc to allow sharing
    /// metadata and reuse metadata when creating a new `Version`.
    pub(crate) metadata: RegionMetadataRef,
    /// Options of the region.
    pub(crate) options: RegionOptions,
    /// SSTs of the region.
    pub(crate) ssts: SstVersionRef,
    /// Inferred compaction time window.
    pub(crate) compaction_time_window: Option<Duration>,
}
71
72impl From<VersionRef> for CompactionVersion {
73    fn from(value: VersionRef) -> Self {
74        Self {
75            metadata: value.metadata.clone(),
76            options: value.options.clone(),
77            ssts: value.ssts.clone(),
78            compaction_time_window: value.compaction_time_window,
79        }
80    }
81}
82
/// CompactionRegion represents a region that needs to be compacted.
/// It's the subset of MitoRegion.
#[derive(Clone)]
pub struct CompactionRegion {
    /// Id of the region to compact.
    pub region_id: RegionId,
    /// Options of the region.
    pub region_options: RegionOptions,

    /// Engine configuration (write buffer sizes, index configs, ...).
    pub(crate) engine_config: Arc<MitoConfig>,
    /// Metadata of the region (schema etc.).
    pub(crate) region_metadata: RegionMetadataRef,
    /// Cache manager passed to SST readers/writers.
    pub(crate) cache_manager: CacheManagerRef,
    /// Access layer for reading and writing SST files of the region.
    pub(crate) access_layer: AccessLayerRef,
    /// Context used to apply manifest edits after compaction.
    pub(crate) manifest_ctx: Arc<ManifestContext>,
    /// Region version (SSTs and options) the compaction works against.
    pub(crate) current_version: CompactionVersion,
    /// Purger for removing compacted-away SST files; `None` when unavailable.
    pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
    /// TTL of the region, if known.
    pub(crate) ttl: Option<TimeToLive>,

    /// Controls the parallelism of this compaction task. Default is 1.
    ///
    /// The parallel is inside this compaction task, not across different compaction tasks.
    /// It can be different windows of the same compaction task or something like this.
    pub max_parallelism: usize,
}
105
/// OpenCompactionRegionRequest represents the request to open a compaction region.
#[derive(Debug, Clone)]
pub struct OpenCompactionRegionRequest {
    /// Id of the region to open.
    pub region_id: RegionId,
    /// Table directory that contains the region's data.
    pub table_dir: String,
    /// Path type used to build region and file paths.
    pub path_type: PathType,
    /// Options of the region.
    pub region_options: RegionOptions,
    /// Max parallelism for the compaction task (see `CompactionRegion::max_parallelism`).
    pub max_parallelism: usize,
}
115
116/// Open a compaction region from a compaction request.
117/// It's simple version of RegionOpener::open().
118pub async fn open_compaction_region(
119    req: &OpenCompactionRegionRequest,
120    mito_config: &MitoConfig,
121    object_store_manager: ObjectStoreManagerRef,
122    ttl_provider: Either<TimeToLive, SchemaMetadataManagerRef>,
123) -> Result<CompactionRegion> {
124    let object_store = {
125        let name = &req.region_options.storage;
126        if let Some(name) = name {
127            object_store_manager
128                .find(name)
129                .context(ObjectStoreNotFoundSnafu {
130                    object_store: name.to_string(),
131                })?
132        } else {
133            object_store_manager.default_object_store()
134        }
135    };
136
137    let access_layer = {
138        let puffin_manager_factory = PuffinManagerFactory::new(
139            &mito_config.index.aux_path,
140            mito_config.index.staging_size.as_bytes(),
141            Some(mito_config.index.write_buffer_size.as_bytes() as _),
142            mito_config.index.staging_ttl,
143        )
144        .await?;
145        let intermediate_manager =
146            IntermediateManager::init_fs(mito_config.index.aux_path.clone()).await?;
147
148        Arc::new(AccessLayer::new(
149            &req.table_dir,
150            req.path_type,
151            object_store.clone(),
152            puffin_manager_factory,
153            intermediate_manager,
154        ))
155    };
156
157    let manifest_manager = {
158        let region_manifest_options = RegionManifestOptions {
159            manifest_dir: new_manifest_dir(&region_dir_from_table_dir(
160                &req.table_dir,
161                req.region_id,
162                req.path_type,
163            )),
164            object_store: object_store.clone(),
165            compress_type: manifest_compress_type(mito_config.compress_manifest),
166            checkpoint_distance: mito_config.manifest_checkpoint_distance,
167        };
168
169        RegionManifestManager::open(
170            region_manifest_options,
171            Default::default(),
172            Default::default(),
173        )
174        .await?
175        .context(EmptyRegionDirSnafu {
176            region_id: req.region_id,
177            region_dir: &region_dir_from_table_dir(&req.table_dir, req.region_id, req.path_type),
178        })?
179    };
180
181    let manifest = manifest_manager.manifest();
182    let region_metadata = manifest.metadata.clone();
183    let manifest_ctx = Arc::new(ManifestContext::new(
184        manifest_manager,
185        RegionRoleState::Leader(RegionLeaderState::Writable),
186    ));
187
188    let file_purger = {
189        let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_purges));
190        Arc::new(LocalFilePurger::new(
191            purge_scheduler.clone(),
192            access_layer.clone(),
193            None,
194        ))
195    };
196
197    let current_version = {
198        let mut ssts = SstVersion::new();
199        ssts.add_files(file_purger.clone(), manifest.files.values().cloned());
200        CompactionVersion {
201            metadata: region_metadata.clone(),
202            options: req.region_options.clone(),
203            ssts: Arc::new(ssts),
204            compaction_time_window: manifest.compaction_time_window,
205        }
206    };
207
208    let ttl = match ttl_provider {
209        // Use the specified ttl.
210        Either::Left(ttl) => ttl,
211        // Get the ttl from the schema metadata manager.
212        Either::Right(schema_metadata_manager) => find_ttl(
213            req.region_id.table_id(),
214            current_version.options.ttl,
215            &schema_metadata_manager,
216        )
217        .await
218        .unwrap_or_else(|e| {
219            warn!(e; "Failed to get ttl for region: {}", region_metadata.region_id);
220            TimeToLive::default()
221        }),
222    };
223
224    Ok(CompactionRegion {
225        region_id: req.region_id,
226        region_options: req.region_options.clone(),
227        engine_config: Arc::new(mito_config.clone()),
228        region_metadata: region_metadata.clone(),
229        cache_manager: Arc::new(CacheManager::default()),
230        access_layer,
231        manifest_ctx,
232        current_version,
233        file_purger: Some(file_purger),
234        ttl: Some(ttl),
235        max_parallelism: req.max_parallelism,
236    })
237}
238
239impl CompactionRegion {
240    pub fn file_purger(&self) -> Option<Arc<LocalFilePurger>> {
241        self.file_purger.clone()
242    }
243}
244
/// `[MergeOutput]` represents the output of merging SST files.
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
pub struct MergeOutput {
    /// SST files produced by the merge, to be added to the manifest.
    pub files_to_add: Vec<FileMeta>,
    /// Input (and expired) SST files to remove from the manifest.
    pub files_to_remove: Vec<FileMeta>,
    /// Compaction time window in seconds, if any.
    pub compaction_time_window: Option<i64>,
}
252
253impl MergeOutput {
254    pub fn is_empty(&self) -> bool {
255        self.files_to_add.is_empty() && self.files_to_remove.is_empty()
256    }
257
258    pub fn input_file_size(&self) -> u64 {
259        self.files_to_remove.iter().map(|f| f.file_size).sum()
260    }
261
262    pub fn output_file_size(&self) -> u64 {
263        self.files_to_add.iter().map(|f| f.file_size).sum()
264    }
265}
266
/// Compactor is the trait that defines the compaction logic.
#[async_trait::async_trait]
pub trait Compactor: Send + Sync + 'static {
    /// Merge SST files for a region.
    ///
    /// Returns the set of files to add/remove so the caller can update the manifest.
    async fn merge_ssts(
        &self,
        compaction_region: &CompactionRegion,
        picker_output: PickerOutput,
    ) -> Result<MergeOutput>;

    /// Update the manifest after merging SST files.
    ///
    /// Returns the region edit that was applied.
    async fn update_manifest(
        &self,
        compaction_region: &CompactionRegion,
        merge_output: MergeOutput,
    ) -> Result<RegionEdit>;

    /// Execute compaction for a region.
    async fn compact(
        &self,
        compaction_region: &CompactionRegion,
        compact_request_options: compact_request::Options,
    ) -> Result<()>;
}
291
/// DefaultCompactor is the default implementation of Compactor.
///
/// A stateless unit struct; all inputs come from the `CompactionRegion` argument.
pub struct DefaultCompactor;
294
295#[async_trait::async_trait]
296impl Compactor for DefaultCompactor {
297    async fn merge_ssts(
298        &self,
299        compaction_region: &CompactionRegion,
300        mut picker_output: PickerOutput,
301    ) -> Result<MergeOutput> {
302        let mut futs = Vec::with_capacity(picker_output.outputs.len());
303        let mut compacted_inputs =
304            Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum());
305        let internal_parallelism = compaction_region.max_parallelism.max(1);
306
307        for output in picker_output.outputs.drain(..) {
308            compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone()));
309            let write_opts = WriteOptions {
310                write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
311                max_file_size: picker_output.max_file_size,
312                ..Default::default()
313            };
314
315            let region_metadata = compaction_region.region_metadata.clone();
316            let sst_layer = compaction_region.access_layer.clone();
317            let region_id = compaction_region.region_id;
318            let cache_manager = compaction_region.cache_manager.clone();
319            let storage = compaction_region.region_options.storage.clone();
320            let index_options = compaction_region
321                .current_version
322                .options
323                .index_options
324                .clone();
325            let append_mode = compaction_region.current_version.options.append_mode;
326            let merge_mode = compaction_region.current_version.options.merge_mode();
327            let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
328            let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
329            let bloom_filter_index_config =
330                compaction_region.engine_config.bloom_filter_index.clone();
331            let max_sequence = output
332                .inputs
333                .iter()
334                .map(|f| f.meta_ref().sequence)
335                .max()
336                .flatten();
337            futs.push(async move {
338                let input_file_names = output
339                    .inputs
340                    .iter()
341                    .map(|f| f.file_id().to_string())
342                    .join(",");
343                let reader = CompactionSstReaderBuilder {
344                    metadata: region_metadata.clone(),
345                    sst_layer: sst_layer.clone(),
346                    cache: cache_manager.clone(),
347                    inputs: &output.inputs,
348                    append_mode,
349                    filter_deleted: output.filter_deleted,
350                    time_range: output.output_time_range,
351                    merge_mode,
352                }
353                .build_sst_reader()
354                .await?;
355                let (sst_infos, metrics) = sst_layer
356                    .write_sst(
357                        SstWriteRequest {
358                            op_type: OperationType::Compact,
359                            metadata: region_metadata,
360                            source: Source::Reader(reader),
361                            cache_manager,
362                            storage,
363                            max_sequence: max_sequence.map(NonZero::get),
364                            index_options,
365                            inverted_index_config,
366                            fulltext_index_config,
367                            bloom_filter_index_config,
368                        },
369                        &write_opts,
370                        WriteType::Compaction,
371                    )
372                    .await?;
373                let output_files = sst_infos
374                    .into_iter()
375                    .map(|sst_info| FileMeta {
376                        region_id,
377                        file_id: sst_info.file_id,
378                        time_range: sst_info.time_range,
379                        level: output.output_level,
380                        file_size: sst_info.file_size,
381                        available_indexes: sst_info.index_metadata.build_available_indexes(),
382                        index_file_size: sst_info.index_metadata.file_size,
383                        num_rows: sst_info.num_rows as u64,
384                        num_row_groups: sst_info.num_row_groups,
385                        sequence: max_sequence,
386                    })
387                    .collect::<Vec<_>>();
388                let output_file_names =
389                    output_files.iter().map(|f| f.file_id.to_string()).join(",");
390                info!(
391                    "Region {} compaction inputs: [{}], outputs: [{}], metrics: {:?}",
392                    region_id, input_file_names, output_file_names, metrics
393                );
394                metrics.observe();
395                Ok(output_files)
396            });
397        }
398        let mut output_files = Vec::with_capacity(futs.len());
399        while !futs.is_empty() {
400            let mut task_chunk = Vec::with_capacity(internal_parallelism);
401            for _ in 0..internal_parallelism {
402                if let Some(task) = futs.pop() {
403                    task_chunk.push(common_runtime::spawn_compact(task));
404                }
405            }
406            let metas = futures::future::try_join_all(task_chunk)
407                .await
408                .context(JoinSnafu)?
409                .into_iter()
410                .collect::<Result<Vec<Vec<_>>>>()?;
411            output_files.extend(metas.into_iter().flatten());
412        }
413
414        let mut inputs: Vec<_> = compacted_inputs.into_iter().collect();
415        inputs.extend(
416            picker_output
417                .expired_ssts
418                .iter()
419                .map(|f| f.meta_ref().clone()),
420        );
421
422        Ok(MergeOutput {
423            files_to_add: output_files,
424            files_to_remove: inputs,
425            compaction_time_window: Some(picker_output.time_window_size),
426        })
427    }
428
429    async fn update_manifest(
430        &self,
431        compaction_region: &CompactionRegion,
432        merge_output: MergeOutput,
433    ) -> Result<RegionEdit> {
434        // Write region edit to manifest.
435        let edit = RegionEdit {
436            files_to_add: merge_output.files_to_add,
437            files_to_remove: merge_output.files_to_remove,
438            compaction_time_window: merge_output
439                .compaction_time_window
440                .map(|seconds| Duration::from_secs(seconds as u64)),
441            flushed_entry_id: None,
442            flushed_sequence: None,
443        };
444
445        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
446        // TODO: We might leak files if we fail to update manifest. We can add a cleanup task to remove them later.
447        compaction_region
448            .manifest_ctx
449            .update_manifest(RegionLeaderState::Writable, action_list)
450            .await?;
451
452        Ok(edit)
453    }
454
455    // The default implementation of compact combines the merge_ssts and update_manifest functions.
456    // Note: It's local compaction and only used for testing purpose.
457    async fn compact(
458        &self,
459        compaction_region: &CompactionRegion,
460        compact_request_options: compact_request::Options,
461    ) -> Result<()> {
462        let picker_output = {
463            let picker_output = new_picker(
464                &compact_request_options,
465                &compaction_region.region_options.compaction,
466                compaction_region.region_options.append_mode,
467            )
468            .pick(compaction_region);
469
470            if let Some(picker_output) = picker_output {
471                picker_output
472            } else {
473                info!(
474                    "No files to compact for region_id: {}",
475                    compaction_region.region_id
476                );
477                return Ok(());
478            }
479        };
480
481        let merge_output = self.merge_ssts(compaction_region, picker_output).await?;
482        if merge_output.is_empty() {
483            info!(
484                "No files to compact for region_id: {}",
485                compaction_region.region_id
486            );
487            return Ok(());
488        }
489
490        metrics::COMPACTION_INPUT_BYTES.inc_by(merge_output.input_file_size() as f64);
491        metrics::COMPACTION_OUTPUT_BYTES.inc_by(merge_output.output_file_size() as f64);
492        self.update_manifest(compaction_region, merge_output)
493            .await?;
494
495        Ok(())
496    }
497}