1use std::any::Any;
16use std::collections::HashMap;
17use std::time::{Duration, Instant};
18
19use api::v1::meta::MailboxMessage;
20use common_meta::instruction::{Instruction, InstructionReply, SyncRegionReply, SyncRegionsReply};
21use common_meta::peer::Peer;
22use common_meta::rpc::router::RegionRoute;
23use common_procedure::{Context as ProcedureContext, Status};
24use common_telemetry::info;
25use common_telemetry::tracing_context::TracingContext;
26use futures::future::join_all;
27use serde::{Deserialize, Serialize};
28use snafu::{OptionExt, ResultExt, ensure};
29use store_api::region_engine::SyncRegionFromRequest;
30use store_api::storage::RegionId;
31
32use crate::error::{self, Error, Result};
33use crate::handler::HeartbeatMailbox;
34use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
35use crate::procedure::repartition::group::utils::{
36 HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results,
37};
38use crate::procedure::repartition::group::{Context, State};
39use crate::procedure::utils::ErrorStrategy;
40use crate::service::mailbox::{Channel, MailboxRef};
41
/// Parallelism placed into every `SyncRegionFromRequest::FromRegion` request built by this state.
const DEFAULT_SYNC_REGION_PARALLELISM: usize = 3;
43
/// Repartition group state that flushes the central region and then asks the
/// datanodes to sync the regions in `region_routes` from that central region.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncRegion {
    /// Routes of the regions to sync. They are grouped per peer via
    /// `group_region_routes_by_peer` before the instructions are sent.
    pub region_routes: Vec<RegionRoute>,
}
49
50#[async_trait::async_trait]
51#[typetag::serde]
52impl State for SyncRegion {
53 async fn next(
54 &mut self,
55 ctx: &mut Context,
56 _procedure_ctx: &ProcedureContext,
57 ) -> Result<(Box<dyn State>, Status)> {
58 Self::flush_central_region(ctx).await?;
59 self.sync_regions(ctx).await?;
60
61 Ok((
62 Box::new(UpdateMetadata::ApplyStaging),
63 Status::executing(true),
64 ))
65 }
66
67 fn as_any(&self) -> &dyn Any {
68 self
69 }
70}
71
72impl SyncRegion {
73 async fn flush_central_region(ctx: &mut Context) -> Result<()> {
74 let operation_timeout =
75 ctx.next_operation_timeout()
76 .context(error::ExceededDeadlineSnafu {
77 operation: "Flush central region",
78 })?;
79 let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
80
81 crate::procedure::utils::flush_region(
82 &ctx.mailbox,
83 &ctx.server_addr,
84 &[prepare_result.central_region],
85 &prepare_result.central_region_datanode,
86 operation_timeout,
87 ErrorStrategy::Retry,
88 )
89 .await
90 }
91
92 fn build_sync_region_instructions(
94 central_region: RegionId,
95 region_routes: &[RegionRoute],
96 ) -> HashMap<Peer, Vec<common_meta::instruction::SyncRegion>> {
97 let target_region_routes_by_peer = group_region_routes_by_peer(region_routes);
98 let mut instructions = HashMap::with_capacity(target_region_routes_by_peer.len());
99
100 for (peer, region_ids) in target_region_routes_by_peer {
101 let sync_regions = region_ids
102 .into_iter()
103 .map(|region_id| {
104 let request = SyncRegionFromRequest::FromRegion {
105 source_region_id: central_region,
106 parallelism: DEFAULT_SYNC_REGION_PARALLELISM,
107 };
108 common_meta::instruction::SyncRegion { region_id, request }
109 })
110 .collect();
111 instructions.insert((*peer).clone(), sync_regions);
112 }
113
114 instructions
115 }
116
117 async fn sync_regions(&self, ctx: &mut Context) -> Result<()> {
119 let table_id = ctx.persistent_ctx.table_id;
120 let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
121 let instructions = Self::build_sync_region_instructions(
122 prepare_result.central_region,
123 &self.region_routes,
124 );
125 let operation_timeout =
126 ctx.next_operation_timeout()
127 .context(error::ExceededDeadlineSnafu {
128 operation: "Sync regions",
129 })?;
130
131 let (peers, tasks): (Vec<_>, Vec<_>) = instructions
132 .iter()
133 .map(|(peer, sync_regions)| {
134 (
135 peer,
136 Self::sync_region(
137 &ctx.mailbox,
138 &ctx.server_addr,
139 peer,
140 sync_regions,
141 operation_timeout,
142 ),
143 )
144 })
145 .unzip();
146
147 info!(
148 "Sent sync regions instructions to peers: {:?} for repartition table {}",
149 peers, table_id
150 );
151
152 let format_err_msg = |idx: usize, error: &Error| {
153 let peer = peers[idx];
154 format!(
155 "Failed to sync regions on datanode {:?}, error: {:?}",
156 peer, error
157 )
158 };
159
160 let results = join_all(tasks).await;
161 let result = handle_multiple_results(&results);
162
163 match result {
164 HandleMultipleResult::AllSuccessful => Ok(()),
165 HandleMultipleResult::AllRetryable(retryable_errors) => error::RetryLaterSnafu {
166 reason: format!(
167 "All retryable errors during syncing regions for repartition table {}: {:?}",
168 table_id,
169 retryable_errors
170 .iter()
171 .map(|(idx, error)| format_err_msg(*idx, error))
172 .collect::<Vec<_>>()
173 .join(",")
174 ),
175 }
176 .fail(),
177 HandleMultipleResult::AllNonRetryable(non_retryable_errors) => error::UnexpectedSnafu {
178 violated: format!(
179 "All non retryable errors during syncing regions for repartition table {}: {:?}",
180 table_id,
181 non_retryable_errors
182 .iter()
183 .map(|(idx, error)| format_err_msg(*idx, error))
184 .collect::<Vec<_>>()
185 .join(",")
186 ),
187 }
188 .fail(),
189 HandleMultipleResult::PartialRetryable {
190 retryable_errors,
191 non_retryable_errors,
192 } => error::UnexpectedSnafu {
193 violated: format!(
194 "Partial retryable errors during syncing regions for repartition table {}: {:?}, non retryable errors: {:?}",
195 table_id,
196 retryable_errors
197 .iter()
198 .map(|(idx, error)| format_err_msg(*idx, error))
199 .collect::<Vec<_>>()
200 .join(","),
201 non_retryable_errors
202 .iter()
203 .map(|(idx, error)| format_err_msg(*idx, error))
204 .collect::<Vec<_>>()
205 .join(","),
206 ),
207 }
208 .fail(),
209 }
210 }
211
212 async fn sync_region(
214 mailbox: &MailboxRef,
215 server_addr: &str,
216 peer: &Peer,
217 sync_regions: &[common_meta::instruction::SyncRegion],
218 timeout: Duration,
219 ) -> Result<()> {
220 let ch = Channel::Datanode(peer.id);
221 let instruction = Instruction::SyncRegions(sync_regions.to_vec());
222 let tracing_ctx = TracingContext::from_current_span();
223 let message = MailboxMessage::json_message(
224 &format!(
225 "Sync regions: {:?}",
226 sync_regions.iter().map(|r| r.region_id).collect::<Vec<_>>()
227 ),
228 &format!("Metasrv@{}", server_addr),
229 &format!("Datanode-{}@{}", peer.id, peer.addr),
230 common_time::util::current_time_millis(),
231 &instruction,
232 Some(tracing_ctx.to_w3c()),
233 )
234 .with_context(|_| error::SerializeToJsonSnafu {
235 input: instruction.to_string(),
236 })?;
237
238 let now = std::time::Instant::now();
239 let receiver = mailbox.send(&ch, message, timeout).await;
240
241 let receiver = match receiver {
242 Ok(receiver) => receiver,
243 Err(error::Error::PusherNotFound { .. }) => error::RetryLaterSnafu {
244 reason: format!(
245 "Pusher not found for sync regions on datanode {:?}, elapsed: {:?}",
246 peer,
247 now.elapsed()
248 ),
249 }
250 .fail()?,
251 Err(err) => {
252 return Err(err);
253 }
254 };
255
256 match receiver.await {
257 Ok(msg) => {
258 let reply = HeartbeatMailbox::json_reply(&msg)?;
259 info!(
260 "Received sync regions reply: {:?}, elapsed: {:?}",
261 reply,
262 now.elapsed()
263 );
264 let InstructionReply::SyncRegions(SyncRegionsReply { replies }) = reply else {
265 return error::UnexpectedInstructionReplySnafu {
266 mailbox_message: msg.to_string(),
267 reason: "expect sync regions reply",
268 }
269 .fail();
270 };
271 for reply in replies {
272 Self::handle_sync_region_reply(&reply, &now, peer)?;
273 }
274 Ok(())
275 }
276 Err(error::Error::MailboxTimeout { .. }) => {
277 let reason = format!(
278 "Mailbox received timeout for sync regions on datanode {:?}, elapsed: {:?}",
279 peer,
280 now.elapsed()
281 );
282 error::RetryLaterSnafu { reason }.fail()
283 }
284 Err(err) => Err(err),
285 }
286 }
287
288 fn handle_sync_region_reply(
289 SyncRegionReply {
290 region_id,
291 ready,
292 exists,
293 error,
294 }: &SyncRegionReply,
295 now: &Instant,
296 peer: &Peer,
297 ) -> Result<()> {
298 ensure!(
299 exists,
300 error::UnexpectedSnafu {
301 violated: format!(
302 "Region {} doesn't exist on datanode {:?}, elapsed: {:?}",
303 region_id,
304 peer,
305 now.elapsed()
306 )
307 }
308 );
309
310 if let Some(error) = error {
311 return error::RetryLaterSnafu {
312 reason: format!(
313 "Failed to sync region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
314 region_id,
315 peer,
316 error,
317 now.elapsed()
318 ),
319 }
320 .fail();
321 }
322
323 ensure!(
324 ready,
325 error::RetryLaterSnafu {
326 reason: format!(
327 "Region {} failed to sync on datanode {:?}, elapsed: {:?}",
328 region_id,
329 peer,
330 now.elapsed()
331 ),
332 }
333 );
334
335 Ok(())
336 }
337}
338
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::region_engine::SyncRegionFromRequest;
    use store_api::storage::RegionId;

    use crate::error::Error;
    use crate::procedure::repartition::group::GroupPrepareResult;
    use crate::procedure::repartition::group::sync_region::SyncRegion;
    use crate::procedure::repartition::test_util::{TestingEnv, new_persistent_context};
    use crate::procedure::test_util::{new_sync_region_reply, send_mock_reply};
    use crate::service::mailbox::Channel;

    /// A single route on peer 1 must yield exactly one instruction for that
    /// peer, targeting the routed region and sourcing from the central region.
    #[test]
    fn test_build_sync_region_instructions() {
        let table_id = 1024;
        let central_region = RegionId::new(table_id, 1);
        let region_routes = vec![RegionRoute {
            region: Region {
                id: RegionId::new(table_id, 3),
                ..Default::default()
            },
            leader_peer: Some(Peer::empty(1)),
            ..Default::default()
        }];

        let instructions =
            SyncRegion::build_sync_region_instructions(central_region, &region_routes);
        assert_eq!(instructions.len(), 1);
        let peer_instructions = instructions.get(&Peer::empty(1)).unwrap();
        assert_eq!(peer_instructions.len(), 1);
        assert_eq!(peer_instructions[0].region_id, RegionId::new(table_id, 3));
        let SyncRegionFromRequest::FromRegion {
            source_region_id, ..
        } = &peer_instructions[0].request
        else {
            panic!("expect from region request");
        };
        assert_eq!(*source_region_id, central_region);
    }

    /// Builds a prepare result whose central region is region 1 of `table_id`
    /// hosted on peer 1, with no source or target routes.
    fn test_prepare_result(table_id: u32) -> GroupPrepareResult {
        GroupPrepareResult {
            source_routes: vec![],
            target_routes: vec![],
            central_region: RegionId::new(table_id, 1),
            central_region_datanode: Peer::empty(1),
        }
    }

    /// Datanode 1 answers with a reply built by
    /// `new_sync_region_reply(id, region, true, true, None)`, so
    /// `sync_regions` must succeed.
    #[tokio::test]
    async fn test_sync_regions_all_successful() {
        let mut env = TestingEnv::new();
        let table_id = 1024;
        let mut persistent_context = new_persistent_context(table_id, vec![], vec![]);
        persistent_context.group_prepare_result = Some(test_prepare_result(table_id));

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        env.mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(1), tx)
            .await;
        send_mock_reply(env.mailbox_ctx.mailbox().clone(), rx, |id| {
            Ok(new_sync_region_reply(
                id,
                RegionId::new(1024, 3),
                true,
                true,
                None,
            ))
        });

        let mut ctx = env.create_context(persistent_context);
        let region_routes = vec![RegionRoute {
            region: Region {
                id: RegionId::new(table_id, 3),
                ..Default::default()
            },
            leader_peer: Some(Peer::empty(1)),
            ..Default::default()
        }];
        let sync_region = SyncRegion { region_routes };

        sync_region.sync_regions(&mut ctx).await.unwrap();
    }

    /// No heartbeat response receiver is registered for datanode 1 here, and
    /// `sync_regions` is expected to fail with a retryable error.
    #[tokio::test]
    async fn test_sync_regions_retryable() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let mut persistent_context = new_persistent_context(table_id, vec![], vec![]);
        persistent_context.group_prepare_result = Some(test_prepare_result(table_id));

        let mut ctx = env.create_context(persistent_context);
        let region_routes = vec![RegionRoute {
            region: Region {
                id: RegionId::new(table_id, 3),
                ..Default::default()
            },
            leader_peer: Some(Peer::empty(1)),
            ..Default::default()
        }];
        let sync_region = SyncRegion { region_routes };

        let err = sync_region.sync_regions(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::RetryLater { .. });
    }
}