mito2/worker/
handle_compaction.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::region::compact_request;
16use common_telemetry::{error, info, warn};
17use store_api::region_request::RegionCompactRequest;
18use store_api::storage::RegionId;
19
20use crate::config::IndexBuildMode;
21use crate::error::RegionNotFoundSnafu;
22use crate::metrics::COMPACTION_REQUEST_COUNT;
23use crate::region::MitoRegionRef;
24use crate::request::{
25    BuildIndexRequest, CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx,
26};
27use crate::sst::index::IndexBuildType;
28use crate::worker::RegionWorkerLoop;
29
30impl<S> RegionWorkerLoop<S> {
31    /// Handles compaction request submitted to region worker.
32    pub(crate) async fn handle_compaction_request(
33        &mut self,
34        region_id: RegionId,
35        req: RegionCompactRequest,
36        mut sender: OptionOutputTx,
37    ) {
38        let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
39            return;
40        };
41        COMPACTION_REQUEST_COUNT.inc();
42        let parallelism = req.parallelism.unwrap_or(1) as usize;
43        if let Err(e) = self
44            .compaction_scheduler
45            .schedule_compaction(
46                region.region_id,
47                req.options,
48                &region.version_control,
49                &region.access_layer,
50                sender,
51                &region.manifest_ctx,
52                self.schema_metadata_manager.clone(),
53                parallelism,
54            )
55            .await
56        {
57            error!(e; "Failed to schedule compaction task for region: {}", region_id);
58        } else {
59            info!(
60                "Successfully scheduled compaction task for region: {}",
61                region_id
62            );
63        }
64    }
65
66    /// Handles compaction finished, update region version and manifest, deleted compacted files.
67    pub(crate) async fn handle_compaction_finished(
68        &mut self,
69        region_id: RegionId,
70        mut request: CompactionFinished,
71    ) {
72        let region = match self.regions.get_region(region_id) {
73            Some(region) => region,
74            None => {
75                request.on_failure(RegionNotFoundSnafu { region_id }.build());
76                return;
77            }
78        };
79        region.update_compaction_millis();
80
81        region.version_control.apply_edit(
82            Some(request.edit.clone()),
83            &[],
84            region.file_purger.clone(),
85        );
86
87        let index_build_file_metas = std::mem::take(&mut request.edit.files_to_add);
88
89        // compaction finished.
90        request.on_success();
91
92        // In async mode, create indexes after compact if new files are created.
93        if self.config.index.build_mode == IndexBuildMode::Async
94            && !index_build_file_metas.is_empty()
95        {
96            self.handle_rebuild_index(
97                BuildIndexRequest {
98                    region_id,
99                    build_type: IndexBuildType::Compact,
100                    file_metas: index_build_file_metas,
101                },
102                OptionOutputTx::new(None),
103            )
104            .await;
105        }
106
107        // Schedule next compaction if necessary.
108        self.compaction_scheduler
109            .on_compaction_finished(
110                region_id,
111                &region.manifest_ctx,
112                self.schema_metadata_manager.clone(),
113            )
114            .await;
115    }
116
117    /// When compaction fails, we simply log the error.
118    pub(crate) async fn handle_compaction_failure(&mut self, req: CompactionFailed) {
119        error!(req.err; "Failed to compact region: {}", req.region_id);
120
121        self.compaction_scheduler
122            .on_compaction_failed(req.region_id, req.err);
123    }
124
125    /// Schedule compaction for the region if necessary.
126    pub(crate) async fn schedule_compaction(&mut self, region: &MitoRegionRef) {
127        let now = self.time_provider.current_time_millis();
128        if now - region.last_compaction_millis()
129            >= self.config.min_compaction_interval.as_millis() as i64
130            && let Err(e) = self
131                .compaction_scheduler
132                .schedule_compaction(
133                    region.region_id,
134                    compact_request::Options::Regular(Default::default()),
135                    &region.version_control,
136                    &region.access_layer,
137                    OptionOutputTx::none(),
138                    &region.manifest_ctx,
139                    self.schema_metadata_manager.clone(),
140                    1, // Default for automatic compaction
141                )
142                .await
143        {
144            warn!(
145                "Failed to schedule compaction for region: {}, err: {}",
146                region.region_id, e
147            );
148        }
149    }
150}