1use std::time::Instant;
16
17use common_telemetry::{error, info, warn};
18use store_api::logstore::LogStore;
19use store_api::region_request::{EnterStagingRequest, StagingPartitionDirective};
20use store_api::storage::RegionId;
21
22use crate::compaction::RequestCancelResult;
23use crate::error::{RegionNotFoundSnafu, Result, StagingPartitionExprMismatchSnafu};
24use crate::flush::FlushReason;
25use crate::manifest::action::{RegionMetaAction, RegionMetaActionList, RegionPartitionExprChange};
26use crate::region::{MitoRegionRef, RegionLeaderState, StagingPartitionInfo};
27use crate::request::{
28 BackgroundNotify, DdlRequest, EnterStagingResult, OptionOutputTx, SenderDdlRequest,
29 WorkerRequest, WorkerRequestWithTime,
30};
31use crate::worker::RegionWorkerLoop;
32
33impl<S: LogStore> RegionWorkerLoop<S> {
34 pub(crate) async fn handle_enter_staging_request(
35 &mut self,
36 region_id: RegionId,
37 partition_directive: StagingPartitionDirective,
38 mut sender: OptionOutputTx,
39 ) {
40 let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
41 return;
42 };
43
44 if region.is_staging() {
46 let staging_partition_info = region.manifest_ctx.staging_partition_info();
47 if staging_partition_info
49 .as_ref()
50 .map(|info| &info.partition_directive)
51 != Some(&partition_directive)
52 {
53 sender.send(Err(StagingPartitionExprMismatchSnafu {
54 manifest_expr: staging_partition_info
55 .as_ref()
56 .and_then(|info| info.partition_expr().map(ToString::to_string)),
57 request_expr: format!("{:?}", partition_directive),
58 }
59 .build()));
60 return;
61 }
62
63 sender.send(Ok(0));
65 return;
66 }
67
68 let version = region.version();
69 if !version.memtables.is_empty() {
70 info!("Flush region: {} before entering staging", region_id);
73 debug_assert!(!region.is_staging());
74 let task = self.new_flush_task(
75 ®ion,
76 FlushReason::EnterStaging,
77 None,
78 self.config.clone(),
79 );
80 if let Err(e) =
81 self.flush_scheduler
82 .schedule_flush(region.region_id, ®ion.version_control, task)
83 {
84 sender.send(Err(e));
86 return;
87 }
88
89 self.flush_scheduler
91 .add_ddl_request_to_pending(SenderDdlRequest {
92 region_id,
93 sender,
94 request: DdlRequest::EnterStaging(EnterStagingRequest {
95 partition_directive: partition_directive.clone(),
96 }),
97 });
98
99 return;
100 }
101
102 match self.compaction_scheduler.request_cancel(region_id) {
103 RequestCancelResult::CancelIssued
104 | RequestCancelResult::AlreadyCancelling
105 | RequestCancelResult::TooLateToCancel => {
106 self.compaction_scheduler
109 .add_ddl_request_to_pending(SenderDdlRequest {
110 region_id,
111 sender,
112 request: DdlRequest::EnterStaging(EnterStagingRequest {
113 partition_directive,
114 }),
115 });
116
117 return;
118 }
119 RequestCancelResult::NotRunning => {}
120 }
121
122 self.handle_enter_staging(region, partition_directive, sender);
123 }
124
125 async fn enter_staging(
126 region: &MitoRegionRef,
127 partition_directive: &StagingPartitionDirective,
128 ) -> Result<()> {
129 let now = Instant::now();
130 {
132 let mut manager = region.manifest_ctx.manifest_manager.write().await;
133 manager
134 .clear_staging_manifest_and_dir()
135 .await
136 .inspect_err(|e| {
137 error!(
138 e;
139 "Failed to clear staging manifest files for region {}",
140 region.region_id
141 );
142 })?;
143
144 info!(
145 "Cleared all staging manifest files for region {}, elapsed: {:?}",
146 region.region_id,
147 now.elapsed(),
148 );
149 }
150
151 let partition_expr = match partition_directive {
152 StagingPartitionDirective::UpdatePartitionExpr(partition_expr) => {
153 partition_expr.clone()
154 }
155 StagingPartitionDirective::RejectAllWrites => {
156 info!(
157 "Enter staging with reject all writes, region_id: {}",
158 region.region_id
159 );
160 return Ok(());
162 }
163 };
164
165 let change = RegionPartitionExprChange {
167 partition_expr: Some(partition_expr.clone()),
168 };
169 let action_list =
170 RegionMetaActionList::with_action(RegionMetaAction::PartitionExprChange(change));
171 region
172 .manifest_ctx
173 .update_manifest(RegionLeaderState::EnteringStaging, action_list, true)
174 .await?;
175
176 Ok(())
177 }
178
179 fn handle_enter_staging(
180 &self,
181 region: MitoRegionRef,
182 partition_directive: StagingPartitionDirective,
183 sender: OptionOutputTx,
184 ) {
185 if let Err(e) = region.set_entering_staging() {
186 sender.send(Err(e));
187 return;
188 }
189
190 let listener = self.listener.clone();
191 let request_sender = self.sender.clone();
192 common_runtime::spawn_global(async move {
193 let now = Instant::now();
194 let result = Self::enter_staging(®ion, &partition_directive).await;
195 match result {
196 Ok(_) => {
197 info!(
198 "Created staging manifest for region {}, elapsed: {:?}",
199 region.region_id,
200 now.elapsed(),
201 );
202 }
203 Err(ref e) => {
204 region
206 .manifest_ctx
207 .manifest_manager
208 .write()
209 .await
210 .unset_staging_manifest();
211 error!(
212 "Failed to create staging manifest for region {}: {:?}, elapsed: {:?}",
213 region.region_id,
214 e,
215 now.elapsed(),
216 );
217 }
218 }
219
220 let notify = WorkerRequest::Background {
221 region_id: region.region_id,
222 notify: BackgroundNotify::EnterStaging(EnterStagingResult {
223 region_id: region.region_id,
224 sender,
225 result,
226 partition_directive,
227 }),
228 };
229 listener
230 .on_enter_staging_result_begin(region.region_id)
231 .await;
232
233 if let Err(res) = request_sender
234 .send(WorkerRequestWithTime::new(notify))
235 .await
236 {
237 warn!(
238 "Failed to send enter staging result back to the worker, region_id: {}, res: {:?}",
239 region.region_id, res
240 );
241 }
242 });
243 }
244
245 pub(crate) async fn handle_enter_staging_result(
247 &mut self,
248 enter_staging_result: EnterStagingResult,
249 ) {
250 let region = match self.regions.get_region(enter_staging_result.region_id) {
251 Some(region) => region,
252 None => {
253 self.reject_region_stalled_requests(&enter_staging_result.region_id);
254 enter_staging_result.sender.send(
255 RegionNotFoundSnafu {
256 region_id: enter_staging_result.region_id,
257 }
258 .fail(),
259 );
260 return;
261 }
262 };
263
264 if enter_staging_result.result.is_ok() {
265 info!(
266 "Updating region {} staging partition directive to {:?}",
267 region.region_id, enter_staging_result.partition_directive
268 );
269 Self::update_region_staging_partition_info(
270 ®ion,
271 enter_staging_result.partition_directive,
272 );
273 region.switch_state_to_staging(RegionLeaderState::EnteringStaging);
274 } else {
275 region.switch_state_to_writable(RegionLeaderState::EnteringStaging);
276 }
277 enter_staging_result
278 .sender
279 .send(enter_staging_result.result.map(|_| 0));
280 self.handle_region_stalled_requests(&enter_staging_result.region_id)
282 .await;
283 }
284
285 fn update_region_staging_partition_info(
286 region: &MitoRegionRef,
287 partition_directive: StagingPartitionDirective,
288 ) {
289 region.manifest_ctx.set_staging_partition_info(
290 StagingPartitionInfo::from_partition_directive(partition_directive),
291 );
292 }
293}