1use std::any::Any;
16use std::collections::HashMap;
17use std::time::{Duration, Instant};
18
19use api::v1::meta::MailboxMessage;
20use common_meta::instruction::{Instruction, InstructionReply, SyncRegionReply, SyncRegionsReply};
21use common_meta::peer::Peer;
22use common_meta::rpc::router::RegionRoute;
23use common_procedure::{Context as ProcedureContext, Status};
24use common_telemetry::info;
25use futures::future::join_all;
26use serde::{Deserialize, Serialize};
27use snafu::{OptionExt, ResultExt, ensure};
28use store_api::region_engine::SyncRegionFromRequest;
29use store_api::storage::RegionId;
30
31use crate::error::{self, Error, Result};
32use crate::handler::HeartbeatMailbox;
33use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
34use crate::procedure::repartition::group::utils::{
35 HandleMultipleResult, group_region_routes_by_peer, handle_multiple_results,
36};
37use crate::procedure::repartition::group::{Context, State};
38use crate::procedure::utils::ErrorStrategy;
39use crate::service::mailbox::{Channel, MailboxRef};
40
/// Parallelism value placed in every `SyncRegionFromRequest::FromRegion`
/// request built by this state.
const DEFAULT_SYNC_REGION_PARALLELISM: usize = 3;
42
/// Repartition group state that flushes the central region and then instructs
/// the datanodes hosting `region_routes` to sync those regions from it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncRegion {
    /// Routes of the regions to sync. They are grouped by peer, and every
    /// region receives one sync instruction sourced from the central region.
    pub region_routes: Vec<RegionRoute>,
}
48
49#[async_trait::async_trait]
50#[typetag::serde]
51impl State for SyncRegion {
52 async fn next(
53 &mut self,
54 ctx: &mut Context,
55 _procedure_ctx: &ProcedureContext,
56 ) -> Result<(Box<dyn State>, Status)> {
57 Self::flush_central_region(ctx).await?;
58 self.sync_regions(ctx).await?;
59
60 Ok((
61 Box::new(UpdateMetadata::ApplyStaging),
62 Status::executing(true),
63 ))
64 }
65
66 fn as_any(&self) -> &dyn Any {
67 self
68 }
69}
70
71impl SyncRegion {
72 async fn flush_central_region(ctx: &mut Context) -> Result<()> {
73 let operation_timeout =
74 ctx.next_operation_timeout()
75 .context(error::ExceededDeadlineSnafu {
76 operation: "Flush central region",
77 })?;
78 let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
79
80 crate::procedure::utils::flush_region(
81 &ctx.mailbox,
82 &ctx.server_addr,
83 &[prepare_result.central_region],
84 &prepare_result.central_region_datanode,
85 operation_timeout,
86 ErrorStrategy::Retry,
87 )
88 .await
89 }
90
91 fn build_sync_region_instructions(
93 central_region: RegionId,
94 region_routes: &[RegionRoute],
95 ) -> HashMap<Peer, Vec<common_meta::instruction::SyncRegion>> {
96 let target_region_routes_by_peer = group_region_routes_by_peer(region_routes);
97 let mut instructions = HashMap::with_capacity(target_region_routes_by_peer.len());
98
99 for (peer, region_ids) in target_region_routes_by_peer {
100 let sync_regions = region_ids
101 .into_iter()
102 .map(|region_id| {
103 let request = SyncRegionFromRequest::FromRegion {
104 source_region_id: central_region,
105 parallelism: DEFAULT_SYNC_REGION_PARALLELISM,
106 };
107 common_meta::instruction::SyncRegion { region_id, request }
108 })
109 .collect();
110 instructions.insert((*peer).clone(), sync_regions);
111 }
112
113 instructions
114 }
115
116 async fn sync_regions(&self, ctx: &mut Context) -> Result<()> {
118 let table_id = ctx.persistent_ctx.table_id;
119 let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
120 let instructions = Self::build_sync_region_instructions(
121 prepare_result.central_region,
122 &self.region_routes,
123 );
124 let operation_timeout =
125 ctx.next_operation_timeout()
126 .context(error::ExceededDeadlineSnafu {
127 operation: "Sync regions",
128 })?;
129
130 let (peers, tasks): (Vec<_>, Vec<_>) = instructions
131 .iter()
132 .map(|(peer, sync_regions)| {
133 (
134 peer,
135 Self::sync_region(
136 &ctx.mailbox,
137 &ctx.server_addr,
138 peer,
139 sync_regions,
140 operation_timeout,
141 ),
142 )
143 })
144 .unzip();
145
146 info!(
147 "Sent sync regions instructions to peers: {:?} for repartition table {}",
148 peers, table_id
149 );
150
151 let format_err_msg = |idx: usize, error: &Error| {
152 let peer = peers[idx];
153 format!(
154 "Failed to sync regions on datanode {:?}, error: {:?}",
155 peer, error
156 )
157 };
158
159 let results = join_all(tasks).await;
160 let result = handle_multiple_results(&results);
161
162 match result {
163 HandleMultipleResult::AllSuccessful => Ok(()),
164 HandleMultipleResult::AllRetryable(retryable_errors) => error::RetryLaterSnafu {
165 reason: format!(
166 "All retryable errors during syncing regions for repartition table {}: {:?}",
167 table_id,
168 retryable_errors
169 .iter()
170 .map(|(idx, error)| format_err_msg(*idx, error))
171 .collect::<Vec<_>>()
172 .join(",")
173 ),
174 }
175 .fail(),
176 HandleMultipleResult::AllNonRetryable(non_retryable_errors) => error::UnexpectedSnafu {
177 violated: format!(
178 "All non retryable errors during syncing regions for repartition table {}: {:?}",
179 table_id,
180 non_retryable_errors
181 .iter()
182 .map(|(idx, error)| format_err_msg(*idx, error))
183 .collect::<Vec<_>>()
184 .join(",")
185 ),
186 }
187 .fail(),
188 HandleMultipleResult::PartialRetryable {
189 retryable_errors,
190 non_retryable_errors,
191 } => error::UnexpectedSnafu {
192 violated: format!(
193 "Partial retryable errors during syncing regions for repartition table {}: {:?}, non retryable errors: {:?}",
194 table_id,
195 retryable_errors
196 .iter()
197 .map(|(idx, error)| format_err_msg(*idx, error))
198 .collect::<Vec<_>>()
199 .join(","),
200 non_retryable_errors
201 .iter()
202 .map(|(idx, error)| format_err_msg(*idx, error))
203 .collect::<Vec<_>>()
204 .join(","),
205 ),
206 }
207 .fail(),
208 }
209 }
210
211 async fn sync_region(
213 mailbox: &MailboxRef,
214 server_addr: &str,
215 peer: &Peer,
216 sync_regions: &[common_meta::instruction::SyncRegion],
217 timeout: Duration,
218 ) -> Result<()> {
219 let ch = Channel::Datanode(peer.id);
220 let instruction = Instruction::SyncRegions(sync_regions.to_vec());
221 let message = MailboxMessage::json_message(
222 &format!(
223 "Sync regions: {:?}",
224 sync_regions.iter().map(|r| r.region_id).collect::<Vec<_>>()
225 ),
226 &format!("Metasrv@{}", server_addr),
227 &format!("Datanode-{}@{}", peer.id, peer.addr),
228 common_time::util::current_time_millis(),
229 &instruction,
230 )
231 .with_context(|_| error::SerializeToJsonSnafu {
232 input: instruction.to_string(),
233 })?;
234
235 let now = std::time::Instant::now();
236 let receiver = mailbox.send(&ch, message, timeout).await;
237
238 let receiver = match receiver {
239 Ok(receiver) => receiver,
240 Err(error::Error::PusherNotFound { .. }) => error::RetryLaterSnafu {
241 reason: format!(
242 "Pusher not found for sync regions on datanode {:?}, elapsed: {:?}",
243 peer,
244 now.elapsed()
245 ),
246 }
247 .fail()?,
248 Err(err) => {
249 return Err(err);
250 }
251 };
252
253 match receiver.await {
254 Ok(msg) => {
255 let reply = HeartbeatMailbox::json_reply(&msg)?;
256 info!(
257 "Received sync regions reply: {:?}, elapsed: {:?}",
258 reply,
259 now.elapsed()
260 );
261 let InstructionReply::SyncRegions(SyncRegionsReply { replies }) = reply else {
262 return error::UnexpectedInstructionReplySnafu {
263 mailbox_message: msg.to_string(),
264 reason: "expect sync regions reply",
265 }
266 .fail();
267 };
268 for reply in replies {
269 Self::handle_sync_region_reply(&reply, &now, peer)?;
270 }
271 Ok(())
272 }
273 Err(error::Error::MailboxTimeout { .. }) => {
274 let reason = format!(
275 "Mailbox received timeout for sync regions on datanode {:?}, elapsed: {:?}",
276 peer,
277 now.elapsed()
278 );
279 error::RetryLaterSnafu { reason }.fail()
280 }
281 Err(err) => Err(err),
282 }
283 }
284
285 fn handle_sync_region_reply(
286 SyncRegionReply {
287 region_id,
288 ready,
289 exists,
290 error,
291 }: &SyncRegionReply,
292 now: &Instant,
293 peer: &Peer,
294 ) -> Result<()> {
295 ensure!(
296 exists,
297 error::UnexpectedSnafu {
298 violated: format!(
299 "Region {} doesn't exist on datanode {:?}, elapsed: {:?}",
300 region_id,
301 peer,
302 now.elapsed()
303 )
304 }
305 );
306
307 if let Some(error) = error {
308 return error::RetryLaterSnafu {
309 reason: format!(
310 "Failed to sync region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
311 region_id,
312 peer,
313 error,
314 now.elapsed()
315 ),
316 }
317 .fail();
318 }
319
320 ensure!(
321 ready,
322 error::RetryLaterSnafu {
323 reason: format!(
324 "Region {} failed to sync on datanode {:?}, elapsed: {:?}",
325 region_id,
326 peer,
327 now.elapsed()
328 ),
329 }
330 );
331
332 Ok(())
333 }
334}
335
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::region_engine::SyncRegionFromRequest;
    use store_api::storage::RegionId;

    use crate::error::Error;
    use crate::procedure::repartition::group::GroupPrepareResult;
    use crate::procedure::repartition::group::sync_region::SyncRegion;
    use crate::procedure::repartition::test_util::{TestingEnv, new_persistent_context};
    use crate::procedure::test_util::{new_sync_region_reply, send_mock_reply};
    use crate::service::mailbox::Channel;

    /// Returns a single route for region 3 of `table_id` whose leader is peer 1.
    fn test_region_routes(table_id: u32) -> Vec<RegionRoute> {
        vec![RegionRoute {
            region: Region {
                id: RegionId::new(table_id, 3),
                ..Default::default()
            },
            leader_peer: Some(Peer::empty(1)),
            ..Default::default()
        }]
    }

    /// Returns a prepare result whose central region is region 1 of `table_id`
    /// on peer 1, with no source or target routes.
    fn test_prepare_result(table_id: u32) -> GroupPrepareResult {
        GroupPrepareResult {
            source_routes: vec![],
            target_routes: vec![],
            central_region: RegionId::new(table_id, 1),
            central_region_datanode: Peer::empty(1),
        }
    }

    /// One route on one peer yields one instruction that syncs from the
    /// central region.
    #[test]
    fn test_build_sync_region_instructions() {
        let table_id = 1024;
        let central_region = RegionId::new(table_id, 1);
        let region_routes = test_region_routes(table_id);

        let instructions =
            SyncRegion::build_sync_region_instructions(central_region, &region_routes);
        assert_eq!(instructions.len(), 1);
        let peer_instructions = instructions.get(&Peer::empty(1)).unwrap();
        assert_eq!(peer_instructions.len(), 1);
        assert_eq!(peer_instructions[0].region_id, RegionId::new(table_id, 3));
        let SyncRegionFromRequest::FromRegion {
            source_region_id, ..
        } = &peer_instructions[0].request
        else {
            panic!("expect from region request");
        };
        assert_eq!(*source_region_id, central_region);
    }

    /// A datanode replying "ready and exists, no error" makes the sync succeed.
    #[tokio::test]
    async fn test_sync_regions_all_successful() {
        let mut env = TestingEnv::new();
        let table_id = 1024;
        let mut persistent_context = new_persistent_context(table_id, vec![], vec![]);
        persistent_context.group_prepare_result = Some(test_prepare_result(table_id));

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        env.mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(1), tx)
            .await;
        send_mock_reply(env.mailbox_ctx.mailbox().clone(), rx, |id| {
            Ok(new_sync_region_reply(
                id,
                RegionId::new(1024, 3),
                true,
                true,
                None,
            ))
        });

        let mut ctx = env.create_context(persistent_context);
        let sync_region = SyncRegion {
            region_routes: test_region_routes(table_id),
        };

        sync_region.sync_regions(&mut ctx).await.unwrap();
    }

    /// With no heartbeat receiver registered for the datanode, the sync fails
    /// with a retryable error.
    #[tokio::test]
    async fn test_sync_regions_retryable() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let mut persistent_context = new_persistent_context(table_id, vec![], vec![]);
        persistent_context.group_prepare_result = Some(test_prepare_result(table_id));

        let mut ctx = env.create_context(persistent_context);
        let sync_region = SyncRegion {
            region_routes: test_region_routes(table_id),
        };

        let err = sync_region.sync_regions(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::RetryLater { .. });
    }
}