mito2/compaction/
compactor.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::num::NonZero;
16use std::sync::Arc;
17use std::time::Duration;
18
19use api::v1::region::compact_request;
20use common_meta::key::SchemaMetadataManagerRef;
21use common_telemetry::{info, warn};
22use common_time::TimeToLive;
23use itertools::Itertools;
24use object_store::manager::ObjectStoreManagerRef;
25use serde::{Deserialize, Serialize};
26use snafu::{OptionExt, ResultExt};
27use store_api::metadata::RegionMetadataRef;
28use store_api::storage::RegionId;
29
30use crate::access_layer::{AccessLayer, AccessLayerRef, OperationType, SstWriteRequest};
31use crate::cache::{CacheManager, CacheManagerRef};
32use crate::compaction::picker::{new_picker, PickerOutput};
33use crate::compaction::{find_ttl, CompactionSstReaderBuilder};
34use crate::config::MitoConfig;
35use crate::error::{EmptyRegionDirSnafu, JoinSnafu, ObjectStoreNotFoundSnafu, Result};
36use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
37use crate::manifest::manager::{RegionManifestManager, RegionManifestOptions};
38use crate::manifest::storage::manifest_compress_type;
39use crate::read::Source;
40use crate::region::opener::new_manifest_dir;
41use crate::region::options::RegionOptions;
42use crate::region::version::VersionRef;
43use crate::region::{ManifestContext, RegionLeaderState, RegionRoleState};
44use crate::schedule::scheduler::LocalScheduler;
45use crate::sst::file::FileMeta;
46use crate::sst::file_purger::LocalFilePurger;
47use crate::sst::index::intermediate::IntermediateManager;
48use crate::sst::index::puffin_manager::PuffinManagerFactory;
49use crate::sst::parquet::WriteOptions;
50use crate::sst::version::{SstVersion, SstVersionRef};
51
/// Region version for compaction that does not hold memtables.
///
/// It carries the metadata, options, SSTs and compaction time window of a region
/// version; see the `From<VersionRef>` conversion for how it is derived.
#[derive(Clone)]
pub struct CompactionVersion {
    /// Metadata of the region.
    ///
    /// Altering metadata isn't frequent, storing metadata in Arc to allow sharing
    /// metadata and reuse metadata when creating a new `Version`.
    pub(crate) metadata: RegionMetadataRef,
    /// Options of the region.
    pub(crate) options: RegionOptions,
    /// SSTs of the region.
    pub(crate) ssts: SstVersionRef,
    /// Inferred compaction time window.
    pub(crate) compaction_time_window: Option<Duration>,
}
67
68impl From<VersionRef> for CompactionVersion {
69    fn from(value: VersionRef) -> Self {
70        Self {
71            metadata: value.metadata.clone(),
72            options: value.options.clone(),
73            ssts: value.ssts.clone(),
74            compaction_time_window: value.compaction_time_window,
75        }
76    }
77}
78
/// CompactionRegion represents a region that needs to be compacted.
/// It's the subset of MitoRegion.
#[derive(Clone)]
pub struct CompactionRegion {
    /// Id of the region to compact.
    pub region_id: RegionId,
    /// Options of the region.
    pub region_options: RegionOptions,
    /// Directory of the region.
    pub region_dir: String,

    /// Engine configuration used while compacting this region.
    pub(crate) engine_config: Arc<MitoConfig>,
    /// Metadata of the region.
    pub(crate) region_metadata: RegionMetadataRef,
    /// Cache manager handed to the SST reader and writer during compaction.
    pub(crate) cache_manager: CacheManagerRef,
    /// Access layer used to write the compacted SSTs.
    pub(crate) access_layer: AccessLayerRef,
    /// Manifest context used to record the compaction result in the region manifest.
    pub(crate) manifest_ctx: Arc<ManifestContext>,
    /// Version of the region the compaction works on.
    pub(crate) current_version: CompactionVersion,
    /// Purger of the region's SST files, if any.
    pub(crate) file_purger: Option<Arc<LocalFilePurger>>,
    /// TTL of the region, if any.
    pub(crate) ttl: Option<TimeToLive>,

    /// Controls the parallelism of this compaction task. Default is 1.
    ///
    /// The parallel is inside this compaction task, not across different compaction tasks.
    /// It can be different windows of the same compaction task or something like this.
    pub max_parallelism: usize,
}
102
/// OpenCompactionRegionRequest represents the request to open a compaction region.
#[derive(Debug, Clone)]
pub struct OpenCompactionRegionRequest {
    /// Id of the region to open.
    pub region_id: RegionId,
    /// Directory of the region; the access layer and the manifest directory are derived from it.
    pub region_dir: String,
    /// Options of the region. The `storage` option, when set, names the object store to use.
    pub region_options: RegionOptions,
    /// Parallelism to use inside the compaction task; copied into the opened `CompactionRegion`.
    pub max_parallelism: usize,
}
111
112/// Open a compaction region from a compaction request.
113/// It's simple version of RegionOpener::open().
114pub async fn open_compaction_region(
115    req: &OpenCompactionRegionRequest,
116    mito_config: &MitoConfig,
117    object_store_manager: ObjectStoreManagerRef,
118    schema_metadata_manager: SchemaMetadataManagerRef,
119) -> Result<CompactionRegion> {
120    let object_store = {
121        let name = &req.region_options.storage;
122        if let Some(name) = name {
123            object_store_manager
124                .find(name)
125                .context(ObjectStoreNotFoundSnafu {
126                    object_store: name.to_string(),
127                })?
128        } else {
129            object_store_manager.default_object_store()
130        }
131    };
132
133    let access_layer = {
134        let puffin_manager_factory = PuffinManagerFactory::new(
135            &mito_config.index.aux_path,
136            mito_config.index.staging_size.as_bytes(),
137            Some(mito_config.index.write_buffer_size.as_bytes() as _),
138            mito_config.index.staging_ttl,
139        )
140        .await?;
141        let intermediate_manager =
142            IntermediateManager::init_fs(mito_config.index.aux_path.clone()).await?;
143
144        Arc::new(AccessLayer::new(
145            req.region_dir.as_str(),
146            object_store.clone(),
147            puffin_manager_factory,
148            intermediate_manager,
149        ))
150    };
151
152    let manifest_manager = {
153        let region_manifest_options = RegionManifestOptions {
154            manifest_dir: new_manifest_dir(req.region_dir.as_str()),
155            object_store: object_store.clone(),
156            compress_type: manifest_compress_type(mito_config.compress_manifest),
157            checkpoint_distance: mito_config.manifest_checkpoint_distance,
158        };
159
160        RegionManifestManager::open(
161            region_manifest_options,
162            Default::default(),
163            Default::default(),
164        )
165        .await?
166        .context(EmptyRegionDirSnafu {
167            region_id: req.region_id,
168            region_dir: req.region_dir.as_str(),
169        })?
170    };
171
172    let manifest = manifest_manager.manifest();
173    let region_metadata = manifest.metadata.clone();
174    let manifest_ctx = Arc::new(ManifestContext::new(
175        manifest_manager,
176        RegionRoleState::Leader(RegionLeaderState::Writable),
177    ));
178
179    let file_purger = {
180        let purge_scheduler = Arc::new(LocalScheduler::new(mito_config.max_background_purges));
181        Arc::new(LocalFilePurger::new(
182            purge_scheduler.clone(),
183            access_layer.clone(),
184            None,
185        ))
186    };
187
188    let current_version = {
189        let mut ssts = SstVersion::new();
190        ssts.add_files(file_purger.clone(), manifest.files.values().cloned());
191        CompactionVersion {
192            metadata: region_metadata.clone(),
193            options: req.region_options.clone(),
194            ssts: Arc::new(ssts),
195            compaction_time_window: manifest.compaction_time_window,
196        }
197    };
198
199    let ttl = find_ttl(
200        req.region_id.table_id(),
201        current_version.options.ttl,
202        &schema_metadata_manager,
203    )
204    .await
205    .unwrap_or_else(|e| {
206        warn!(e; "Failed to get ttl for region: {}", region_metadata.region_id);
207        TimeToLive::default()
208    });
209    Ok(CompactionRegion {
210        region_id: req.region_id,
211        region_options: req.region_options.clone(),
212        region_dir: req.region_dir.clone(),
213        engine_config: Arc::new(mito_config.clone()),
214        region_metadata: region_metadata.clone(),
215        cache_manager: Arc::new(CacheManager::default()),
216        access_layer,
217        manifest_ctx,
218        current_version,
219        file_purger: Some(file_purger),
220        ttl: Some(ttl),
221        max_parallelism: req.max_parallelism,
222    })
223}
224
225impl CompactionRegion {
226    pub fn file_purger(&self) -> Option<Arc<LocalFilePurger>> {
227        self.file_purger.clone()
228    }
229}
230
/// [`MergeOutput`] represents the output of merging SST files.
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
pub struct MergeOutput {
    /// Files produced by the merge that should be added to the region.
    pub files_to_add: Vec<FileMeta>,
    /// Files that should be removed from the region after the merge.
    pub files_to_remove: Vec<FileMeta>,
    /// Compaction time window in seconds, if any.
    pub compaction_time_window: Option<i64>,
}
238
239impl MergeOutput {
240    pub fn is_empty(&self) -> bool {
241        self.files_to_add.is_empty() && self.files_to_remove.is_empty()
242    }
243}
244
/// Compactor is the trait that defines the compaction logic.
///
/// The work is split into merging SST files (`merge_ssts`) and recording the result
/// in the manifest (`update_manifest`); `compact` runs a whole compaction for a region.
#[async_trait::async_trait]
pub trait Compactor: Send + Sync + 'static {
    /// Merge SST files for a region.
    ///
    /// `picker_output` describes the files picked for compaction. On success, returns
    /// the files to add and to remove as a [`MergeOutput`].
    async fn merge_ssts(
        &self,
        compaction_region: &CompactionRegion,
        picker_output: PickerOutput,
    ) -> Result<MergeOutput>;

    /// Update the manifest after merging SST files.
    ///
    /// On success, returns the [`RegionEdit`] built from `merge_output`.
    async fn update_manifest(
        &self,
        compaction_region: &CompactionRegion,
        merge_output: MergeOutput,
    ) -> Result<RegionEdit>;

    /// Execute compaction for a region.
    ///
    /// `compact_request_options` carries the options of the compaction request.
    async fn compact(
        &self,
        compaction_region: &CompactionRegion,
        compact_request_options: compact_request::Options,
    ) -> Result<()>;
}
269
/// DefaultCompactor is the default implementation of Compactor.
///
/// It is a stateless unit struct; everything it needs is passed in through the
/// `CompactionRegion` argument of each method.
pub struct DefaultCompactor;
272
273#[async_trait::async_trait]
274impl Compactor for DefaultCompactor {
275    async fn merge_ssts(
276        &self,
277        compaction_region: &CompactionRegion,
278        mut picker_output: PickerOutput,
279    ) -> Result<MergeOutput> {
280        let mut futs = Vec::with_capacity(picker_output.outputs.len());
281        let mut compacted_inputs =
282            Vec::with_capacity(picker_output.outputs.iter().map(|o| o.inputs.len()).sum());
283        let internal_parallelism = compaction_region.max_parallelism.max(1);
284
285        for output in picker_output.outputs.drain(..) {
286            compacted_inputs.extend(output.inputs.iter().map(|f| f.meta_ref().clone()));
287            let write_opts = WriteOptions {
288                write_buffer_size: compaction_region.engine_config.sst_write_buffer_size,
289                ..Default::default()
290            };
291
292            let region_metadata = compaction_region.region_metadata.clone();
293            let sst_layer = compaction_region.access_layer.clone();
294            let region_id = compaction_region.region_id;
295            let cache_manager = compaction_region.cache_manager.clone();
296            let storage = compaction_region.region_options.storage.clone();
297            let index_options = compaction_region
298                .current_version
299                .options
300                .index_options
301                .clone();
302            let append_mode = compaction_region.current_version.options.append_mode;
303            let merge_mode = compaction_region.current_version.options.merge_mode();
304            let inverted_index_config = compaction_region.engine_config.inverted_index.clone();
305            let fulltext_index_config = compaction_region.engine_config.fulltext_index.clone();
306            let bloom_filter_index_config =
307                compaction_region.engine_config.bloom_filter_index.clone();
308            let max_sequence = output
309                .inputs
310                .iter()
311                .map(|f| f.meta_ref().sequence)
312                .max()
313                .flatten();
314            futs.push(async move {
315                let input_file_names = output
316                    .inputs
317                    .iter()
318                    .map(|f| f.file_id().to_string())
319                    .join(",");
320                let reader = CompactionSstReaderBuilder {
321                    metadata: region_metadata.clone(),
322                    sst_layer: sst_layer.clone(),
323                    cache: cache_manager.clone(),
324                    inputs: &output.inputs,
325                    append_mode,
326                    filter_deleted: output.filter_deleted,
327                    time_range: output.output_time_range,
328                    merge_mode,
329                }
330                .build_sst_reader()
331                .await?;
332                let output_files = sst_layer
333                    .write_sst(
334                        SstWriteRequest {
335                            op_type: OperationType::Compact,
336                            metadata: region_metadata,
337                            source: Source::Reader(reader),
338                            cache_manager,
339                            storage,
340                            max_sequence: max_sequence.map(NonZero::get),
341                            index_options,
342                            inverted_index_config,
343                            fulltext_index_config,
344                            bloom_filter_index_config,
345                        },
346                        &write_opts,
347                    )
348                    .await?
349                    .into_iter()
350                    .map(|sst_info| FileMeta {
351                        region_id,
352                        file_id: sst_info.file_id,
353                        time_range: sst_info.time_range,
354                        level: output.output_level,
355                        file_size: sst_info.file_size,
356                        available_indexes: sst_info.index_metadata.build_available_indexes(),
357                        index_file_size: sst_info.index_metadata.file_size,
358                        num_rows: sst_info.num_rows as u64,
359                        num_row_groups: sst_info.num_row_groups,
360                        sequence: max_sequence,
361                    })
362                    .collect::<Vec<_>>();
363                let output_file_names =
364                    output_files.iter().map(|f| f.file_id.to_string()).join(",");
365                info!(
366                    "Region {} compaction inputs: [{}], outputs: [{}]",
367                    region_id, input_file_names, output_file_names
368                );
369                Ok(output_files)
370            });
371        }
372        let mut output_files = Vec::with_capacity(futs.len());
373        while !futs.is_empty() {
374            let mut task_chunk = Vec::with_capacity(internal_parallelism);
375            for _ in 0..internal_parallelism {
376                if let Some(task) = futs.pop() {
377                    task_chunk.push(common_runtime::spawn_compact(task));
378                }
379            }
380            let metas = futures::future::try_join_all(task_chunk)
381                .await
382                .context(JoinSnafu)?
383                .into_iter()
384                .collect::<Result<Vec<Vec<_>>>>()?;
385            output_files.extend(metas.into_iter().flatten());
386        }
387
388        let mut inputs: Vec<_> = compacted_inputs.into_iter().collect();
389        inputs.extend(
390            picker_output
391                .expired_ssts
392                .iter()
393                .map(|f| f.meta_ref().clone()),
394        );
395
396        Ok(MergeOutput {
397            files_to_add: output_files,
398            files_to_remove: inputs,
399            compaction_time_window: Some(picker_output.time_window_size),
400        })
401    }
402
403    async fn update_manifest(
404        &self,
405        compaction_region: &CompactionRegion,
406        merge_output: MergeOutput,
407    ) -> Result<RegionEdit> {
408        // Write region edit to manifest.
409        let edit = RegionEdit {
410            files_to_add: merge_output.files_to_add,
411            files_to_remove: merge_output.files_to_remove,
412            compaction_time_window: merge_output
413                .compaction_time_window
414                .map(|seconds| Duration::from_secs(seconds as u64)),
415            flushed_entry_id: None,
416            flushed_sequence: None,
417        };
418
419        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
420        // TODO: We might leak files if we fail to update manifest. We can add a cleanup task to remove them later.
421        compaction_region
422            .manifest_ctx
423            .update_manifest(RegionLeaderState::Writable, action_list)
424            .await?;
425
426        Ok(edit)
427    }
428
429    // The default implementation of compact combines the merge_ssts and update_manifest functions.
430    // Note: It's local compaction and only used for testing purpose.
431    async fn compact(
432        &self,
433        compaction_region: &CompactionRegion,
434        compact_request_options: compact_request::Options,
435    ) -> Result<()> {
436        let picker_output = {
437            let picker_output = new_picker(
438                &compact_request_options,
439                &compaction_region.region_options.compaction,
440                compaction_region.region_options.append_mode,
441            )
442            .pick(compaction_region);
443
444            if let Some(picker_output) = picker_output {
445                picker_output
446            } else {
447                info!(
448                    "No files to compact for region_id: {}",
449                    compaction_region.region_id
450                );
451                return Ok(());
452            }
453        };
454
455        let merge_output = self.merge_ssts(compaction_region, picker_output).await?;
456        if merge_output.is_empty() {
457            info!(
458                "No files to compact for region_id: {}",
459                compaction_region.region_id
460            );
461            return Ok(());
462        }
463        self.update_manifest(compaction_region, merge_output)
464            .await?;
465
466        Ok(())
467    }
468}