1use std::any::Any;
16use std::collections::HashMap;
17use std::time::{Duration, Instant};
18
19use api::v1::meta::MailboxMessage;
20use common_meta::instruction::{Instruction, InstructionReply, SyncRegionReply, SyncRegionsReply};
21use common_meta::peer::Peer;
22use common_meta::rpc::router::RegionRoute;
23use common_procedure::{Context as ProcedureContext, Status};
24use common_telemetry::info;
25use common_telemetry::tracing_context::TracingContext;
26use futures::future::join_all;
27use serde::{Deserialize, Serialize};
28use snafu::{OptionExt, ResultExt, ensure};
29use store_api::region_engine::SyncRegionFromRequest;
30use store_api::storage::RegionId;
31
32use crate::error::{self, Error, Result};
33use crate::handler::HeartbeatMailbox;
34use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
35use crate::procedure::repartition::group::utils::{
36 HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results,
37};
38use crate::procedure::repartition::group::{Context, State};
39use crate::procedure::utils::ErrorStrategy;
40use crate::service::mailbox::{Channel, MailboxRef};
41
42const DEFAULT_SYNC_REGION_PARALLELISM: usize = 3;
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct SyncRegion {
47 pub region_routes: Vec<RegionRoute>,
48}
49
50#[async_trait::async_trait]
51#[typetag::serde]
52impl State for SyncRegion {
53 async fn next(
54 &mut self,
55 ctx: &mut Context,
56 _procedure_ctx: &ProcedureContext,
57 ) -> Result<(Box<dyn State>, Status)> {
58 Self::flush_central_region(ctx).await?;
59 self.sync_regions(ctx).await?;
60
61 Ok((
62 Box::new(UpdateMetadata::ApplyStaging),
63 Status::executing(true),
64 ))
65 }
66
67 fn as_any(&self) -> &dyn Any {
68 self
69 }
70}
71
72impl SyncRegion {
73 async fn flush_central_region(ctx: &mut Context) -> Result<()> {
74 let operation_timeout =
75 ctx.next_operation_timeout()
76 .context(error::ExceededDeadlineSnafu {
77 operation: "Flush central region",
78 })?;
79 let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
80
81 crate::procedure::utils::flush_region(
82 &ctx.mailbox,
83 &ctx.server_addr,
84 &[prepare_result.central_region],
85 &prepare_result.central_region_datanode,
86 operation_timeout,
87 ErrorStrategy::Retry,
88 )
89 .await
90 }
91
92 fn build_sync_region_instructions(
94 central_region: RegionId,
95 region_routes: &[RegionRoute],
96 ) -> HashMap<Peer, Vec<common_meta::instruction::SyncRegion>> {
97 let target_region_routes_by_peer = group_region_routes_by_peer(region_routes);
98 let mut instructions = HashMap::with_capacity(target_region_routes_by_peer.len());
99
100 for (peer, region_ids) in target_region_routes_by_peer {
101 let sync_regions = region_ids
102 .into_iter()
103 .map(|region_id| {
104 let request = SyncRegionFromRequest::FromRegion {
105 source_region_id: central_region,
106 parallelism: DEFAULT_SYNC_REGION_PARALLELISM,
107 };
108 common_meta::instruction::SyncRegion { region_id, request }
109 })
110 .collect();
111 instructions.insert((*peer).clone(), sync_regions);
112 }
113
114 instructions
115 }
116
117 async fn sync_regions(&self, ctx: &mut Context) -> Result<()> {
119 let table_id = ctx.persistent_ctx.table_id;
120 let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
121 let instructions = Self::build_sync_region_instructions(
122 prepare_result.central_region,
123 &self.region_routes,
124 );
125 let operation_timeout =
126 ctx.next_operation_timeout()
127 .context(error::ExceededDeadlineSnafu {
128 operation: "Sync regions",
129 })?;
130
131 let (peers, tasks): (Vec<_>, Vec<_>) = instructions
132 .iter()
133 .map(|(peer, sync_regions)| {
134 (
135 peer,
136 Self::sync_region(
137 &ctx.mailbox,
138 &ctx.server_addr,
139 peer,
140 sync_regions,
141 operation_timeout,
142 ),
143 )
144 })
145 .unzip();
146
147 info!(
148 "Sent sync regions instructions to peers: {:?} for repartition table {}",
149 peers, table_id
150 );
151
152 let format_err_msg = |idx: usize, error: &Error| {
153 let peer = peers[idx];
154 format!(
155 "Failed to sync regions on datanode {:?}, error: {:?}",
156 peer, error
157 )
158 };
159
160 let results = join_all(tasks).await;
161 let result = handle_multiple_results(&results);
162
163 match result {
164 HandleMultipleResult::AllSuccessful => Ok(()),
165 HandleMultipleResult::AllRetryable(retryable_errors) => error::RetryLaterSnafu {
166 reason: format!(
167 "All retryable errors during syncing regions for repartition table {}: {:?}",
168 table_id,
169 retryable_errors
170 .iter()
171 .map(|(idx, error)| format_err_msg(*idx, error))
172 .collect::<Vec<_>>()
173 .join(",")
174 ),
175 }
176 .fail(),
177 HandleMultipleResult::AllNonRetryable(non_retryable_errors) => error::UnexpectedSnafu {
178 violated: format!(
179 "All non retryable errors during syncing regions for repartition table {}: {:?}",
180 table_id,
181 non_retryable_errors
182 .iter()
183 .map(|(idx, error)| format_err_msg(*idx, error))
184 .collect::<Vec<_>>()
185 .join(",")
186 ),
187 }
188 .fail(),
189 HandleMultipleResult::PartialRetryable {
190 retryable_errors,
191 non_retryable_errors,
192 } => error::UnexpectedSnafu {
193 violated: format!(
194 "Partial retryable errors during syncing regions for repartition table {}: {:?}, non retryable errors: {:?}",
195 table_id,
196 retryable_errors
197 .iter()
198 .map(|(idx, error)| format_err_msg(*idx, error))
199 .collect::<Vec<_>>()
200 .join(","),
201 non_retryable_errors
202 .iter()
203 .map(|(idx, error)| format_err_msg(*idx, error))
204 .collect::<Vec<_>>()
205 .join(","),
206 ),
207 }
208 .fail(),
209 }
210 }
211
212 async fn sync_region(
214 mailbox: &MailboxRef,
215 server_addr: &str,
216 peer: &Peer,
217 sync_regions: &[common_meta::instruction::SyncRegion],
218 timeout: Duration,
219 ) -> Result<()> {
220 let ch = Channel::Datanode(peer.id);
221 let instruction = Instruction::SyncRegions(sync_regions.to_vec());
222 let tracing_ctx = TracingContext::from_current_span();
223 let message = MailboxMessage::json_message(
224 &format!(
225 "Sync regions: {:?}",
226 sync_regions.iter().map(|r| r.region_id).collect::<Vec<_>>()
227 ),
228 &format!("Metasrv@{}", server_addr),
229 &format!("Datanode-{}@{}", peer.id, peer.addr),
230 common_time::util::current_time_millis(),
231 &instruction,
232 Some(tracing_ctx.to_w3c()),
233 )
234 .with_context(|_| error::SerializeToJsonSnafu {
235 input: instruction.to_string(),
236 })?;
237
238 let now = std::time::Instant::now();
239 let receiver = mailbox.send(&ch, message, timeout).await;
240
241 let receiver = match receiver {
242 Ok(receiver) => receiver,
243 Err(error::Error::PusherNotFound { .. }) => error::RetryLaterSnafu {
244 reason: format!(
245 "Pusher not found for sync regions on datanode {:?}, elapsed: {:?}",
246 peer,
247 now.elapsed()
248 ),
249 }
250 .fail()?,
251 Err(err) => {
252 return Err(err);
253 }
254 };
255
256 match receiver.await {
257 Ok(msg) => {
258 let reply = HeartbeatMailbox::json_reply(&msg)?;
259 info!(
260 "Received sync regions reply: {:?}, elapsed: {:?}",
261 reply,
262 now.elapsed()
263 );
264 let InstructionReply::SyncRegions(SyncRegionsReply { replies }) = reply else {
265 return error::UnexpectedInstructionReplySnafu {
266 mailbox_message: msg.to_string(),
267 reason: "expect sync regions reply",
268 }
269 .fail();
270 };
271 for reply in replies {
272 Self::handle_sync_region_reply(&reply, &now, peer)?;
273 }
274 Ok(())
275 }
276 Err(error::Error::MailboxChannelClosed { .. }) => error::RetryLaterSnafu {
277 reason: format!(
278 "Mailbox closed when sending sync region to datanode {:?}, elapsed: {:?}",
279 peer,
280 now.elapsed()
281 ),
282 }
283 .fail()?,
284 Err(error::Error::MailboxTimeout { .. }) => {
285 let reason = format!(
286 "Mailbox received timeout for sync regions on datanode {:?}, elapsed: {:?}",
287 peer,
288 now.elapsed()
289 );
290 error::RetryLaterSnafu { reason }.fail()
291 }
292 Err(err) => Err(err),
293 }
294 }
295
296 fn handle_sync_region_reply(
297 SyncRegionReply {
298 region_id,
299 ready,
300 exists,
301 error,
302 }: &SyncRegionReply,
303 now: &Instant,
304 peer: &Peer,
305 ) -> Result<()> {
306 ensure!(
307 exists,
308 error::UnexpectedSnafu {
309 violated: format!(
310 "Region {} doesn't exist on datanode {:?}, elapsed: {:?}",
311 region_id,
312 peer,
313 now.elapsed()
314 )
315 }
316 );
317
318 if let Some(error) = error {
319 return error::RetryLaterSnafu {
320 reason: format!(
321 "Failed to sync region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
322 region_id,
323 peer,
324 error,
325 now.elapsed()
326 ),
327 }
328 .fail();
329 }
330
331 ensure!(
332 ready,
333 error::RetryLaterSnafu {
334 reason: format!(
335 "Region {} failed to sync on datanode {:?}, elapsed: {:?}",
336 region_id,
337 peer,
338 now.elapsed()
339 ),
340 }
341 );
342
343 Ok(())
344 }
345}
346
#[cfg(test)]
mod tests {
    use std::assert_matches;

    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::region_engine::SyncRegionFromRequest;
    use store_api::storage::RegionId;

    use crate::error::Error;
    use crate::procedure::repartition::group::GroupPrepareResult;
    use crate::procedure::repartition::group::sync_region::SyncRegion;
    use crate::procedure::repartition::test_util::{TestingEnv, new_persistent_context};
    use crate::procedure::test_util::{new_sync_region_reply, send_mock_reply};
    use crate::service::mailbox::Channel;

    /// A single route for region 3 of `table_id`, led by datanode 1.
    fn test_region_routes(table_id: u32) -> Vec<RegionRoute> {
        vec![RegionRoute {
            region: Region {
                id: RegionId::new(table_id, 3),
                ..Default::default()
            },
            leader_peer: Some(Peer::empty(1)),
            ..Default::default()
        }]
    }

    /// A prepare result whose central region is region 1 of `table_id` on datanode 1.
    fn test_prepare_result(table_id: u32) -> GroupPrepareResult {
        GroupPrepareResult {
            source_routes: vec![],
            target_routes: vec![],
            central_region: RegionId::new(table_id, 1),
            central_region_datanode: Peer::empty(1),
        }
    }

    #[test]
    fn test_build_sync_region_instructions() {
        let table_id = 1024;
        let central_region = RegionId::new(table_id, 1);
        let region_routes = test_region_routes(table_id);

        let instructions =
            SyncRegion::build_sync_region_instructions(central_region, &region_routes);
        assert_eq!(instructions.len(), 1);
        let peer_instructions = instructions.get(&Peer::empty(1)).unwrap();
        assert_eq!(peer_instructions.len(), 1);
        assert_eq!(peer_instructions[0].region_id, RegionId::new(table_id, 3));
        let SyncRegionFromRequest::FromRegion {
            source_region_id, ..
        } = &peer_instructions[0].request
        else {
            panic!("expect from region request");
        };
        assert_eq!(*source_region_id, central_region);
    }

    #[tokio::test]
    async fn test_sync_regions_all_successful() {
        let mut env = TestingEnv::new();
        let table_id = 1024;
        let mut persistent_context = new_persistent_context(table_id, vec![], vec![]);
        persistent_context.group_prepare_result = Some(test_prepare_result(table_id));

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        env.mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(1), tx)
            .await;
        send_mock_reply(env.mailbox_ctx.mailbox().clone(), rx, |id| {
            Ok(new_sync_region_reply(
                id,
                RegionId::new(1024, 3),
                true,
                true,
                None,
            ))
        });

        let mut ctx = env.create_context(persistent_context);
        let sync_region = SyncRegion {
            region_routes: test_region_routes(table_id),
        };

        sync_region.sync_regions(&mut ctx).await.unwrap();
    }

    /// No heartbeat receiver is registered for datanode 1, so the send fails
    /// and the error must be reported as retryable.
    #[tokio::test]
    async fn test_sync_regions_retryable() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let mut persistent_context = new_persistent_context(table_id, vec![], vec![]);
        persistent_context.group_prepare_result = Some(test_prepare_result(table_id));

        let mut ctx = env.create_context(persistent_context);
        let sync_region = SyncRegion {
            region_routes: test_region_routes(table_id),
        };

        let err = sync_region.sync_regions(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::RetryLater { .. });
    }
}