mito2/worker/
handle_truncate.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handling truncate related requests.
16
17use common_telemetry::info;
18use store_api::logstore::LogStore;
19use store_api::storage::RegionId;
20
21use crate::error::RegionNotFoundSnafu;
22use crate::manifest::action::RegionTruncate;
23use crate::region::RegionLeaderState;
24use crate::request::{OptionOutputTx, TruncateResult};
25use crate::worker::RegionWorkerLoop;
26
27impl<S: LogStore> RegionWorkerLoop<S> {
28    pub(crate) async fn handle_truncate_request(
29        &mut self,
30        region_id: RegionId,
31        mut sender: OptionOutputTx,
32    ) {
33        let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
34            return;
35        };
36
37        info!("Try to truncate region {}", region_id);
38
39        let version_data = region.version_control.current();
40        let truncated_entry_id = version_data.last_entry_id;
41        let truncated_sequence = version_data.committed_sequence;
42
43        // Write region truncated to manifest.
44        let truncate = RegionTruncate {
45            region_id,
46            truncated_entry_id,
47            truncated_sequence,
48        };
49        self.handle_manifest_truncate_action(region, truncate, sender);
50    }
51
52    /// Handles truncate result.
53    pub(crate) async fn handle_truncate_result(&mut self, truncate_result: TruncateResult) {
54        let region_id = truncate_result.region_id;
55        let Some(region) = self.regions.get_region(region_id) else {
56            truncate_result.sender.send(
57                RegionNotFoundSnafu {
58                    region_id: truncate_result.region_id,
59                }
60                .fail(),
61            );
62            return;
63        };
64
65        // We are already in the worker loop so we can set the state first.
66        region.switch_state_to_writable(RegionLeaderState::Truncating);
67
68        match truncate_result.result {
69            Ok(()) => {
70                // Applies the truncate action to the region.
71                region.version_control.truncate(
72                    truncate_result.truncated_entry_id,
73                    truncate_result.truncated_sequence,
74                    &region.memtable_builder,
75                );
76            }
77            Err(e) => {
78                // Unable to truncate the region.
79                truncate_result.sender.send(Err(e));
80                return;
81            }
82        }
83
84        // Notifies flush scheduler.
85        self.flush_scheduler.on_region_truncated(region_id);
86        // Notifies compaction scheduler.
87        self.compaction_scheduler.on_region_truncated(region_id);
88
89        // Make all data obsolete.
90        if let Err(e) = self
91            .wal
92            .obsolete(
93                region_id,
94                truncate_result.truncated_entry_id,
95                &region.provider,
96            )
97            .await
98        {
99            truncate_result.sender.send(Err(e));
100            return;
101        }
102
103        info!(
104            "Complete truncating region: {}, entry id: {} and sequence: {}.",
105            region_id, truncate_result.truncated_entry_id, truncate_result.truncated_sequence
106        );
107
108        truncate_result.sender.send(Ok(0));
109    }
110}