mito2/worker/handle_enter_staging.rs

use std::sync::Arc;
use std::time::Instant;

use common_telemetry::{error, info, warn};
use store_api::logstore::LogStore;
use store_api::region_request::EnterStagingRequest;
use store_api::storage::RegionId;

use crate::error::{RegionNotFoundSnafu, Result, StagingPartitionExprMismatchSnafu};
use crate::flush::FlushReason;
use crate::manifest::action::{RegionChange, RegionMetaAction, RegionMetaActionList};
use crate::region::{MitoRegionRef, RegionLeaderState};
use crate::request::{
    BackgroundNotify, DdlRequest, EnterStagingResult, OptionOutputTx, SenderDdlRequest,
    WorkerRequest, WorkerRequestWithTime,
};
use crate::worker::RegionWorkerLoop;

impl<S: LogStore> RegionWorkerLoop<S> {
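    /// Handles an [EnterStagingRequest] for `region_id`.
    ///
    /// If the region is already staging, the request only succeeds when the
    /// partition expression matches the one recorded on the region. If the
    /// region has unflushed memtables, a flush is scheduled first and the
    /// request is queued until the flush completes.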
    pub(crate) async fn handle_enter_staging_request(
        &mut self,
        region_id: RegionId,
        partition_expr: String,
        mut sender: OptionOutputTx,
    ) {
        let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
            return;
        };

        if region.is_staging() {
            // The region is already staging; the request must carry the same
            // partition expression as the one recorded on the region.
            let staging_partition_expr = region.staging_partition_expr.lock().unwrap().clone();
            if staging_partition_expr.as_ref() != Some(&partition_expr) {
                sender.send(Err(StagingPartitionExprMismatchSnafu {
                    manifest_expr: staging_partition_expr,
                    request_expr: partition_expr,
                }
                .build()));
                return;
            }

            // Already staging with the same expression, so this is a no-op.
            sender.send(Ok(0));
            return;
        }

        let version = region.version();
        if !version.memtables.is_empty() {
            info!("Flush region: {} before entering staging", region_id);
            // The region cannot be staging here since we returned early above.
            debug_assert!(!region.is_staging());
            let task = self.new_flush_task(
                &region,
                FlushReason::EnterStaging,
                None,
                self.config.clone(),
                region.is_staging(),
            );
            if let Err(e) =
                self.flush_scheduler
                    .schedule_flush(region.region_id, &region.version_control, task)
            {
                sender.send(Err(e));
                return;
            }

            // Defer the request until the scheduled flush finishes.
            self.flush_scheduler
                .add_ddl_request_to_pending(SenderDdlRequest {
                    region_id,
                    sender,
                    request: DdlRequest::EnterStaging(EnterStagingRequest { partition_expr }),
                });

            return;
        }

        self.handle_enter_staging(region, partition_expr, sender);
    }

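    /// Clears any leftover staging manifest, then persists a [RegionChange]
    /// carrying the new partition expression to the staging manifest.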
    async fn enter_staging(region: &MitoRegionRef, partition_expr: String) -> Result<()> {
        let now = Instant::now();
        {
            // Remove staging manifest files left over from a previous attempt.
            let mut manager = region.manifest_ctx.manifest_manager.write().await;
            manager
                .clear_staging_manifest_and_dir()
                .await
                .inspect_err(|e| {
                    error!(
                        e;
                        "Failed to clear staging manifest files for region {}",
                        region.region_id
                    );
                })?;

            info!(
                "Cleared all staging manifest files for region {}, elapsed: {:?}",
                region.region_id,
                now.elapsed(),
            );
        }

        // Write a `RegionChange` with the new partition expression so the
        // staging manifest records it.
        let mut new_meta = (*region.metadata()).clone();
        new_meta.partition_expr = Some(partition_expr.clone());
        let sst_format = region.version().options.sst_format.unwrap_or_default();
        let change = RegionChange {
            metadata: Arc::new(new_meta),
            sst_format,
        };
        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(change));
        region
            .manifest_ctx
            .update_manifest(RegionLeaderState::EnteringStaging, action_list, true)
            .await?;

        Ok(())
    }

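    /// Switches the region to the entering-staging state and spawns a
    /// background task that builds the staging manifest, then sends an
    /// [EnterStagingResult] back to the worker loop.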
    fn handle_enter_staging(
        &self,
        region: MitoRegionRef,
        partition_expr: String,
        sender: OptionOutputTx,
    ) {
        if let Err(e) = region.set_entering_staging() {
            sender.send(Err(e));
            return;
        }

        let listener = self.listener.clone();
        let request_sender = self.sender.clone();
        common_runtime::spawn_global(async move {
            let now = Instant::now();
            let result = Self::enter_staging(&region, partition_expr.clone()).await;
            match result {
                Ok(_) => {
                    info!(
                        "Created staging manifest for region {}, elapsed: {:?}",
                        region.region_id,
                        now.elapsed(),
                    );
                }
                Err(ref e) => {
                    // Roll back the in-memory staging manifest on failure.
                    region
                        .manifest_ctx
                        .manifest_manager
                        .write()
                        .await
                        .unset_staging_manifest();
                    error!(
                        "Failed to create staging manifest for region {}: {:?}, elapsed: {:?}",
                        region.region_id,
                        e,
                        now.elapsed(),
                    );
                }
            }

            // Notify the worker loop so it can finish the state transition.
            let notify = WorkerRequest::Background {
                region_id: region.region_id,
                notify: BackgroundNotify::EnterStaging(EnterStagingResult {
                    region_id: region.region_id,
                    sender,
                    result,
                    partition_expr,
                }),
            };
            listener
                .on_enter_staging_result_begin(region.region_id)
                .await;

            if let Err(res) = request_sender
                .send(WorkerRequestWithTime::new(notify))
                .await
            {
                warn!(
                    "Failed to send enter staging result back to the worker, region_id: {}, res: {:?}",
                    region.region_id, res
                );
            }
        });
    }

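    /// Finishes an enter-staging request: on success the region switches to
    /// the staging state, on failure it switches back to writable. The result
    /// is forwarded to the caller either way.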
    pub(crate) async fn handle_enter_staging_result(
        &mut self,
        enter_staging_result: EnterStagingResult,
    ) {
        let region = match self.regions.get_region(enter_staging_result.region_id) {
            Some(region) => region,
            None => {
                self.reject_region_stalled_requests(&enter_staging_result.region_id);
                enter_staging_result.sender.send(
                    RegionNotFoundSnafu {
                        region_id: enter_staging_result.region_id,
                    }
                    .fail(),
                );
                return;
            }
        };

        if enter_staging_result.result.is_ok() {
            info!(
                "Updating region {} staging partition expr to {}",
                region.region_id, enter_staging_result.partition_expr
            );
            Self::update_region_staging_partition_expr(
                &region,
                enter_staging_result.partition_expr,
            );
            region.switch_state_to_staging(RegionLeaderState::EnteringStaging);
        } else {
            region.switch_state_to_writable(RegionLeaderState::EnteringStaging);
        }
        enter_staging_result
            .sender
            .send(enter_staging_result.result.map(|_| 0));
        self.handle_region_stalled_requests(&enter_staging_result.region_id)
            .await;
    }

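    /// Records the partition expression the region is staging under.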
    fn update_region_staging_partition_expr(region: &MitoRegionRef, partition_expr: String) {
        let mut staging_partition_expr = region.staging_partition_expr.lock().unwrap();
        debug_assert!(staging_partition_expr.is_none());
        *staging_partition_expr = Some(partition_expr);
    }
}