Skip to main content

mito2/worker/
handle_compaction.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::region::compact_request;
16use common_telemetry::{error, info, warn};
17use store_api::logstore::LogStore;
18use store_api::region_request::RegionCompactRequest;
19use store_api::storage::RegionId;
20
21use crate::config::IndexBuildMode;
22use crate::error::RegionNotFoundSnafu;
23use crate::metrics::COMPACTION_REQUEST_COUNT;
24use crate::region::MitoRegionRef;
25use crate::request::{
26    BuildIndexRequest, CompactionCancelled, CompactionFailed, CompactionFinished, OnFailure,
27    OptionOutputTx,
28};
29use crate::sst::index::IndexBuildType;
30use crate::worker::RegionWorkerLoop;
31
32impl<S> RegionWorkerLoop<S> {
33    /// Handles compaction request submitted to region worker.
34    pub(crate) async fn handle_compaction_request(
35        &mut self,
36        region_id: RegionId,
37        req: RegionCompactRequest,
38        mut sender: OptionOutputTx,
39    ) {
40        let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
41            return;
42        };
43        COMPACTION_REQUEST_COUNT.inc();
44        let parallelism = req.parallelism.unwrap_or(1) as usize;
45        if let Err(e) = self
46            .compaction_scheduler
47            .schedule_compaction(
48                region.region_id,
49                req.options,
50                &region.version_control,
51                &region.access_layer,
52                sender,
53                &region.manifest_ctx,
54                self.schema_metadata_manager.clone(),
55                parallelism,
56            )
57            .await
58        {
59            error!(e; "Failed to schedule compaction task for region: {}", region_id);
60        } else {
61            info!(
62                "Successfully scheduled compaction task for region: {}",
63                region_id
64            );
65        }
66    }
67
68    /// Handles compaction finished, update region version and manifest, deleted compacted files.
69    pub(crate) async fn handle_compaction_finished(
70        &mut self,
71        region_id: RegionId,
72        mut request: CompactionFinished,
73    ) where
74        S: LogStore,
75    {
76        let region = match self.regions.get_region(region_id) {
77            Some(region) => region,
78            None => {
79                request.on_failure(RegionNotFoundSnafu { region_id }.build());
80                return;
81            }
82        };
83        region.update_compaction_millis();
84
85        region.version_control.apply_edit(
86            Some(request.edit.clone()),
87            &[],
88            region.file_purger.clone(),
89        );
90
91        let index_build_file_metas = std::mem::take(&mut request.edit.files_to_add);
92
93        // compaction finished.
94        request.on_success();
95
96        // In async mode, create indexes after compact if new files are created.
97        if self.config.index.build_mode == IndexBuildMode::Async
98            && !index_build_file_metas.is_empty()
99        {
100            self.handle_rebuild_index(
101                BuildIndexRequest {
102                    region_id,
103                    build_type: IndexBuildType::Compact,
104                    file_metas: index_build_file_metas,
105                },
106                OptionOutputTx::new(None),
107            )
108            .await;
109        }
110
111        // Schedule next compaction if necessary.
112        let mut pending_ddls = self
113            .compaction_scheduler
114            .on_compaction_finished(
115                region_id,
116                &region.manifest_ctx,
117                self.schema_metadata_manager.clone(),
118            )
119            .await;
120        self.handle_ddl_requests(&mut pending_ddls).await;
121    }
122
123    pub(crate) async fn handle_compaction_cancelled(
124        &mut self,
125        region_id: RegionId,
126        request: CompactionCancelled,
127    ) where
128        S: LogStore,
129    {
130        request.on_success();
131
132        // Reuse the scheduler's finish path to wake pending DDLs after a cooperative stop.
133        let mut pending_ddls = match self.regions.get_region(region_id) {
134            Some(_) => {
135                self.compaction_scheduler
136                    .on_compaction_cancelled(region_id)
137                    .await
138            }
139            None => Vec::new(),
140        };
141
142        self.handle_ddl_requests(&mut pending_ddls).await;
143    }
144
145    /// When compaction fails, we simply log the error.
146    pub(crate) async fn handle_compaction_failure(&mut self, req: CompactionFailed) {
147        error!(req.err; "Failed to compact region: {}", req.region_id);
148
149        self.compaction_scheduler
150            .on_compaction_failed(req.region_id, req.err);
151    }
152
153    /// Schedule compaction for the region if necessary.
154    pub(crate) async fn schedule_compaction(&mut self, region: &MitoRegionRef) {
155        if region.is_staging() || region.is_enter_staging() {
156            info!(
157                "Region {} is staging or entering staging, skip compaction",
158                region.region_id
159            );
160            return;
161        }
162        let now = self.time_provider.current_time_millis();
163        if now - region.last_compaction_millis()
164            >= self.config.min_compaction_interval.as_millis() as i64
165            && let Err(e) = self
166                .compaction_scheduler
167                .schedule_compaction(
168                    region.region_id,
169                    compact_request::Options::Regular(Default::default()),
170                    &region.version_control,
171                    &region.access_layer,
172                    OptionOutputTx::none(),
173                    &region.manifest_ctx,
174                    self.schema_metadata_manager.clone(),
175                    1, // Default for automatic compaction
176                )
177                .await
178        {
179            warn!(
180                "Failed to schedule compaction for region: {}, err: {}",
181                region.region_id, e
182            );
183        }
184    }
185}