1use std::any::Any;
16use std::collections::HashMap;
17use std::time::{Duration, Instant};
18
19use api::v1::meta::MailboxMessage;
20use common_meta::instruction::{Instruction, InstructionReply, SyncRegionReply, SyncRegionsReply};
21use common_meta::peer::Peer;
22use common_meta::rpc::router::RegionRoute;
23use common_procedure::{Context as ProcedureContext, Status};
24use common_telemetry::info;
25use common_telemetry::tracing_context::TracingContext;
26use futures::future::join_all;
27use serde::{Deserialize, Serialize};
28use snafu::{OptionExt, ResultExt, ensure};
29use store_api::region_engine::SyncRegionFromRequest;
30use store_api::region_request::RegionFlushReason;
31use store_api::storage::RegionId;
32
33use crate::error::{self, Error, Result};
34use crate::handler::HeartbeatMailbox;
35use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
36use crate::procedure::repartition::group::utils::{
37 HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results,
38};
39use crate::procedure::repartition::group::{Context, State};
40use crate::procedure::utils::ErrorStrategy;
41use crate::service::mailbox::{Channel, MailboxRef};
42
/// Parallelism passed in every `SyncRegionFromRequest::FromRegion` request built by
/// this state (see `SyncRegion::build_sync_region_instructions`).
const DEFAULT_SYNC_REGION_PARALLELISM: usize = 3;
44
45#[derive(Debug, Clone, Serialize, Deserialize)]
47pub struct SyncRegion {
48 pub region_routes: Vec<RegionRoute>,
49}
50
51#[async_trait::async_trait]
52#[typetag::serde]
53impl State for SyncRegion {
54 async fn next(
55 &mut self,
56 ctx: &mut Context,
57 _procedure_ctx: &ProcedureContext,
58 ) -> Result<(Box<dyn State>, Status)> {
59 Self::flush_central_region(ctx).await?;
60 self.sync_regions(ctx).await?;
61
62 Ok((
63 Box::new(UpdateMetadata::ApplyStaging),
64 Status::executing(true),
65 ))
66 }
67
68 fn as_any(&self) -> &dyn Any {
69 self
70 }
71}
72
73impl SyncRegion {
74 async fn flush_central_region(ctx: &mut Context) -> Result<()> {
75 let operation_timeout =
76 ctx.next_operation_timeout()
77 .context(error::ExceededDeadlineSnafu {
78 operation: "Flush central region",
79 })?;
80 let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
81
82 crate::procedure::utils::flush_region(
83 &ctx.mailbox,
84 &ctx.server_addr,
85 &[prepare_result.central_region],
86 &prepare_result.central_region_datanode,
87 operation_timeout,
88 ErrorStrategy::Retry,
89 Some(RegionFlushReason::Repartition),
90 )
91 .await
92 }
93
94 fn build_sync_region_instructions(
96 central_region: RegionId,
97 region_routes: &[RegionRoute],
98 ) -> HashMap<Peer, Vec<common_meta::instruction::SyncRegion>> {
99 let target_region_routes_by_peer = group_region_routes_by_peer(region_routes);
100 let mut instructions = HashMap::with_capacity(target_region_routes_by_peer.len());
101
102 for (peer, region_ids) in target_region_routes_by_peer {
103 let sync_regions = region_ids
104 .into_iter()
105 .map(|region_id| {
106 let request = SyncRegionFromRequest::FromRegion {
107 source_region_id: central_region,
108 parallelism: DEFAULT_SYNC_REGION_PARALLELISM,
109 };
110 common_meta::instruction::SyncRegion { region_id, request }
111 })
112 .collect();
113 instructions.insert((*peer).clone(), sync_regions);
114 }
115
116 instructions
117 }
118
119 async fn sync_regions(&self, ctx: &mut Context) -> Result<()> {
121 let table_id = ctx.persistent_ctx.table_id;
122 let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
123 let instructions = Self::build_sync_region_instructions(
124 prepare_result.central_region,
125 &self.region_routes,
126 );
127 let operation_timeout =
128 ctx.next_operation_timeout()
129 .context(error::ExceededDeadlineSnafu {
130 operation: "Sync regions",
131 })?;
132
133 let (peers, tasks): (Vec<_>, Vec<_>) = instructions
134 .iter()
135 .map(|(peer, sync_regions)| {
136 (
137 peer,
138 Self::sync_region(
139 &ctx.mailbox,
140 &ctx.server_addr,
141 peer,
142 sync_regions,
143 operation_timeout,
144 ),
145 )
146 })
147 .unzip();
148
149 info!(
150 "Sent sync regions instructions to peers: {:?} for repartition table {}",
151 peers, table_id
152 );
153
154 let format_err_msg = |idx: usize, error: &Error| {
155 let peer = peers[idx];
156 format!(
157 "Failed to sync regions on datanode {:?}, error: {:?}",
158 peer, error
159 )
160 };
161
162 let results = join_all(tasks).await;
163 let result = handle_multiple_results(&results);
164
165 match result {
166 HandleMultipleResult::AllSuccessful => Ok(()),
167 HandleMultipleResult::AllRetryable(retryable_errors) => error::RetryLaterSnafu {
168 reason: format!(
169 "All retryable errors during syncing regions for repartition table {}: {:?}",
170 table_id,
171 retryable_errors
172 .iter()
173 .map(|(idx, error)| format_err_msg(*idx, error))
174 .collect::<Vec<_>>()
175 .join(",")
176 ),
177 }
178 .fail(),
179 HandleMultipleResult::AllNonRetryable(non_retryable_errors) => error::UnexpectedSnafu {
180 violated: format!(
181 "All non retryable errors during syncing regions for repartition table {}: {:?}",
182 table_id,
183 non_retryable_errors
184 .iter()
185 .map(|(idx, error)| format_err_msg(*idx, error))
186 .collect::<Vec<_>>()
187 .join(",")
188 ),
189 }
190 .fail(),
191 HandleMultipleResult::PartialRetryable {
192 retryable_errors,
193 non_retryable_errors,
194 } => error::UnexpectedSnafu {
195 violated: format!(
196 "Partial retryable errors during syncing regions for repartition table {}: {:?}, non retryable errors: {:?}",
197 table_id,
198 retryable_errors
199 .iter()
200 .map(|(idx, error)| format_err_msg(*idx, error))
201 .collect::<Vec<_>>()
202 .join(","),
203 non_retryable_errors
204 .iter()
205 .map(|(idx, error)| format_err_msg(*idx, error))
206 .collect::<Vec<_>>()
207 .join(","),
208 ),
209 }
210 .fail(),
211 }
212 }
213
214 async fn sync_region(
216 mailbox: &MailboxRef,
217 server_addr: &str,
218 peer: &Peer,
219 sync_regions: &[common_meta::instruction::SyncRegion],
220 timeout: Duration,
221 ) -> Result<()> {
222 let ch = Channel::Datanode(peer.id);
223 let instruction = Instruction::SyncRegions(sync_regions.to_vec());
224 let tracing_ctx = TracingContext::from_current_span();
225 let message = MailboxMessage::json_message(
226 &format!(
227 "Sync regions: {:?}",
228 sync_regions.iter().map(|r| r.region_id).collect::<Vec<_>>()
229 ),
230 &format!("Metasrv@{}", server_addr),
231 &format!("Datanode-{}@{}", peer.id, peer.addr),
232 common_time::util::current_time_millis(),
233 &instruction,
234 Some(tracing_ctx.to_w3c()),
235 )
236 .with_context(|_| error::SerializeToJsonSnafu {
237 input: instruction.to_string(),
238 })?;
239
240 let now = std::time::Instant::now();
241 let receiver = mailbox.send(&ch, message, timeout).await;
242
243 let receiver = match receiver {
244 Ok(receiver) => receiver,
245 Err(error::Error::PusherNotFound { .. }) => error::RetryLaterSnafu {
246 reason: format!(
247 "Pusher not found for sync regions on datanode {:?}, elapsed: {:?}",
248 peer,
249 now.elapsed()
250 ),
251 }
252 .fail()?,
253 Err(err) => {
254 return Err(err);
255 }
256 };
257
258 match receiver.await {
259 Ok(msg) => {
260 let reply = HeartbeatMailbox::json_reply(&msg)?;
261 info!(
262 "Received sync regions reply: {:?}, elapsed: {:?}",
263 reply,
264 now.elapsed()
265 );
266 let InstructionReply::SyncRegions(SyncRegionsReply { replies }) = reply else {
267 return error::UnexpectedInstructionReplySnafu {
268 mailbox_message: msg.to_string(),
269 reason: "expect sync regions reply",
270 }
271 .fail();
272 };
273 for reply in replies {
274 Self::handle_sync_region_reply(&reply, &now, peer)?;
275 }
276 Ok(())
277 }
278 Err(error::Error::MailboxChannelClosed { .. }) => error::RetryLaterSnafu {
279 reason: format!(
280 "Mailbox closed when sending sync region to datanode {:?}, elapsed: {:?}",
281 peer,
282 now.elapsed()
283 ),
284 }
285 .fail()?,
286 Err(error::Error::MailboxTimeout { .. }) => {
287 let reason = format!(
288 "Mailbox received timeout for sync regions on datanode {:?}, elapsed: {:?}",
289 peer,
290 now.elapsed()
291 );
292 error::RetryLaterSnafu { reason }.fail()
293 }
294 Err(err) => Err(err),
295 }
296 }
297
298 fn handle_sync_region_reply(
299 SyncRegionReply {
300 region_id,
301 ready,
302 exists,
303 error,
304 }: &SyncRegionReply,
305 now: &Instant,
306 peer: &Peer,
307 ) -> Result<()> {
308 ensure!(
309 exists,
310 error::UnexpectedSnafu {
311 violated: format!(
312 "Region {} doesn't exist on datanode {:?}, elapsed: {:?}",
313 region_id,
314 peer,
315 now.elapsed()
316 )
317 }
318 );
319
320 if let Some(error) = error {
321 return error::RetryLaterSnafu {
322 reason: format!(
323 "Failed to sync region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
324 region_id,
325 peer,
326 error,
327 now.elapsed()
328 ),
329 }
330 .fail();
331 }
332
333 ensure!(
334 ready,
335 error::RetryLaterSnafu {
336 reason: format!(
337 "Region {} failed to sync on datanode {:?}, elapsed: {:?}",
338 region_id,
339 peer,
340 now.elapsed()
341 ),
342 }
343 );
344
345 Ok(())
346 }
347}
348
349#[cfg(test)]
350mod tests {
351 use std::assert_matches;
352
353 use common_meta::peer::Peer;
354 use common_meta::rpc::router::{Region, RegionRoute};
355 use store_api::region_engine::SyncRegionFromRequest;
356 use store_api::storage::RegionId;
357
358 use crate::error::Error;
359 use crate::procedure::repartition::group::GroupPrepareResult;
360 use crate::procedure::repartition::group::sync_region::SyncRegion;
361 use crate::procedure::repartition::test_util::{TestingEnv, new_persistent_context};
362 use crate::procedure::test_util::{new_sync_region_reply, send_mock_reply};
363 use crate::service::mailbox::Channel;
364
365 #[test]
366 fn test_build_sync_region_instructions() {
367 let table_id = 1024;
368 let central_region = RegionId::new(table_id, 1);
369 let region_routes = vec![RegionRoute {
370 region: Region {
371 id: RegionId::new(table_id, 3),
372 ..Default::default()
373 },
374 leader_peer: Some(Peer::empty(1)),
375 ..Default::default()
376 }];
377
378 let instructions =
379 SyncRegion::build_sync_region_instructions(central_region, ®ion_routes);
380 assert_eq!(instructions.len(), 1);
381 let peer_instructions = instructions.get(&Peer::empty(1)).unwrap();
382 assert_eq!(peer_instructions.len(), 1);
383 assert_eq!(peer_instructions[0].region_id, RegionId::new(table_id, 3));
384 let SyncRegionFromRequest::FromRegion {
385 source_region_id, ..
386 } = &peer_instructions[0].request
387 else {
388 panic!("expect from region request");
389 };
390 assert_eq!(*source_region_id, central_region);
391 }
392
393 fn test_prepare_result(table_id: u32) -> GroupPrepareResult {
394 GroupPrepareResult {
395 source_routes: vec![],
396 target_routes: vec![],
397 central_region: RegionId::new(table_id, 1),
398 central_region_datanode: Peer::empty(1),
399 }
400 }
401
402 #[tokio::test]
403 async fn test_sync_regions_all_successful() {
404 let mut env = TestingEnv::new();
405 let table_id = 1024;
406 let mut persistent_context = new_persistent_context(table_id, vec![], vec![]);
407 persistent_context.group_prepare_result = Some(test_prepare_result(table_id));
408
409 let (tx, rx) = tokio::sync::mpsc::channel(1);
410 env.mailbox_ctx
411 .insert_heartbeat_response_receiver(Channel::Datanode(1), tx)
412 .await;
413 send_mock_reply(env.mailbox_ctx.mailbox().clone(), rx, |id| {
414 Ok(new_sync_region_reply(
415 id,
416 RegionId::new(1024, 3),
417 true,
418 true,
419 None,
420 ))
421 });
422
423 let mut ctx = env.create_context(persistent_context);
424 let region_routes = vec![RegionRoute {
425 region: Region {
426 id: RegionId::new(table_id, 3),
427 ..Default::default()
428 },
429 leader_peer: Some(Peer::empty(1)),
430 ..Default::default()
431 }];
432 let sync_region = SyncRegion { region_routes };
433
434 sync_region.sync_regions(&mut ctx).await.unwrap();
435 }
436
437 #[tokio::test]
438 async fn test_sync_regions_retryable() {
439 let env = TestingEnv::new();
440 let table_id = 1024;
441 let mut persistent_context = new_persistent_context(table_id, vec![], vec![]);
442 persistent_context.group_prepare_result = Some(test_prepare_result(table_id));
443
444 let mut ctx = env.create_context(persistent_context);
445 let region_routes = vec![RegionRoute {
446 region: Region {
447 id: RegionId::new(table_id, 3),
448 ..Default::default()
449 },
450 leader_peer: Some(Peer::empty(1)),
451 ..Default::default()
452 }];
453 let sync_region = SyncRegion { region_routes };
454
455 let err = sync_region.sync_regions(&mut ctx).await.unwrap_err();
456 assert_matches!(err, Error::RetryLater { .. });
457 }
458}