mito2/worker/
handle_compaction.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::region::compact_request;
16use common_telemetry::{error, info, warn};
17use store_api::region_request::RegionCompactRequest;
18use store_api::storage::RegionId;
19
20use crate::error::RegionNotFoundSnafu;
21use crate::metrics::COMPACTION_REQUEST_COUNT;
22use crate::region::MitoRegionRef;
23use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx};
24use crate::worker::RegionWorkerLoop;
25
26impl<S> RegionWorkerLoop<S> {
27    /// Handles compaction request submitted to region worker.
28    pub(crate) async fn handle_compaction_request(
29        &mut self,
30        region_id: RegionId,
31        req: RegionCompactRequest,
32        mut sender: OptionOutputTx,
33    ) {
34        let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
35            return;
36        };
37        COMPACTION_REQUEST_COUNT.inc();
38        if let Err(e) = self
39            .compaction_scheduler
40            .schedule_compaction(
41                region.region_id,
42                req.options,
43                &region.version_control,
44                &region.access_layer,
45                sender,
46                &region.manifest_ctx,
47                self.schema_metadata_manager.clone(),
48                // TODO(yingwen): expose this to frontend
49                1,
50            )
51            .await
52        {
53            error!(e; "Failed to schedule compaction task for region: {}", region_id);
54        } else {
55            info!(
56                "Successfully scheduled compaction task for region: {}",
57                region_id
58            );
59        }
60    }
61
62    /// Handles compaction finished, update region version and manifest, deleted compacted files.
63    pub(crate) async fn handle_compaction_finished(
64        &mut self,
65        region_id: RegionId,
66        mut request: CompactionFinished,
67    ) {
68        let region = match self.regions.get_region(region_id) {
69            Some(region) => region,
70            None => {
71                request.on_failure(RegionNotFoundSnafu { region_id }.build());
72                return;
73            }
74        };
75        region.update_compaction_millis();
76
77        region.version_control.apply_edit(
78            Some(request.edit.clone()),
79            &[],
80            region.file_purger.clone(),
81        );
82
83        // compaction finished.
84        request.on_success();
85
86        // Schedule next compaction if necessary.
87        self.compaction_scheduler
88            .on_compaction_finished(
89                region_id,
90                &region.manifest_ctx,
91                self.schema_metadata_manager.clone(),
92            )
93            .await;
94    }
95
96    /// When compaction fails, we simply log the error.
97    pub(crate) async fn handle_compaction_failure(&mut self, req: CompactionFailed) {
98        error!(req.err; "Failed to compact region: {}", req.region_id);
99
100        self.compaction_scheduler
101            .on_compaction_failed(req.region_id, req.err);
102    }
103
104    /// Schedule compaction for the region if necessary.
105    pub(crate) async fn schedule_compaction(&mut self, region: &MitoRegionRef) {
106        let now = self.time_provider.current_time_millis();
107        if now - region.last_compaction_millis()
108            >= self.config.min_compaction_interval.as_millis() as i64
109            && let Err(e) = self
110                .compaction_scheduler
111                .schedule_compaction(
112                    region.region_id,
113                    compact_request::Options::Regular(Default::default()),
114                    &region.version_control,
115                    &region.access_layer,
116                    OptionOutputTx::none(),
117                    &region.manifest_ctx,
118                    self.schema_metadata_manager.clone(),
119                    1,
120                )
121                .await
122        {
123            warn!(
124                "Failed to schedule compaction for region: {}, err: {}",
125                region.region_id, e
126            );
127        }
128    }
129}