mito2/worker/
handle_truncate.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handling truncate related requests.
16
17use common_telemetry::{debug, info};
18use store_api::logstore::LogStore;
19use store_api::region_request::RegionTruncateRequest;
20use store_api::storage::RegionId;
21
22use crate::error::RegionNotFoundSnafu;
23use crate::manifest::action::{RegionTruncate, TruncateKind};
24use crate::region::RegionLeaderState;
25use crate::request::{OptionOutputTx, TruncateResult};
26use crate::worker::RegionWorkerLoop;
27
28impl<S: LogStore> RegionWorkerLoop<S> {
29    pub(crate) async fn handle_truncate_request(
30        &mut self,
31        region_id: RegionId,
32        req: RegionTruncateRequest,
33        mut sender: OptionOutputTx,
34    ) {
35        let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
36            return;
37        };
38        let version_data = region.version_control.current();
39
40        match req {
41            RegionTruncateRequest::All => {
42                info!("Try to fully truncate region {}", region_id);
43
44                let truncated_entry_id = version_data.last_entry_id;
45                let truncated_sequence = version_data.committed_sequence;
46
47                // Write region truncated to manifest.
48                let truncate = RegionTruncate {
49                    region_id,
50                    kind: TruncateKind::All {
51                        truncated_entry_id,
52                        truncated_sequence,
53                    },
54                };
55
56                self.handle_manifest_truncate_action(region, truncate, sender);
57            }
58            RegionTruncateRequest::ByTimeRanges { time_ranges } => {
59                info!(
60                    "Try to partially truncate region {} by time ranges: {:?}",
61                    region_id, time_ranges
62                );
63                // find all files that are fully contained in the time ranges
64                let mut files_to_truncate = Vec::new();
65                for level in version_data.version.ssts.levels() {
66                    for file in level.files() {
67                        let file_time_range = file.time_range();
68                        // TODO(discord9): This is a naive way to check if is contained, we should
69                        // optimize it later.
70                        let is_subset = time_ranges.iter().any(|(start, end)| {
71                            file_time_range.0 >= *start && file_time_range.1 <= *end
72                        });
73
74                        if is_subset {
75                            files_to_truncate.push(file.meta_ref().clone());
76                        }
77                    }
78                }
79                debug!(
80                    "Found {} files to partially truncate in region {}",
81                    files_to_truncate.len(),
82                    region_id
83                );
84
85                // this could happen if all files are not fully contained in the time ranges
86                if files_to_truncate.is_empty() {
87                    info!("No files to truncate in region {}", region_id);
88                    // directly send success back as no files to truncate
89                    // and no background notify is needed
90                    sender.send(Ok(0));
91                    return;
92                }
93                let truncate = RegionTruncate {
94                    region_id,
95                    kind: TruncateKind::Partial {
96                        files_to_remove: files_to_truncate,
97                    },
98                };
99                self.handle_manifest_truncate_action(region, truncate, sender);
100            }
101        };
102    }
103
104    /// Handles truncate result.
105    pub(crate) async fn handle_truncate_result(&mut self, truncate_result: TruncateResult) {
106        let region_id = truncate_result.region_id;
107        let Some(region) = self.regions.get_region(region_id) else {
108            truncate_result.sender.send(
109                RegionNotFoundSnafu {
110                    region_id: truncate_result.region_id,
111                }
112                .fail(),
113            );
114            return;
115        };
116
117        // We are already in the worker loop so we can set the state first.
118        region.switch_state_to_writable(RegionLeaderState::Truncating);
119
120        match truncate_result.result {
121            Ok(()) => {
122                // Applies the truncate action to the region.
123                region
124                    .version_control
125                    .truncate(truncate_result.kind.clone(), &region.memtable_builder);
126            }
127            Err(e) => {
128                // Unable to truncate the region.
129                truncate_result.sender.send(Err(e));
130                return;
131            }
132        }
133
134        // Notifies flush scheduler.
135        self.flush_scheduler.on_region_truncated(region_id);
136        // Notifies compaction scheduler.
137        self.compaction_scheduler.on_region_truncated(region_id);
138
139        if let TruncateKind::All {
140            truncated_entry_id,
141            truncated_sequence: _,
142        } = &truncate_result.kind
143        {
144            // Make all data obsolete.
145            if let Err(e) = self
146                .wal
147                .obsolete(region_id, *truncated_entry_id, &region.provider)
148                .await
149            {
150                truncate_result.sender.send(Err(e));
151                return;
152            }
153        }
154
155        info!(
156            "Complete truncating region: {}, kind: {:?}.",
157            region_id, truncate_result.kind
158        );
159
160        truncate_result.sender.send(Ok(0));
161    }
162}