mito2/worker/
handle_truncate.rs1use common_telemetry::{debug, info};
18use store_api::logstore::LogStore;
19use store_api::region_request::RegionTruncateRequest;
20use store_api::storage::RegionId;
21
22use crate::error::RegionNotFoundSnafu;
23use crate::manifest::action::{RegionTruncate, TruncateKind};
24use crate::region::RegionLeaderState;
25use crate::request::{OptionOutputTx, TruncateResult};
26use crate::worker::RegionWorkerLoop;
27
28impl<S: LogStore> RegionWorkerLoop<S> {
29 pub(crate) async fn handle_truncate_request(
30 &mut self,
31 region_id: RegionId,
32 req: RegionTruncateRequest,
33 mut sender: OptionOutputTx,
34 ) {
35 let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
36 return;
37 };
38 let version_data = region.version_control.current();
39
40 match req {
41 RegionTruncateRequest::All => {
42 info!("Try to fully truncate region {}", region_id);
43
44 let truncated_entry_id = version_data.last_entry_id;
45 let truncated_sequence = version_data.committed_sequence;
46
47 let truncate = RegionTruncate {
49 region_id,
50 kind: TruncateKind::All {
51 truncated_entry_id,
52 truncated_sequence,
53 },
54 };
55
56 self.handle_manifest_truncate_action(region, truncate, sender);
57 }
58 RegionTruncateRequest::ByTimeRanges { time_ranges } => {
59 info!(
60 "Try to partially truncate region {} by time ranges: {:?}",
61 region_id, time_ranges
62 );
63 let mut files_to_truncate = Vec::new();
65 for level in version_data.version.ssts.levels() {
66 for file in level.files() {
67 let file_time_range = file.time_range();
68 let is_subset = time_ranges.iter().any(|(start, end)| {
71 file_time_range.0 >= *start && file_time_range.1 <= *end
72 });
73
74 if is_subset {
75 files_to_truncate.push(file.meta_ref().clone());
76 }
77 }
78 }
79 debug!(
80 "Found {} files to partially truncate in region {}",
81 files_to_truncate.len(),
82 region_id
83 );
84
85 if files_to_truncate.is_empty() {
87 info!("No files to truncate in region {}", region_id);
88 sender.send(Ok(0));
91 return;
92 }
93 let truncate = RegionTruncate {
94 region_id,
95 kind: TruncateKind::Partial {
96 files_to_remove: files_to_truncate,
97 },
98 };
99 self.handle_manifest_truncate_action(region, truncate, sender);
100 }
101 };
102 }
103
104 pub(crate) async fn handle_truncate_result(&mut self, truncate_result: TruncateResult) {
106 let region_id = truncate_result.region_id;
107 let Some(region) = self.regions.get_region(region_id) else {
108 truncate_result.sender.send(
109 RegionNotFoundSnafu {
110 region_id: truncate_result.region_id,
111 }
112 .fail(),
113 );
114 return;
115 };
116
117 region.switch_state_to_writable(RegionLeaderState::Truncating);
119
120 match truncate_result.result {
121 Ok(()) => {
122 region
124 .version_control
125 .truncate(truncate_result.kind.clone(), ®ion.memtable_builder);
126 }
127 Err(e) => {
128 truncate_result.sender.send(Err(e));
130 return;
131 }
132 }
133
134 self.flush_scheduler.on_region_truncated(region_id);
136 self.compaction_scheduler.on_region_truncated(region_id);
138
139 if let TruncateKind::All {
140 truncated_entry_id,
141 truncated_sequence: _,
142 } = &truncate_result.kind
143 {
144 if let Err(e) = self
146 .wal
147 .obsolete(region_id, *truncated_entry_id, ®ion.provider)
148 .await
149 {
150 truncate_result.sender.send(Err(e));
151 return;
152 }
153 }
154
155 info!(
156 "Complete truncating region: {}, kind: {:?}.",
157 region_id, truncate_result.kind
158 );
159
160 truncate_result.sender.send(Ok(0));
161 }
162}