mito2/worker/
handle_compaction.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::region::compact_request;
16use common_telemetry::{error, info, warn};
17use store_api::logstore::LogStore;
18use store_api::region_request::RegionCompactRequest;
19use store_api::storage::RegionId;
20
21use crate::config::IndexBuildMode;
22use crate::error::RegionNotFoundSnafu;
23use crate::metrics::COMPACTION_REQUEST_COUNT;
24use crate::region::MitoRegionRef;
25use crate::request::{
26    BuildIndexRequest, CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx,
27};
28use crate::sst::index::IndexBuildType;
29use crate::worker::RegionWorkerLoop;
30
31impl<S> RegionWorkerLoop<S> {
32    /// Handles compaction request submitted to region worker.
33    pub(crate) async fn handle_compaction_request(
34        &mut self,
35        region_id: RegionId,
36        req: RegionCompactRequest,
37        mut sender: OptionOutputTx,
38    ) {
39        let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
40            return;
41        };
42        COMPACTION_REQUEST_COUNT.inc();
43        let parallelism = req.parallelism.unwrap_or(1) as usize;
44        if let Err(e) = self
45            .compaction_scheduler
46            .schedule_compaction(
47                region.region_id,
48                req.options,
49                &region.version_control,
50                &region.access_layer,
51                sender,
52                &region.manifest_ctx,
53                self.schema_metadata_manager.clone(),
54                parallelism,
55            )
56            .await
57        {
58            error!(e; "Failed to schedule compaction task for region: {}", region_id);
59        } else {
60            info!(
61                "Successfully scheduled compaction task for region: {}",
62                region_id
63            );
64        }
65    }
66
67    /// Handles compaction finished, update region version and manifest, deleted compacted files.
68    pub(crate) async fn handle_compaction_finished(
69        &mut self,
70        region_id: RegionId,
71        mut request: CompactionFinished,
72    ) where
73        S: LogStore,
74    {
75        let region = match self.regions.get_region(region_id) {
76            Some(region) => region,
77            None => {
78                request.on_failure(RegionNotFoundSnafu { region_id }.build());
79                return;
80            }
81        };
82        region.update_compaction_millis();
83
84        region.version_control.apply_edit(
85            Some(request.edit.clone()),
86            &[],
87            region.file_purger.clone(),
88        );
89
90        let index_build_file_metas = std::mem::take(&mut request.edit.files_to_add);
91
92        // compaction finished.
93        request.on_success();
94
95        // In async mode, create indexes after compact if new files are created.
96        if self.config.index.build_mode == IndexBuildMode::Async
97            && !index_build_file_metas.is_empty()
98        {
99            self.handle_rebuild_index(
100                BuildIndexRequest {
101                    region_id,
102                    build_type: IndexBuildType::Compact,
103                    file_metas: index_build_file_metas,
104                },
105                OptionOutputTx::new(None),
106            )
107            .await;
108        }
109
110        // Schedule next compaction if necessary.
111        let mut pending_ddls = self
112            .compaction_scheduler
113            .on_compaction_finished(
114                region_id,
115                &region.manifest_ctx,
116                self.schema_metadata_manager.clone(),
117            )
118            .await;
119        self.handle_ddl_requests(&mut pending_ddls).await;
120    }
121
122    /// When compaction fails, we simply log the error.
123    pub(crate) async fn handle_compaction_failure(&mut self, req: CompactionFailed) {
124        error!(req.err; "Failed to compact region: {}", req.region_id);
125
126        self.compaction_scheduler
127            .on_compaction_failed(req.region_id, req.err);
128    }
129
130    /// Schedule compaction for the region if necessary.
131    pub(crate) async fn schedule_compaction(&mut self, region: &MitoRegionRef) {
132        if region.is_staging() || region.is_enter_staging() {
133            info!(
134                "Region {} is staging or entering staging, skip compaction",
135                region.region_id
136            );
137            return;
138        }
139        let now = self.time_provider.current_time_millis();
140        if now - region.last_compaction_millis()
141            >= self.config.min_compaction_interval.as_millis() as i64
142            && let Err(e) = self
143                .compaction_scheduler
144                .schedule_compaction(
145                    region.region_id,
146                    compact_request::Options::Regular(Default::default()),
147                    &region.version_control,
148                    &region.access_layer,
149                    OptionOutputTx::none(),
150                    &region.manifest_ctx,
151                    self.schema_metadata_manager.clone(),
152                    1, // Default for automatic compaction
153                )
154                .await
155        {
156            warn!(
157                "Failed to schedule compaction for region: {}, err: {}",
158                region.region_id, e
159            );
160        }
161    }
162}