1use std::any::Any;
16use std::ops::Div;
17
18use api::v1::meta::MailboxMessage;
19use common_meta::RegionIdent;
20use common_meta::distributed_time_constants::default_distributed_time_constants;
21use common_meta::instruction::{
22 Instruction, InstructionReply, OpenRegion, OpenRegionReason, SimpleReply,
23};
24use common_meta::key::datanode_table::RegionInfo;
25use common_procedure::{Context as ProcedureContext, Status};
26use common_telemetry::info;
27use common_telemetry::tracing_context::TracingContext;
28use serde::{Deserialize, Serialize};
29use snafu::{OptionExt, ResultExt};
30use store_api::region_engine::RegionRole;
31use store_api::region_request::RegionRequirements;
32use tokio::time::Instant;
33
34use crate::error::{self, Result};
35use crate::handler::HeartbeatMailbox;
36use crate::procedure::region_migration::flush_leader_region::PreFlushRegion;
37use crate::procedure::region_migration::{Context, RegionMigrationTriggerReason, State};
38use crate::procedure::utils::instruction_error_result;
39use crate::service::mailbox::Channel;
40
41#[derive(Debug, Serialize, Deserialize)]
42pub struct OpenCandidateRegion;
43
44#[async_trait::async_trait]
45#[typetag::serde]
46impl State for OpenCandidateRegion {
47 async fn next(
48 &mut self,
49 ctx: &mut Context,
50 _procedure_ctx: &ProcedureContext,
51 ) -> Result<(Box<dyn State>, Status)> {
52 let instruction = self.build_open_region_instruction(ctx).await?;
53 let now = Instant::now();
54 self.open_candidate_region(ctx, instruction).await?;
55 ctx.update_open_candidate_region_elapsed(now);
56
57 Ok((Box::new(PreFlushRegion), Status::executing(false)))
58 }
59
60 fn as_any(&self) -> &dyn Any {
61 self
62 }
63}
64
65impl OpenCandidateRegion {
66 async fn build_open_region_instruction(&self, ctx: &mut Context) -> Result<Instruction> {
71 let region_ids = ctx.persistent_ctx.region_ids.clone();
72 let from_peer_id = ctx.persistent_ctx.from_peer.id;
73 let to_peer_id = ctx.persistent_ctx.to_peer.id;
74 let reason = match ctx.persistent_ctx.trigger_reason {
75 RegionMigrationTriggerReason::Failover => OpenRegionReason::RegionFailover,
76 _ => OpenRegionReason::RegionMigration,
77 };
78 let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
79 let mut open_regions = Vec::with_capacity(region_ids.len());
80
81 for region_id in region_ids {
82 let table_id = region_id.table_id();
83 let region_number = region_id.region_number();
84 let datanode_table_value = datanode_table_values.get(&table_id).context(
85 error::DatanodeTableNotFoundSnafu {
86 table_id,
87 datanode_id: from_peer_id,
88 },
89 )?;
90 let RegionInfo {
91 region_storage_path,
92 region_options,
93 region_wal_options,
94 engine,
95 } = datanode_table_value.region_info.clone();
96
97 open_regions.push(OpenRegion::new(
98 RegionIdent {
99 datanode_id: to_peer_id,
100 table_id,
101 region_number,
102 engine,
103 },
104 ®ion_storage_path,
105 region_options,
106 region_wal_options,
107 true,
108 Some(reason),
109 RegionRequirements::object_storage(),
110 ));
111 }
112
113 Ok(Instruction::OpenRegions(open_regions))
114 }
115
116 async fn open_candidate_region(
127 &self,
128 ctx: &mut Context,
129 open_instruction: Instruction,
130 ) -> Result<()> {
131 let pc = &ctx.persistent_ctx;
132 let vc = &mut ctx.volatile_ctx;
133 let region_ids = &pc.region_ids;
134 let candidate = &pc.to_peer;
135
136 if vc.opening_region_guards.is_empty() {
139 for region_id in region_ids {
140 let guard = ctx
142 .opening_region_keeper
143 .register_with_role(candidate.id, *region_id, RegionRole::Follower)
144 .context(error::RegionOperatingRaceSnafu {
145 peer_id: candidate.id,
146 region_id: *region_id,
147 })?;
148 vc.opening_region_guards.push(guard);
149 }
150 }
151
152 let tracing_ctx = TracingContext::from_current_span();
153 let msg = MailboxMessage::json_message(
154 &format!("Open candidate regions: {:?}", region_ids),
155 &format!("Metasrv@{}", ctx.server_addr()),
156 &format!("Datanode-{}@{}", candidate.id, candidate.addr),
157 common_time::util::current_time_millis(),
158 &open_instruction,
159 Some(tracing_ctx.to_w3c()),
160 )
161 .with_context(|_| error::SerializeToJsonSnafu {
162 input: open_instruction.to_string(),
163 })?;
164
165 let operation_timeout =
166 ctx.next_operation_timeout()
167 .context(error::ExceededDeadlineSnafu {
168 operation: "Open candidate region",
169 })?;
170 let operation_timeout = operation_timeout
171 .div(2)
172 .max(default_distributed_time_constants().region_lease);
173 let ch = Channel::Datanode(candidate.id);
174 let now = Instant::now();
175 let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
176
177 match receiver.await {
178 Ok(msg) => {
179 let reply = HeartbeatMailbox::json_reply(&msg)?;
180 info!(
181 "Received open region reply: {:?}, region: {:?}, elapsed: {:?}",
182 reply,
183 region_ids,
184 now.elapsed()
185 );
186 let InstructionReply::OpenRegions(SimpleReply { result, error }) = reply else {
187 return error::UnexpectedInstructionReplySnafu {
188 mailbox_message: msg.to_string(),
189 reason: "expect open region reply",
190 }
191 .fail();
192 };
193
194 if result {
195 Ok(())
196 } else if let Some(error) = error {
197 instruction_error_result(
198 &error,
199 format!(
200 "Region {region_ids:?} is not opened by datanode {:?}, error: {error:?}, elapsed: {:?}",
201 candidate,
202 now.elapsed()
203 ),
204 )
205 } else {
206 error::UnexpectedSnafu {
207 violated: format!(
208 "Region {region_ids:?} is not opened by datanode {:?}, but error is absent, elapsed: {:?}",
209 candidate,
210 now.elapsed()
211 ),
212 }
213 .fail()
214 }
215 }
216 Err(error::Error::MailboxTimeout { .. }) => {
217 let reason = format!(
218 "Mailbox received timeout for open candidate region {region_ids:?} on datanode {:?}, elapsed: {:?}",
219 candidate,
220 now.elapsed()
221 );
222 error::RetryLaterSnafu { reason }.fail()
223 }
224 Err(e) => Err(e),
225 }
226 }
227}
228
#[cfg(test)]
mod tests {
    use std::assert_matches;
    use std::collections::HashMap;

    use common_catalog::consts::MITO2_ENGINE;
    use common_error::ext::RetryHint;
    use common_error::status_code::StatusCode;
    use common_meta::DatanodeId;
    use common_meta::instruction::InstructionError;
    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_open_region_reply, new_open_region_reply_with_error,
        send_mock_reply,
    };

    /// Persistent context shared by the tests: peers `1` and `2`, region `(1024, 1)`.
    fn new_persistent_context() -> PersistentContext {
        let region = RegionId::new(1024, 1);
        test_util::new_persistent_context(1, 2, region)
    }

    /// Hand-built open instruction for a single region on `datanode_id`.
    fn new_mock_open_instruction(datanode_id: DatanodeId, region_id: RegionId) -> Instruction {
        let ident = RegionIdent {
            datanode_id,
            table_id: region_id.table_id(),
            region_number: region_id.region_number(),
            engine: MITO2_ENGINE.to_string(),
        };
        let open_region = OpenRegion::new(
            ident,
            "/bar/foo/region/",
            Default::default(),
            Default::default(),
            true,
            Some(OpenRegionReason::RegionMigration),
            RegionRequirements::object_storage(),
        );
        Instruction::OpenRegions(vec![open_region])
    }

    /// Without any table metadata, building the instruction fails and is not retryable.
    #[tokio::test]
    async fn test_datanode_table_is_not_found_error() {
        let step = OpenCandidateRegion;
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(new_persistent_context());

        let failure = step
            .build_open_region_instruction(&mut ctx)
            .await
            .unwrap_err();

        assert_matches!(failure, Error::DatanodeTableNotFound { .. });
        assert!(!failure.is_retryable());
    }

    /// The open reason follows the trigger reason of the migration.
    #[tokio::test]
    async fn test_build_open_region_instruction_reason() {
        let step = OpenCandidateRegion;
        let mut pctx = new_persistent_context();
        let source_id = pctx.from_peer.id;
        let target_region = pctx.region_ids[0];
        let env = TestingEnv::new();

        let table_info = new_test_table_info(1024);
        let routes = vec![RegionRoute {
            region: Region::new_test(target_region),
            leader_peer: Some(Peer::empty(source_id)),
            ..Default::default()
        }];
        env.table_metadata_manager()
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(routes),
                HashMap::default(),
            )
            .await
            .unwrap();

        // Trigger reason as produced by the test helper.
        let mut ctx = env.context_factory().new_context(pctx.clone());
        let built = step.build_open_region_instruction(&mut ctx).await.unwrap();
        let opened = built.into_open_regions().unwrap();
        assert_eq!(Some(OpenRegionReason::RegionMigration), opened[0].reason);
        assert_eq!(RegionRequirements::object_storage(), opened[0].requirements);

        // Failover trigger.
        pctx.trigger_reason = RegionMigrationTriggerReason::Failover;
        let mut ctx = env.context_factory().new_context(pctx);
        let built = step.build_open_region_instruction(&mut ctx).await.unwrap();
        let opened = built.into_open_regions().unwrap();
        assert_eq!(Some(OpenRegionReason::RegionFailover), opened[0].reason);
        assert_eq!(RegionRequirements::object_storage(), opened[0].requirements);
    }

    /// No heartbeat receiver is registered for the candidate, so sending fails.
    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let step = OpenCandidateRegion;
        let pctx = new_persistent_context();
        let target_region = pctx.region_ids[0];
        let candidate_id = pctx.to_peer.id;
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(pctx);

        let instruction = new_mock_open_instruction(candidate_id, target_region);
        let failure = step
            .open_candidate_region(&mut ctx, instruction)
            .await
            .unwrap_err();

        assert_matches!(failure, Error::PusherNotFound { .. });
        assert!(!failure.is_retryable());
    }

    /// The region is already registered in the keeper, so registration races.
    #[tokio::test]
    async fn test_candidate_region_opening_error() {
        let step = OpenCandidateRegion;
        let pctx = new_persistent_context();
        let target_region = pctx.region_ids[0];
        let candidate_id = pctx.to_peer.id;

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(pctx);
        let keeper = env.opening_region_keeper();
        // Held for the rest of the test so the region stays registered.
        let _held = keeper
            .register_with_role(candidate_id, target_region, RegionRole::Follower)
            .unwrap();

        let instruction = new_mock_open_instruction(candidate_id, target_region);
        let failure = step
            .open_candidate_region(&mut ctx, instruction)
            .await
            .unwrap_err();

        assert_matches!(failure, Error::RegionOperatingRace { .. });
        assert!(!failure.is_retryable());
    }

    /// A close-region reply is rejected as an unexpected instruction reply.
    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let step = OpenCandidateRegion;
        let pctx = new_persistent_context();
        let target_region = pctx.region_ids[0];
        let candidate_id = pctx.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(pctx);
        let mailbox_env = env.mailbox_context();
        let mailbox = mailbox_env.mailbox().clone();

        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_env
            .insert_heartbeat_response_receiver(Channel::Datanode(candidate_id), reply_tx)
            .await;

        send_mock_reply(mailbox, reply_rx, |message_id| {
            Ok(new_close_region_reply(message_id))
        });

        let instruction = new_mock_open_instruction(candidate_id, target_region);
        let failure = step
            .open_candidate_region(&mut ctx, instruction)
            .await
            .unwrap_err();

        assert_matches!(failure, Error::UnexpectedInstructionReply { .. });
        assert!(!failure.is_retryable());
    }

    /// A mailbox timeout is turned into a retryable `RetryLater` error.
    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let step = OpenCandidateRegion;
        let pctx = new_persistent_context();
        let target_region = pctx.region_ids[0];
        let candidate_id = pctx.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(pctx);
        let mailbox_env = env.mailbox_context();
        let mailbox = mailbox_env.mailbox().clone();

        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_env
            .insert_heartbeat_response_receiver(Channel::Datanode(candidate_id), reply_tx)
            .await;

        send_mock_reply(mailbox, reply_rx, |message_id| {
            Err(error::MailboxTimeoutSnafu { id: message_id }.build())
        });

        let instruction = new_mock_open_instruction(candidate_id, target_region);
        let failure = step
            .open_candidate_region(&mut ctx, instruction)
            .await
            .unwrap_err();

        assert_matches!(failure, Error::RetryLater { .. });
        assert!(failure.is_retryable());
    }

    /// A failed open with a plain error message is retryable and keeps the message.
    #[tokio::test]
    async fn test_open_candidate_region_failed() {
        let step = OpenCandidateRegion;
        let pctx = new_persistent_context();
        let target_region = pctx.region_ids[0];
        let candidate_id = pctx.to_peer.id;
        let mut env = TestingEnv::new();

        let mut ctx = env.context_factory().new_context(pctx);
        let mailbox_env = env.mailbox_context();
        let mailbox = mailbox_env.mailbox().clone();

        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_env
            .insert_heartbeat_response_receiver(Channel::Datanode(candidate_id), reply_tx)
            .await;

        send_mock_reply(mailbox, reply_rx, |message_id| {
            let mocked_error = Some("test mocked".to_string());
            Ok(new_open_region_reply(message_id, false, mocked_error))
        });

        let instruction = new_mock_open_instruction(candidate_id, target_region);
        let failure = step
            .open_candidate_region(&mut ctx, instruction)
            .await
            .unwrap_err();

        assert_matches!(failure, Error::RetryLater { .. });
        assert!(failure.is_retryable());
        assert!(format!("{failure:?}").contains("test mocked"));
    }

    /// A failed open with a non-retryable instruction error surfaces as `Unexpected`.
    #[tokio::test]
    async fn test_open_candidate_region_non_retryable_instruction_error() {
        let step = OpenCandidateRegion;
        let pctx = new_persistent_context();
        let target_region = pctx.region_ids[0];
        let candidate_id = pctx.to_peer.id;
        let mut env = TestingEnv::new();

        let mut ctx = env.context_factory().new_context(pctx);
        let mailbox_env = env.mailbox_context();
        let mailbox = mailbox_env.mailbox().clone();

        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_env
            .insert_heartbeat_response_receiver(Channel::Datanode(candidate_id), reply_tx)
            .await;

        send_mock_reply(mailbox, reply_rx, |message_id| {
            let mocked_error = InstructionError {
                code: StatusCode::Internal,
                message: "non retryable mocked".to_string(),
                retry_hint: RetryHint::NonRetryable,
            };
            Ok(new_open_region_reply_with_error(
                message_id,
                false,
                Some(mocked_error),
            ))
        });

        let instruction = new_mock_open_instruction(candidate_id, target_region);
        let failure = step
            .open_candidate_region(&mut ctx, instruction)
            .await
            .unwrap_err();

        assert_matches!(failure, Error::Unexpected { .. });
        assert!(!failure.is_retryable());
        assert!(format!("{failure:?}").contains("non retryable mocked"));
    }

    /// A failed open without any error attached is reported as `Unexpected`.
    #[tokio::test]
    async fn test_open_candidate_region_false_without_error_is_unexpected() {
        let step = OpenCandidateRegion;
        let pctx = new_persistent_context();
        let target_region = pctx.region_ids[0];
        let candidate_id = pctx.to_peer.id;
        let mut env = TestingEnv::new();

        let mut ctx = env.context_factory().new_context(pctx);
        let mailbox_env = env.mailbox_context();
        let mailbox = mailbox_env.mailbox().clone();

        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_env
            .insert_heartbeat_response_receiver(Channel::Datanode(candidate_id), reply_tx)
            .await;

        send_mock_reply(mailbox, reply_rx, |message_id| {
            Ok(new_open_region_reply_with_error(message_id, false, None))
        });

        let instruction = new_mock_open_instruction(candidate_id, target_region);
        let failure = step
            .open_candidate_region(&mut ctx, instruction)
            .await
            .unwrap_err();

        assert_matches!(failure, Error::Unexpected { .. });
        assert!(!failure.is_retryable());
    }

    /// A successful open keeps the opening guard and moves on to `PreFlushRegion`.
    #[tokio::test]
    async fn test_next_flush_leader_region_state() {
        let mut step = Box::new(OpenCandidateRegion);
        let pctx = new_persistent_context();
        let source_id = pctx.from_peer.id;
        let target_region = pctx.region_ids[0];
        let candidate_id = pctx.to_peer.id;
        let mut env = TestingEnv::new();

        let table_info = new_test_table_info(1024);
        let routes = vec![RegionRoute {
            region: Region::new_test(target_region),
            leader_peer: Some(Peer::empty(source_id)),
            ..Default::default()
        }];

        env.table_metadata_manager()
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(routes),
                HashMap::default(),
            )
            .await
            .unwrap();

        let mut ctx = env.context_factory().new_context(pctx);
        let mailbox_env = env.mailbox_context();
        let mailbox = mailbox_env.mailbox().clone();

        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_env
            .insert_heartbeat_response_receiver(Channel::Datanode(candidate_id), reply_tx)
            .await;

        send_mock_reply(mailbox, reply_rx, |message_id| {
            Ok(new_open_region_reply(message_id, true, None))
        });
        let procedure_ctx = new_procedure_context();
        let (following, _) = step.next(&mut ctx, &procedure_ctx).await.unwrap();
        assert_eq!(
            ctx.volatile_ctx.opening_region_guards[0].info(),
            (candidate_id, target_region)
        );

        let pre_flush = following
            .as_any()
            .downcast_ref::<PreFlushRegion>()
            .unwrap();
        assert_matches!(pre_flush, PreFlushRegion);
    }
}