mito2/worker/
handle_truncate.rs1use common_telemetry::info;
18use store_api::logstore::LogStore;
19use store_api::storage::RegionId;
20
21use crate::error::RegionNotFoundSnafu;
22use crate::manifest::action::RegionTruncate;
23use crate::region::RegionLeaderState;
24use crate::request::{OptionOutputTx, TruncateResult};
25use crate::worker::RegionWorkerLoop;
26
27impl<S: LogStore> RegionWorkerLoop<S> {
28 pub(crate) async fn handle_truncate_request(
29 &mut self,
30 region_id: RegionId,
31 mut sender: OptionOutputTx,
32 ) {
33 let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
34 return;
35 };
36
37 info!("Try to truncate region {}", region_id);
38
39 let version_data = region.version_control.current();
40 let truncated_entry_id = version_data.last_entry_id;
41 let truncated_sequence = version_data.committed_sequence;
42
43 let truncate = RegionTruncate {
45 region_id,
46 truncated_entry_id,
47 truncated_sequence,
48 };
49 self.handle_manifest_truncate_action(region, truncate, sender);
50 }
51
52 pub(crate) async fn handle_truncate_result(&mut self, truncate_result: TruncateResult) {
54 let region_id = truncate_result.region_id;
55 let Some(region) = self.regions.get_region(region_id) else {
56 truncate_result.sender.send(
57 RegionNotFoundSnafu {
58 region_id: truncate_result.region_id,
59 }
60 .fail(),
61 );
62 return;
63 };
64
65 region.switch_state_to_writable(RegionLeaderState::Truncating);
67
68 match truncate_result.result {
69 Ok(()) => {
70 region.version_control.truncate(
72 truncate_result.truncated_entry_id,
73 truncate_result.truncated_sequence,
74 ®ion.memtable_builder,
75 );
76 }
77 Err(e) => {
78 truncate_result.sender.send(Err(e));
80 return;
81 }
82 }
83
84 self.flush_scheduler.on_region_truncated(region_id);
86 self.compaction_scheduler.on_region_truncated(region_id);
88
89 if let Err(e) = self
91 .wal
92 .obsolete(
93 region_id,
94 truncate_result.truncated_entry_id,
95 ®ion.provider,
96 )
97 .await
98 {
99 truncate_result.sender.send(Err(e));
100 return;
101 }
102
103 info!(
104 "Complete truncating region: {}, entry id: {} and sequence: {}.",
105 region_id, truncate_result.truncated_entry_id, truncate_result.truncated_sequence
106 );
107
108 truncate_result.sender.send(Ok(0));
109 }
110}