mito2/worker/
handle_enter_staging.rs1use std::sync::Arc;
16use std::time::Instant;
17
18use common_telemetry::{error, info, warn};
19use store_api::logstore::LogStore;
20use store_api::region_request::EnterStagingRequest;
21use store_api::storage::RegionId;
22
23use crate::error::{RegionNotFoundSnafu, Result, StagingPartitionExprMismatchSnafu};
24use crate::flush::FlushReason;
25use crate::manifest::action::{RegionChange, RegionMetaAction, RegionMetaActionList};
26use crate::region::{MitoRegionRef, RegionLeaderState};
27use crate::request::{
28 BackgroundNotify, DdlRequest, EnterStagingResult, OptionOutputTx, SenderDdlRequest,
29 WorkerRequest, WorkerRequestWithTime,
30};
31use crate::worker::RegionWorkerLoop;
32
33impl<S: LogStore> RegionWorkerLoop<S> {
34 pub(crate) async fn handle_enter_staging_request(
35 &mut self,
36 region_id: RegionId,
37 partition_expr: String,
38 mut sender: OptionOutputTx,
39 ) {
40 let Some(region) = self.regions.writable_region_or(region_id, &mut sender) else {
41 return;
42 };
43
44 if region.is_staging() {
46 let staging_partition_expr = region.staging_partition_expr.lock().unwrap().clone();
47 if staging_partition_expr.as_ref() != Some(&partition_expr) {
49 sender.send(Err(StagingPartitionExprMismatchSnafu {
50 manifest_expr: staging_partition_expr,
51 request_expr: partition_expr,
52 }
53 .build()));
54 return;
55 }
56
57 sender.send(Ok(0));
59 return;
60 }
61
62 let version = region.version();
63 if !version.memtables.is_empty() {
64 info!("Flush region: {} before entering staging", region_id);
67 debug_assert!(!region.is_staging());
68 let task = self.new_flush_task(
69 ®ion,
70 FlushReason::EnterStaging,
71 None,
72 self.config.clone(),
73 );
74 if let Err(e) =
75 self.flush_scheduler
76 .schedule_flush(region.region_id, ®ion.version_control, task)
77 {
78 sender.send(Err(e));
80 return;
81 }
82
83 self.flush_scheduler
85 .add_ddl_request_to_pending(SenderDdlRequest {
86 region_id,
87 sender,
88 request: DdlRequest::EnterStaging(EnterStagingRequest { partition_expr }),
89 });
90
91 return;
92 }
93
94 self.handle_enter_staging(region, partition_expr, sender);
95 }
96
97 async fn enter_staging(region: &MitoRegionRef, partition_expr: String) -> Result<()> {
98 let now = Instant::now();
99 {
101 let mut manager = region.manifest_ctx.manifest_manager.write().await;
102 manager
103 .clear_staging_manifest_and_dir()
104 .await
105 .inspect_err(|e| {
106 error!(
107 e;
108 "Failed to clear staging manifest files for region {}",
109 region.region_id
110 );
111 })?;
112
113 info!(
114 "Cleared all staging manifest files for region {}, elapsed: {:?}",
115 region.region_id,
116 now.elapsed(),
117 );
118 }
119
120 let mut new_meta = (*region.metadata()).clone();
122 new_meta.partition_expr = Some(partition_expr.clone());
123 let sst_format = region.version().options.sst_format.unwrap_or_default();
124 let change = RegionChange {
125 metadata: Arc::new(new_meta),
126 sst_format,
127 };
128 let action_list = RegionMetaActionList::with_action(RegionMetaAction::Change(change));
129 region
130 .manifest_ctx
131 .update_manifest(RegionLeaderState::EnteringStaging, action_list, true)
132 .await?;
133
134 Ok(())
135 }
136
137 fn handle_enter_staging(
138 &self,
139 region: MitoRegionRef,
140 partition_expr: String,
141 sender: OptionOutputTx,
142 ) {
143 if let Err(e) = region.set_entering_staging() {
144 sender.send(Err(e));
145 return;
146 }
147
148 let listener = self.listener.clone();
149 let request_sender = self.sender.clone();
150 common_runtime::spawn_global(async move {
151 let now = Instant::now();
152 let result = Self::enter_staging(®ion, partition_expr.clone()).await;
153 match result {
154 Ok(_) => {
155 info!(
156 "Created staging manifest for region {}, elapsed: {:?}",
157 region.region_id,
158 now.elapsed(),
159 );
160 }
161 Err(ref e) => {
162 region
164 .manifest_ctx
165 .manifest_manager
166 .write()
167 .await
168 .unset_staging_manifest();
169 error!(
170 "Failed to create staging manifest for region {}: {:?}, elapsed: {:?}",
171 region.region_id,
172 e,
173 now.elapsed(),
174 );
175 }
176 }
177
178 let notify = WorkerRequest::Background {
179 region_id: region.region_id,
180 notify: BackgroundNotify::EnterStaging(EnterStagingResult {
181 region_id: region.region_id,
182 sender,
183 result,
184 partition_expr,
185 }),
186 };
187 listener
188 .on_enter_staging_result_begin(region.region_id)
189 .await;
190
191 if let Err(res) = request_sender
192 .send(WorkerRequestWithTime::new(notify))
193 .await
194 {
195 warn!(
196 "Failed to send enter staging result back to the worker, region_id: {}, res: {:?}",
197 region.region_id, res
198 );
199 }
200 });
201 }
202
203 pub(crate) async fn handle_enter_staging_result(
205 &mut self,
206 enter_staging_result: EnterStagingResult,
207 ) {
208 let region = match self.regions.get_region(enter_staging_result.region_id) {
209 Some(region) => region,
210 None => {
211 self.reject_region_stalled_requests(&enter_staging_result.region_id);
212 enter_staging_result.sender.send(
213 RegionNotFoundSnafu {
214 region_id: enter_staging_result.region_id,
215 }
216 .fail(),
217 );
218 return;
219 }
220 };
221
222 if enter_staging_result.result.is_ok() {
223 info!(
224 "Updating region {} staging partition expr to {}",
225 region.region_id, enter_staging_result.partition_expr
226 );
227 Self::update_region_staging_partition_expr(
228 ®ion,
229 enter_staging_result.partition_expr,
230 );
231 region.switch_state_to_staging(RegionLeaderState::EnteringStaging);
232 } else {
233 region.switch_state_to_writable(RegionLeaderState::EnteringStaging);
234 }
235 enter_staging_result
236 .sender
237 .send(enter_staging_result.result.map(|_| 0));
238 self.handle_region_stalled_requests(&enter_staging_result.region_id)
240 .await;
241 }
242
243 fn update_region_staging_partition_expr(region: &MitoRegionRef, partition_expr: String) {
244 let mut staging_partition_expr = region.staging_partition_expr.lock().unwrap();
245 debug_assert!(staging_partition_expr.is_none());
246 *staging_partition_expr = Some(partition_expr);
247 }
248}