mito2/worker/
handle_truncate.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handling truncate related requests.
16
17use common_telemetry::{debug, info};
18use store_api::logstore::LogStore;
19use store_api::region_request::RegionTruncateRequest;
20use store_api::storage::RegionId;
21
22use crate::error::RegionNotFoundSnafu;
23use crate::manifest::action::{RegionTruncate, TruncateKind};
24use crate::region::RegionLeaderState;
25use crate::request::{OptionOutputTx, TruncateResult};
26use crate::worker::RegionWorkerLoop;
27
28impl<S: LogStore> RegionWorkerLoop<S> {
29    pub(crate) async fn handle_truncate_request(
30        &mut self,
31        region_id: RegionId,
32        req: RegionTruncateRequest,
33        sender: OptionOutputTx,
34    ) {
35        let region = match self.regions.writable_non_staging_region(region_id) {
36            Ok(region) => region,
37            Err(e) => {
38                sender.send(Err(e));
39                return;
40            }
41        };
42
43        let version_data = region.version_control.current();
44
45        match req {
46            RegionTruncateRequest::All => {
47                info!("Try to fully truncate region {}", region_id);
48
49                let truncated_entry_id = version_data.last_entry_id;
50                let truncated_sequence = version_data.committed_sequence;
51
52                // Write region truncated to manifest.
53                let truncate = RegionTruncate {
54                    region_id,
55                    kind: TruncateKind::All {
56                        truncated_entry_id,
57                        truncated_sequence,
58                    },
59                    timestamp_ms: None,
60                };
61
62                self.handle_manifest_truncate_action(region, truncate, sender);
63            }
64            RegionTruncateRequest::ByTimeRanges { time_ranges } => {
65                info!(
66                    "Try to partially truncate region {} by time ranges: {:?}",
67                    region_id, time_ranges
68                );
69                // find all files that are fully contained in the time ranges
70                let mut files_to_truncate = Vec::new();
71                for level in version_data.version.ssts.levels() {
72                    for file in level.files() {
73                        let file_time_range = file.time_range();
74                        // TODO(discord9): This is a naive way to check if is contained, we should
75                        // optimize it later.
76                        let is_subset = time_ranges.iter().any(|(start, end)| {
77                            file_time_range.0 >= *start && file_time_range.1 <= *end
78                        });
79
80                        if is_subset {
81                            files_to_truncate.push(file.meta_ref().clone());
82                        }
83                    }
84                }
85                debug!(
86                    "Found {} files to partially truncate in region {}",
87                    files_to_truncate.len(),
88                    region_id
89                );
90
91                // this could happen if all files are not fully contained in the time ranges
92                if files_to_truncate.is_empty() {
93                    info!("No files to truncate in region {}", region_id);
94                    // directly send success back as no files to truncate
95                    // and no background notify is needed
96                    sender.send(Ok(0));
97                    return;
98                }
99                let truncate = RegionTruncate {
100                    region_id,
101                    kind: TruncateKind::Partial {
102                        files_to_remove: files_to_truncate,
103                    },
104                    timestamp_ms: None,
105                };
106                self.handle_manifest_truncate_action(region, truncate, sender);
107            }
108        };
109    }
110
111    /// Handles truncate result.
112    pub(crate) async fn handle_truncate_result(&mut self, truncate_result: TruncateResult) {
113        let region_id = truncate_result.region_id;
114        let Some(region) = self.regions.get_region(region_id) else {
115            truncate_result.sender.send(
116                RegionNotFoundSnafu {
117                    region_id: truncate_result.region_id,
118                }
119                .fail(),
120            );
121            return;
122        };
123
124        // We are already in the worker loop so we can set the state first.
125        region.switch_state_to_writable(RegionLeaderState::Truncating);
126
127        match truncate_result.result {
128            Ok(()) => {
129                // Applies the truncate action to the region.
130                region
131                    .version_control
132                    .truncate(truncate_result.kind.clone(), &region.memtable_builder);
133            }
134            Err(e) => {
135                // Unable to truncate the region.
136                truncate_result.sender.send(Err(e));
137                return;
138            }
139        }
140
141        // Notifies flush scheduler.
142        self.flush_scheduler.on_region_truncated(region_id);
143        // Notifies compaction scheduler.
144        self.compaction_scheduler.on_region_truncated(region_id);
145
146        if let TruncateKind::All {
147            truncated_entry_id,
148            truncated_sequence: _,
149        } = &truncate_result.kind
150        {
151            // Make all data obsolete.
152            if let Err(e) = self
153                .wal
154                .obsolete(region_id, *truncated_entry_id, &region.provider)
155                .await
156            {
157                truncate_result.sender.send(Err(e));
158                return;
159            }
160        }
161
162        info!(
163            "Complete truncating region: {}, kind: {:?}.",
164            region_id, truncate_result.kind
165        );
166
167        truncate_result.sender.send(Ok(0));
168    }
169}