mito2/worker/
handle_enter_staging.rs1use std::time::Instant;
16
17use common_telemetry::{error, info, warn};
18use store_api::logstore::LogStore;
19use store_api::region_request::{EnterStagingRequest, StagingPartitionDirective};
20use store_api::storage::RegionId;
21
22use crate::error::{RegionNotFoundSnafu, Result, StagingPartitionExprMismatchSnafu};
23use crate::flush::FlushReason;
24use crate::manifest::action::{RegionMetaAction, RegionMetaActionList, RegionPartitionExprChange};
25use crate::region::{MitoRegionRef, RegionLeaderState, StagingPartitionInfo};
26use crate::request::{
27 BackgroundNotify, DdlRequest, EnterStagingResult, OptionOutputTx, SenderDdlRequest,
28 WorkerRequest, WorkerRequestWithTime,
29};
30use crate::worker::RegionWorkerLoop;
31
32impl<S: LogStore> RegionWorkerLoop<S> {
33 pub(crate) async fn handle_enter_staging_request(
34 &mut self,
35 region_id: RegionId,
36 partition_directive: StagingPartitionDirective,
37 mut sender: OptionOutputTx,
38 ) {
39 let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
40 return;
41 };
42
43 if region.is_staging() {
45 let staging_partition_info = region.staging_partition_info.lock().unwrap().clone();
46 if staging_partition_info
48 .as_ref()
49 .map(|info| &info.partition_directive)
50 != Some(&partition_directive)
51 {
52 sender.send(Err(StagingPartitionExprMismatchSnafu {
53 manifest_expr: staging_partition_info
54 .as_ref()
55 .and_then(|info| info.partition_expr().map(ToString::to_string)),
56 request_expr: format!("{:?}", partition_directive),
57 }
58 .build()));
59 return;
60 }
61
62 sender.send(Ok(0));
64 return;
65 }
66
67 let version = region.version();
68 if !version.memtables.is_empty() {
69 info!("Flush region: {} before entering staging", region_id);
72 debug_assert!(!region.is_staging());
73 let task = self.new_flush_task(
74 ®ion,
75 FlushReason::EnterStaging,
76 None,
77 self.config.clone(),
78 );
79 if let Err(e) =
80 self.flush_scheduler
81 .schedule_flush(region.region_id, ®ion.version_control, task)
82 {
83 sender.send(Err(e));
85 return;
86 }
87
88 self.flush_scheduler
90 .add_ddl_request_to_pending(SenderDdlRequest {
91 region_id,
92 sender,
93 request: DdlRequest::EnterStaging(EnterStagingRequest {
94 partition_directive: partition_directive.clone(),
95 }),
96 });
97
98 return;
99 }
100
101 self.handle_enter_staging(region, partition_directive, sender);
102 }
103
104 async fn enter_staging(
105 region: &MitoRegionRef,
106 partition_directive: &StagingPartitionDirective,
107 ) -> Result<()> {
108 let now = Instant::now();
109 {
111 let mut manager = region.manifest_ctx.manifest_manager.write().await;
112 manager
113 .clear_staging_manifest_and_dir()
114 .await
115 .inspect_err(|e| {
116 error!(
117 e;
118 "Failed to clear staging manifest files for region {}",
119 region.region_id
120 );
121 })?;
122
123 info!(
124 "Cleared all staging manifest files for region {}, elapsed: {:?}",
125 region.region_id,
126 now.elapsed(),
127 );
128 }
129
130 let partition_expr = match partition_directive {
131 StagingPartitionDirective::UpdatePartitionExpr(partition_expr) => {
132 partition_expr.clone()
133 }
134 StagingPartitionDirective::RejectAllWrites => {
135 info!(
136 "Enter staging with reject all writes, region_id: {}",
137 region.region_id
138 );
139 return Ok(());
141 }
142 };
143
144 let change = RegionPartitionExprChange {
146 partition_expr: Some(partition_expr.clone()),
147 };
148 let action_list =
149 RegionMetaActionList::with_action(RegionMetaAction::PartitionExprChange(change));
150 region
151 .manifest_ctx
152 .update_manifest(RegionLeaderState::EnteringStaging, action_list, true)
153 .await?;
154
155 Ok(())
156 }
157
158 fn handle_enter_staging(
159 &self,
160 region: MitoRegionRef,
161 partition_directive: StagingPartitionDirective,
162 sender: OptionOutputTx,
163 ) {
164 if let Err(e) = region.set_entering_staging() {
165 sender.send(Err(e));
166 return;
167 }
168
169 let listener = self.listener.clone();
170 let request_sender = self.sender.clone();
171 common_runtime::spawn_global(async move {
172 let now = Instant::now();
173 let result = Self::enter_staging(®ion, &partition_directive).await;
174 match result {
175 Ok(_) => {
176 info!(
177 "Created staging manifest for region {}, elapsed: {:?}",
178 region.region_id,
179 now.elapsed(),
180 );
181 }
182 Err(ref e) => {
183 region
185 .manifest_ctx
186 .manifest_manager
187 .write()
188 .await
189 .unset_staging_manifest();
190 error!(
191 "Failed to create staging manifest for region {}: {:?}, elapsed: {:?}",
192 region.region_id,
193 e,
194 now.elapsed(),
195 );
196 }
197 }
198
199 let notify = WorkerRequest::Background {
200 region_id: region.region_id,
201 notify: BackgroundNotify::EnterStaging(EnterStagingResult {
202 region_id: region.region_id,
203 sender,
204 result,
205 partition_directive,
206 }),
207 };
208 listener
209 .on_enter_staging_result_begin(region.region_id)
210 .await;
211
212 if let Err(res) = request_sender
213 .send(WorkerRequestWithTime::new(notify))
214 .await
215 {
216 warn!(
217 "Failed to send enter staging result back to the worker, region_id: {}, res: {:?}",
218 region.region_id, res
219 );
220 }
221 });
222 }
223
224 pub(crate) async fn handle_enter_staging_result(
226 &mut self,
227 enter_staging_result: EnterStagingResult,
228 ) {
229 let region = match self.regions.get_region(enter_staging_result.region_id) {
230 Some(region) => region,
231 None => {
232 self.reject_region_stalled_requests(&enter_staging_result.region_id);
233 enter_staging_result.sender.send(
234 RegionNotFoundSnafu {
235 region_id: enter_staging_result.region_id,
236 }
237 .fail(),
238 );
239 return;
240 }
241 };
242
243 if enter_staging_result.result.is_ok() {
244 info!(
245 "Updating region {} staging partition directive to {:?}",
246 region.region_id, enter_staging_result.partition_directive
247 );
248 Self::update_region_staging_partition_info(
249 ®ion,
250 enter_staging_result.partition_directive,
251 );
252 region.switch_state_to_staging(RegionLeaderState::EnteringStaging);
253 } else {
254 region.switch_state_to_writable(RegionLeaderState::EnteringStaging);
255 }
256 enter_staging_result
257 .sender
258 .send(enter_staging_result.result.map(|_| 0));
259 self.handle_region_stalled_requests(&enter_staging_result.region_id)
261 .await;
262 }
263
264 fn update_region_staging_partition_info(
265 region: &MitoRegionRef,
266 partition_directive: StagingPartitionDirective,
267 ) {
268 let mut staging_partition_info = region.staging_partition_info.lock().unwrap();
269 debug_assert!(staging_partition_info.is_none());
270 *staging_partition_info = Some(StagingPartitionInfo::from_partition_directive(
271 partition_directive,
272 ));
273 }
274}