mito2/worker/
handle_truncate.rs1use common_telemetry::{debug, info};
18use store_api::logstore::LogStore;
19use store_api::region_request::RegionTruncateRequest;
20use store_api::storage::RegionId;
21
22use crate::error::RegionNotFoundSnafu;
23use crate::manifest::action::{RegionTruncate, TruncateKind};
24use crate::region::RegionLeaderState;
25use crate::request::{OptionOutputTx, TruncateResult};
26use crate::worker::RegionWorkerLoop;
27
28impl<S: LogStore> RegionWorkerLoop<S> {
29 pub(crate) async fn handle_truncate_request(
30 &mut self,
31 region_id: RegionId,
32 req: RegionTruncateRequest,
33 sender: OptionOutputTx,
34 ) {
35 let region = match self.regions.writable_non_staging_region(region_id) {
36 Ok(region) => region,
37 Err(e) => {
38 sender.send(Err(e));
39 return;
40 }
41 };
42
43 let version_data = region.version_control.current();
44
45 match req {
46 RegionTruncateRequest::All => {
47 info!("Try to fully truncate region {}", region_id);
48
49 let truncated_entry_id = version_data.last_entry_id;
50 let truncated_sequence = version_data.committed_sequence;
51
52 let truncate = RegionTruncate {
54 region_id,
55 kind: TruncateKind::All {
56 truncated_entry_id,
57 truncated_sequence,
58 },
59 timestamp_ms: None,
60 };
61
62 self.handle_manifest_truncate_action(region, truncate, sender);
63 }
64 RegionTruncateRequest::ByTimeRanges { time_ranges } => {
65 info!(
66 "Try to partially truncate region {} by time ranges: {:?}",
67 region_id, time_ranges
68 );
69 let mut files_to_truncate = Vec::new();
71 for level in version_data.version.ssts.levels() {
72 for file in level.files() {
73 let file_time_range = file.time_range();
74 let is_subset = time_ranges.iter().any(|(start, end)| {
77 file_time_range.0 >= *start && file_time_range.1 <= *end
78 });
79
80 if is_subset {
81 files_to_truncate.push(file.meta_ref().clone());
82 }
83 }
84 }
85 debug!(
86 "Found {} files to partially truncate in region {}",
87 files_to_truncate.len(),
88 region_id
89 );
90
91 if files_to_truncate.is_empty() {
93 info!("No files to truncate in region {}", region_id);
94 sender.send(Ok(0));
97 return;
98 }
99 let truncate = RegionTruncate {
100 region_id,
101 kind: TruncateKind::Partial {
102 files_to_remove: files_to_truncate,
103 },
104 timestamp_ms: None,
105 };
106 self.handle_manifest_truncate_action(region, truncate, sender);
107 }
108 };
109 }
110
111 pub(crate) async fn handle_truncate_result(&mut self, truncate_result: TruncateResult) {
113 let region_id = truncate_result.region_id;
114 let Some(region) = self.regions.get_region(region_id) else {
115 truncate_result.sender.send(
116 RegionNotFoundSnafu {
117 region_id: truncate_result.region_id,
118 }
119 .fail(),
120 );
121 return;
122 };
123
124 region.switch_state_to_writable(RegionLeaderState::Truncating);
126
127 match truncate_result.result {
128 Ok(()) => {
129 region
131 .version_control
132 .truncate(truncate_result.kind.clone(), ®ion.memtable_builder);
133 }
134 Err(e) => {
135 truncate_result.sender.send(Err(e));
137 return;
138 }
139 }
140
141 self.flush_scheduler.on_region_truncated(region_id);
143 self.compaction_scheduler.on_region_truncated(region_id);
145
146 if let TruncateKind::All {
147 truncated_entry_id,
148 truncated_sequence: _,
149 } = &truncate_result.kind
150 {
151 if let Err(e) = self
153 .wal
154 .obsolete(region_id, *truncated_entry_id, ®ion.provider)
155 .await
156 {
157 truncate_result.sender.send(Err(e));
158 return;
159 }
160 }
161
162 info!(
163 "Complete truncating region: {}, kind: {:?}.",
164 region_id, truncate_result.kind
165 );
166
167 truncate_result.sender.send(Ok(0));
168 }
169}