mito2/worker/
handle_compaction.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::region::compact_request;
16use common_telemetry::{error, info, warn};
17use store_api::region_request::RegionCompactRequest;
18use store_api::storage::RegionId;
19
20use crate::error::RegionNotFoundSnafu;
21use crate::metrics::COMPACTION_REQUEST_COUNT;
22use crate::region::MitoRegionRef;
23use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx};
24use crate::worker::RegionWorkerLoop;
25
26impl<S> RegionWorkerLoop<S> {
27    /// Handles compaction request submitted to region worker.
28    pub(crate) async fn handle_compaction_request(
29        &mut self,
30        region_id: RegionId,
31        req: RegionCompactRequest,
32        mut sender: OptionOutputTx,
33    ) {
34        let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
35            return;
36        };
37        COMPACTION_REQUEST_COUNT.inc();
38        if let Err(e) = self
39            .compaction_scheduler
40            .schedule_compaction(
41                region.region_id,
42                req.options,
43                &region.version_control,
44                &region.access_layer,
45                sender,
46                &region.manifest_ctx,
47                self.schema_metadata_manager.clone(),
48                // TODO(yingwen): expose this to frontend
49                1,
50            )
51            .await
52        {
53            error!(e; "Failed to schedule compaction task for region: {}", region_id);
54        } else {
55            info!(
56                "Successfully scheduled compaction task for region: {}",
57                region_id
58            );
59        }
60    }
61
62    /// Handles compaction finished, update region version and manifest, deleted compacted files.
63    pub(crate) async fn handle_compaction_finished(
64        &mut self,
65        region_id: RegionId,
66        mut request: CompactionFinished,
67    ) {
68        let region = match self.regions.get_region(region_id) {
69            Some(region) => region,
70            None => {
71                request.on_failure(RegionNotFoundSnafu { region_id }.build());
72                return;
73            }
74        };
75        region.update_compaction_millis();
76
77        region
78            .version_control
79            .apply_edit(request.edit.clone(), &[], region.file_purger.clone());
80
81        // compaction finished.
82        request.on_success();
83
84        // Schedule next compaction if necessary.
85        self.compaction_scheduler
86            .on_compaction_finished(
87                region_id,
88                &region.manifest_ctx,
89                self.schema_metadata_manager.clone(),
90            )
91            .await;
92    }
93
94    /// When compaction fails, we simply log the error.
95    pub(crate) async fn handle_compaction_failure(&mut self, req: CompactionFailed) {
96        error!(req.err; "Failed to compact region: {}", req.region_id);
97
98        self.compaction_scheduler
99            .on_compaction_failed(req.region_id, req.err);
100    }
101
102    /// Schedule compaction for the region if necessary.
103    pub(crate) async fn schedule_compaction(&mut self, region: &MitoRegionRef) {
104        let now = self.time_provider.current_time_millis();
105        if now - region.last_compaction_millis()
106            >= self.config.min_compaction_interval.as_millis() as i64
107        {
108            if let Err(e) = self
109                .compaction_scheduler
110                .schedule_compaction(
111                    region.region_id,
112                    compact_request::Options::Regular(Default::default()),
113                    &region.version_control,
114                    &region.access_layer,
115                    OptionOutputTx::none(),
116                    &region.manifest_ctx,
117                    self.schema_metadata_manager.clone(),
118                    1,
119                )
120                .await
121            {
122                warn!(
123                    "Failed to schedule compaction for region: {}, err: {}",
124                    region.region_id, e
125                );
126            }
127        }
128    }
129}