1use std::time::Instant;
16
17use common_telemetry::{error, info, warn};
18use store_api::logstore::LogStore;
19use store_api::region_request::{EnterStagingRequest, StagingPartitionDirective};
20use store_api::storage::RegionId;
21
22use crate::error::{RegionNotFoundSnafu, Result, StagingPartitionExprMismatchSnafu};
23use crate::flush::FlushReason;
24use crate::manifest::action::{RegionMetaAction, RegionMetaActionList, RegionPartitionExprChange};
25use crate::region::{MitoRegionRef, RegionLeaderState, StagingPartitionInfo};
26use crate::request::{
27 BackgroundNotify, DdlRequest, EnterStagingResult, OptionOutputTx, SenderDdlRequest,
28 WorkerRequest, WorkerRequestWithTime,
29};
30use crate::worker::RegionWorkerLoop;
31
32impl<S: LogStore> RegionWorkerLoop<S> {
33 pub(crate) async fn handle_enter_staging_request(
34 &mut self,
35 region_id: RegionId,
36 partition_directive: StagingPartitionDirective,
37 mut sender: OptionOutputTx,
38 ) {
39 let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
40 return;
41 };
42
43 if region.is_staging() {
45 let staging_partition_info = region.staging_partition_info.lock().unwrap().clone();
46 if staging_partition_info
48 .as_ref()
49 .map(|info| &info.partition_directive)
50 != Some(&partition_directive)
51 {
52 sender.send(Err(StagingPartitionExprMismatchSnafu {
53 manifest_expr: staging_partition_info
54 .as_ref()
55 .and_then(|info| info.partition_expr().map(ToString::to_string)),
56 request_expr: format!("{:?}", partition_directive),
57 }
58 .build()));
59 return;
60 }
61
62 sender.send(Ok(0));
64 return;
65 }
66
67 let version = region.version();
68 if !version.memtables.is_empty() {
69 info!("Flush region: {} before entering staging", region_id);
72 debug_assert!(!region.is_staging());
73 let task = self.new_flush_task(
74 ®ion,
75 FlushReason::EnterStaging,
76 None,
77 self.config.clone(),
78 );
79 if let Err(e) =
80 self.flush_scheduler
81 .schedule_flush(region.region_id, ®ion.version_control, task)
82 {
83 sender.send(Err(e));
85 return;
86 }
87
88 self.flush_scheduler
90 .add_ddl_request_to_pending(SenderDdlRequest {
91 region_id,
92 sender,
93 request: DdlRequest::EnterStaging(EnterStagingRequest {
94 partition_directive: partition_directive.clone(),
95 }),
96 });
97
98 return;
99 }
100
101 if self.compaction_scheduler.is_compacting(region_id) {
102 self.compaction_scheduler
104 .add_ddl_request_to_pending(SenderDdlRequest {
105 region_id,
106 sender,
107 request: DdlRequest::EnterStaging(EnterStagingRequest {
108 partition_directive,
109 }),
110 });
111
112 return;
113 }
114
115 self.handle_enter_staging(region, partition_directive, sender);
116 }
117
118 async fn enter_staging(
119 region: &MitoRegionRef,
120 partition_directive: &StagingPartitionDirective,
121 ) -> Result<()> {
122 let now = Instant::now();
123 {
125 let mut manager = region.manifest_ctx.manifest_manager.write().await;
126 manager
127 .clear_staging_manifest_and_dir()
128 .await
129 .inspect_err(|e| {
130 error!(
131 e;
132 "Failed to clear staging manifest files for region {}",
133 region.region_id
134 );
135 })?;
136
137 info!(
138 "Cleared all staging manifest files for region {}, elapsed: {:?}",
139 region.region_id,
140 now.elapsed(),
141 );
142 }
143
144 let partition_expr = match partition_directive {
145 StagingPartitionDirective::UpdatePartitionExpr(partition_expr) => {
146 partition_expr.clone()
147 }
148 StagingPartitionDirective::RejectAllWrites => {
149 info!(
150 "Enter staging with reject all writes, region_id: {}",
151 region.region_id
152 );
153 return Ok(());
155 }
156 };
157
158 let change = RegionPartitionExprChange {
160 partition_expr: Some(partition_expr.clone()),
161 };
162 let action_list =
163 RegionMetaActionList::with_action(RegionMetaAction::PartitionExprChange(change));
164 region
165 .manifest_ctx
166 .update_manifest(RegionLeaderState::EnteringStaging, action_list, true)
167 .await?;
168
169 Ok(())
170 }
171
172 fn handle_enter_staging(
173 &self,
174 region: MitoRegionRef,
175 partition_directive: StagingPartitionDirective,
176 sender: OptionOutputTx,
177 ) {
178 if let Err(e) = region.set_entering_staging() {
179 sender.send(Err(e));
180 return;
181 }
182
183 let listener = self.listener.clone();
184 let request_sender = self.sender.clone();
185 common_runtime::spawn_global(async move {
186 let now = Instant::now();
187 let result = Self::enter_staging(®ion, &partition_directive).await;
188 match result {
189 Ok(_) => {
190 info!(
191 "Created staging manifest for region {}, elapsed: {:?}",
192 region.region_id,
193 now.elapsed(),
194 );
195 }
196 Err(ref e) => {
197 region
199 .manifest_ctx
200 .manifest_manager
201 .write()
202 .await
203 .unset_staging_manifest();
204 error!(
205 "Failed to create staging manifest for region {}: {:?}, elapsed: {:?}",
206 region.region_id,
207 e,
208 now.elapsed(),
209 );
210 }
211 }
212
213 let notify = WorkerRequest::Background {
214 region_id: region.region_id,
215 notify: BackgroundNotify::EnterStaging(EnterStagingResult {
216 region_id: region.region_id,
217 sender,
218 result,
219 partition_directive,
220 }),
221 };
222 listener
223 .on_enter_staging_result_begin(region.region_id)
224 .await;
225
226 if let Err(res) = request_sender
227 .send(WorkerRequestWithTime::new(notify))
228 .await
229 {
230 warn!(
231 "Failed to send enter staging result back to the worker, region_id: {}, res: {:?}",
232 region.region_id, res
233 );
234 }
235 });
236 }
237
238 pub(crate) async fn handle_enter_staging_result(
240 &mut self,
241 enter_staging_result: EnterStagingResult,
242 ) {
243 let region = match self.regions.get_region(enter_staging_result.region_id) {
244 Some(region) => region,
245 None => {
246 self.reject_region_stalled_requests(&enter_staging_result.region_id);
247 enter_staging_result.sender.send(
248 RegionNotFoundSnafu {
249 region_id: enter_staging_result.region_id,
250 }
251 .fail(),
252 );
253 return;
254 }
255 };
256
257 if enter_staging_result.result.is_ok() {
258 info!(
259 "Updating region {} staging partition directive to {:?}",
260 region.region_id, enter_staging_result.partition_directive
261 );
262 Self::update_region_staging_partition_info(
263 ®ion,
264 enter_staging_result.partition_directive,
265 );
266 region.switch_state_to_staging(RegionLeaderState::EnteringStaging);
267 } else {
268 region.switch_state_to_writable(RegionLeaderState::EnteringStaging);
269 }
270 enter_staging_result
271 .sender
272 .send(enter_staging_result.result.map(|_| 0));
273 self.handle_region_stalled_requests(&enter_staging_result.region_id)
275 .await;
276 }
277
278 fn update_region_staging_partition_info(
279 region: &MitoRegionRef,
280 partition_directive: StagingPartitionDirective,
281 ) {
282 let mut staging_partition_info = region.staging_partition_info.lock().unwrap();
283 debug_assert!(staging_partition_info.is_none());
284 *staging_partition_info = Some(StagingPartitionInfo::from_partition_directive(
285 partition_directive,
286 ));
287 }
288}