1use std::any::Any;
16use std::collections::HashSet;
17use std::time::Duration;
18
19use api::v1::meta::MailboxMessage;
20use common_meta::instruction::{
21 Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply, UpgradeRegionsReply,
22};
23use common_meta::lock_key::RemoteWalLock;
24use common_meta::wal_provider::extract_topic_from_wal_options;
25use common_procedure::{Context as ProcedureContext, Status};
26use common_telemetry::tracing_context::TracingContext;
27use common_telemetry::{error, info};
28use common_wal::options::WalOptions;
29use serde::{Deserialize, Serialize};
30use snafu::{OptionExt, ResultExt, ensure};
31use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
32use tokio::time::{Instant, sleep};
33
34use crate::error::{self, Result};
35use crate::handler::HeartbeatMailbox;
36use crate::procedure::region_migration::update_metadata::UpdateMetadata;
37use crate::procedure::region_migration::{Context, State};
38use crate::procedure::utils::instruction_error_result;
39use crate::service::mailbox::Channel;
40
41#[derive(Debug, Serialize, Deserialize)]
42pub struct UpgradeCandidateRegion {
43 pub(crate) optimistic_retry: usize,
45 pub(crate) retry_initial_interval: Duration,
47 pub(crate) require_ready: bool,
50}
51
52impl Default for UpgradeCandidateRegion {
53 fn default() -> Self {
54 Self {
55 optimistic_retry: 3,
56 retry_initial_interval: Duration::from_millis(500),
57 require_ready: true,
58 }
59 }
60}
61
62#[async_trait::async_trait]
63#[typetag::serde]
64impl State for UpgradeCandidateRegion {
65 async fn next(
66 &mut self,
67 ctx: &mut Context,
68 procedure_ctx: &ProcedureContext,
69 ) -> Result<(Box<dyn State>, Status)> {
70 let now = Instant::now();
71
72 let topics = self.get_kafka_topics(ctx).await?;
73 if self
74 .upgrade_region_with_retry(ctx, procedure_ctx, topics)
75 .await
76 {
77 ctx.update_upgrade_candidate_region_elapsed(now);
78 Ok((Box::new(UpdateMetadata::Upgrade), Status::executing(false)))
79 } else {
80 ctx.update_upgrade_candidate_region_elapsed(now);
81 Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)))
82 }
83 }
84
85 fn as_any(&self) -> &dyn Any {
86 self
87 }
88}
89
90impl UpgradeCandidateRegion {
91 async fn get_kafka_topics(&self, ctx: &mut Context) -> Result<HashSet<String>> {
92 let table_regions = ctx.persistent_ctx.table_regions();
93 let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
94 let mut topics = HashSet::new();
95 for (table_id, regions) in table_regions {
96 let Some(datanode_table_value) = datanode_table_values.get(&table_id) else {
97 continue;
98 };
99
100 let region_wal_options = &datanode_table_value.region_info.region_wal_options;
101
102 for region_id in regions {
103 let Some(WalOptions::Kafka(kafka_wal_options)) =
104 region_wal_options.get(®ion_id.region_number())
105 else {
106 continue;
107 };
108 if !topics.contains(&kafka_wal_options.topic) {
109 topics.insert(kafka_wal_options.topic.clone());
110 }
111 }
112 }
113
114 Ok(topics)
115 }
116
117 async fn build_upgrade_region_instruction(
119 &self,
120 ctx: &mut Context,
121 replay_timeout: Duration,
122 ) -> Result<Instruction> {
123 let region_ids = ctx.persistent_ctx.region_ids.clone();
124 let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
125 let mut region_topic = Vec::with_capacity(region_ids.len());
126 for region_id in region_ids.iter() {
127 let table_id = region_id.table_id();
128 if let Some(datanode_table_value) = datanode_table_values.get(&table_id)
129 && let Some(topic) = extract_topic_from_wal_options(
130 *region_id,
131 &datanode_table_value.region_info.region_wal_options,
132 )
133 {
134 let is_metric_engine =
135 datanode_table_value.region_info.engine == METRIC_ENGINE_NAME;
136 region_topic.push((*region_id, topic, is_metric_engine));
137 }
138 }
139
140 let replay_checkpoints = ctx
141 .get_replay_checkpoints_with_topic_pruned_entry_ids(®ion_topic)
142 .await?;
143 let mut upgrade_regions = Vec::with_capacity(region_ids.len());
145 for region_id in region_ids {
146 let last_entry_id = ctx
147 .volatile_ctx
148 .leader_region_last_entry_ids
149 .get(®ion_id)
150 .copied();
151 let metadata_last_entry_id = ctx
152 .volatile_ctx
153 .leader_region_metadata_last_entry_ids
154 .get(®ion_id)
155 .copied();
156 let checkpoint = replay_checkpoints.get(®ion_id).copied();
157 upgrade_regions.push(UpgradeRegion {
158 region_id,
159 last_entry_id,
160 metadata_last_entry_id,
161 replay_timeout,
162 location_id: Some(ctx.persistent_ctx.from_peer.id),
163 replay_entry_id: checkpoint.map(|c| c.entry_id),
164 metadata_replay_entry_id: checkpoint.and_then(|c| c.metadata_entry_id),
165 });
166 }
167
168 Ok(Instruction::UpgradeRegions(upgrade_regions))
169 }
170
171 fn handle_upgrade_region_reply(
172 &self,
173 ctx: &mut Context,
174 UpgradeRegionReply {
175 region_id,
176 ready,
177 exists,
178 error,
179 }: &UpgradeRegionReply,
180 now: &Instant,
181 ) -> Result<()> {
182 let candidate = &ctx.persistent_ctx.to_peer;
183 if let Some(error) = error {
184 return instruction_error_result(
185 error,
186 format!(
187 "Failed to upgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
188 region_id,
189 candidate,
190 error,
191 now.elapsed()
192 ),
193 );
194 }
195
196 ensure!(
197 exists,
198 error::UnexpectedSnafu {
199 violated: format!(
200 "Candidate region {} doesn't exist on datanode {:?}",
201 region_id, candidate
202 )
203 }
204 );
205
206 if self.require_ready && !ready {
207 return error::RetryLaterSnafu {
208 reason: format!(
209 "Candidate region {} still replaying the wal on datanode {:?}, elapsed: {:?}",
210 region_id,
211 candidate,
212 now.elapsed()
213 ),
214 }
215 .fail();
216 }
217
218 Ok(())
219 }
220
221 async fn upgrade_region(&self, ctx: &mut Context) -> Result<()> {
236 let operation_timeout =
237 ctx.next_operation_timeout()
238 .context(error::ExceededDeadlineSnafu {
239 operation: "Upgrade region",
240 })?;
241 let upgrade_instruction = self
242 .build_upgrade_region_instruction(ctx, operation_timeout)
243 .await?;
244
245 let pc = &ctx.persistent_ctx;
246 let region_ids = &pc.region_ids;
247 let candidate = &pc.to_peer;
248
249 let tracing_ctx = TracingContext::from_current_span();
250 let msg = MailboxMessage::json_message(
251 &format!("Upgrade candidate regions: {:?}", region_ids),
252 &format!("Metasrv@{}", ctx.server_addr()),
253 &format!("Datanode-{}@{}", candidate.id, candidate.addr),
254 common_time::util::current_time_millis(),
255 &upgrade_instruction,
256 Some(tracing_ctx.to_w3c()),
257 )
258 .with_context(|_| error::SerializeToJsonSnafu {
259 input: upgrade_instruction.to_string(),
260 })?;
261
262 let ch = Channel::Datanode(candidate.id);
263 let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
264
265 let now = Instant::now();
266 match receiver.await {
267 Ok(msg) => {
268 let reply = HeartbeatMailbox::json_reply(&msg)?;
269 info!(
270 "Received upgrade region reply: {:?}, regions: {:?}, elapsed: {:?}",
271 reply,
272 region_ids,
273 now.elapsed()
274 );
275 let InstructionReply::UpgradeRegions(UpgradeRegionsReply { replies }) = reply
276 else {
277 return error::UnexpectedInstructionReplySnafu {
278 mailbox_message: msg.to_string(),
279 reason: "Unexpected reply of the upgrade region instruction",
280 }
281 .fail();
282 };
283 for reply in replies {
284 self.handle_upgrade_region_reply(ctx, &reply, &now)?;
285 }
286 Ok(())
287 }
288 Err(error::Error::MailboxTimeout { .. }) => {
289 let reason = format!(
290 "Mailbox received timeout for upgrade candidate regions {region_ids:?} on datanode {:?}, elapsed: {:?}",
291 candidate,
292 now.elapsed()
293 );
294 error::RetryLaterSnafu { reason }.fail()
295 }
296 Err(err) => Err(err),
297 }
298 }
299
300 async fn upgrade_region_with_retry(
304 &self,
305 ctx: &mut Context,
306 procedure_ctx: &ProcedureContext,
307 topics: HashSet<String>,
308 ) -> bool {
309 let mut retry = 0;
310 let mut upgraded = false;
311
312 let mut guards = Vec::with_capacity(topics.len());
313 loop {
314 let timer = Instant::now();
315 for topic in &topics {
317 guards.push(
318 procedure_ctx
319 .provider
320 .acquire_lock(&(RemoteWalLock::Read(topic.clone()).into()))
321 .await,
322 );
323 }
324
325 if let Err(err) = self.upgrade_region(ctx).await {
326 retry += 1;
327 ctx.update_operations_elapsed(timer);
328 if matches!(err, error::Error::ExceededDeadline { .. }) {
329 error!("Failed to upgrade region, exceeded deadline");
330 break;
331 } else if err.is_retryable() && retry < self.optimistic_retry {
332 error!("Failed to upgrade region, error: {err:?}, retry later");
333 sleep(self.retry_initial_interval).await;
334 } else {
335 error!("Failed to upgrade region, error: {err:?}");
336 break;
337 }
338 } else {
339 ctx.update_operations_elapsed(timer);
340 upgraded = true;
341 break;
342 }
343 }
344
345 upgraded
346 }
347}
348
#[cfg(test)]
mod tests {
    use std::assert_matches;
    use std::collections::HashMap;

    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::key::topic_name::TopicNameKey;
    use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey, TopicRegionValue};
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use common_meta::wal_provider::RegionWalOptions;
    use common_wal::options::KafkaWalOptions;
    use store_api::storage::RegionId;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::manager::RegionMigrationTriggerReason;
    use crate::procedure::region_migration::test_util::{TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_upgrade_region_reply, send_mock_reply,
    };

    /// Migrates region `(1024, 1)` from peer 1 to peer 2 with a 1s timeout.
    fn new_persistent_context() -> PersistentContext {
        PersistentContext::new(
            vec![("greptime".into(), "public".into())],
            Peer::empty(1),
            Peer::empty(2),
            vec![RegionId::new(1024, 1)],
            Duration::from_millis(1000),
            RegionMigrationTriggerReason::Manual,
        )
    }

    /// WAL options mapping region number 1 to a Kafka WAL on `topic`.
    fn kafka_wal_options(topic: &str) -> RegionWalOptions {
        RegionWalOptions::from([(
            1,
            WalOptions::Kafka(KafkaWalOptions::new(topic.to_string())),
        )])
    }

    /// Creates the table metadata of the migrating region with a generic engine name.
    async fn prepare_table_metadata(ctx: &Context, wal_options: RegionWalOptions) {
        prepare_table_metadata_with_engine(ctx, wal_options, "engine").await;
    }

    /// Creates the table metadata of the migrating region: the leader is `from_peer`,
    /// `to_peer` is a follower, and the table uses `engine` and `wal_options`.
    async fn prepare_table_metadata_with_engine(
        ctx: &Context,
        wal_options: RegionWalOptions,
        engine: &str,
    ) {
        let region_id = ctx.persistent_ctx.region_ids[0];
        let mut table_info = new_test_table_info(region_id.table_id());
        table_info.meta.engine = engine.to_string();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
            follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
            ..Default::default()
        }];
        ctx.table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                wal_options,
            )
            .await
            .unwrap();
    }

    // The stored checkpoint is 10 while the topic's pruned entry id is 20; the
    // instruction is expected to replay from 20. A non-metric region has no metadata
    // checkpoint, so `metadata_replay_entry_id` stays `None`.
    #[tokio::test]
    async fn test_build_upgrade_region_instruction_merges_topic_pruned_entry_id() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let region_id = ctx.persistent_ctx.region_ids[0];
        let topic = "test_topic";
        prepare_table_metadata(&ctx, kafka_wal_options(topic)).await;
        ctx.table_metadata_manager
            .topic_region_manager()
            .batch_put(&[(
                TopicRegionKey::new(region_id, topic),
                Some(TopicRegionValue::new(Some(ReplayCheckpoint::new(10, None)))),
            )])
            .await
            .unwrap();
        ctx.table_metadata_manager
            .topic_name_manager()
            .batch_put(vec![TopicNameKey::new(topic)])
            .await
            .unwrap();
        let prev = ctx
            .table_metadata_manager
            .topic_name_manager()
            .get(topic)
            .await
            .unwrap();
        // Record a pruned entry id of 20 for the topic.
        ctx.table_metadata_manager
            .topic_name_manager()
            .update(topic, 20, prev)
            .await
            .unwrap();

        let instruction = state
            .build_upgrade_region_instruction(&mut ctx, Duration::from_secs(1))
            .await
            .unwrap();
        let Instruction::UpgradeRegions(upgrade_regions) = instruction else {
            unreachable!()
        };

        assert_eq!(upgrade_regions.len(), 1);
        assert_eq!(upgrade_regions[0].replay_entry_id, Some(20));
        assert_eq!(upgrade_regions[0].metadata_replay_entry_id, None);
    }

    // Same as above but for a metric-engine region: both the data checkpoint (10) and
    // the metadata checkpoint (5) are expected to be raised to the pruned entry id (20).
    #[tokio::test]
    async fn test_build_upgrade_region_instruction_merges_metric_metadata_pruned_entry_id() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let region_id = ctx.persistent_ctx.region_ids[0];
        let topic = "test_topic";
        prepare_table_metadata_with_engine(&ctx, kafka_wal_options(topic), METRIC_ENGINE_NAME)
            .await;
        ctx.table_metadata_manager
            .topic_region_manager()
            .batch_put(&[(
                TopicRegionKey::new(region_id, topic),
                Some(TopicRegionValue::new(Some(ReplayCheckpoint::new(
                    10,
                    Some(5),
                )))),
            )])
            .await
            .unwrap();
        ctx.table_metadata_manager
            .topic_name_manager()
            .batch_put(vec![TopicNameKey::new(topic)])
            .await
            .unwrap();
        let prev = ctx
            .table_metadata_manager
            .topic_name_manager()
            .get(topic)
            .await
            .unwrap();
        // Record a pruned entry id of 20 for the topic.
        ctx.table_metadata_manager
            .topic_name_manager()
            .update(topic, 20, prev)
            .await
            .unwrap();

        let instruction = state
            .build_upgrade_region_instruction(&mut ctx, Duration::from_secs(1))
            .await
            .unwrap();
        let Instruction::UpgradeRegions(upgrade_regions) = instruction else {
            unreachable!()
        };

        assert_eq!(upgrade_regions.len(), 1);
        assert_eq!(upgrade_regions[0].replay_entry_id, Some(20));
        assert_eq!(upgrade_regions[0].metadata_replay_entry_id, Some(20));
    }

    // No heartbeat receiver is registered for the candidate datanode, so sending the
    // instruction fails with a non-retryable `PusherNotFound`.
    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PusherNotFound { .. });
        assert!(!err.is_retryable());
    }

    // The receiving end of the heartbeat channel is dropped, so pushing the message
    // fails with a non-retryable `PushMessage`.
    #[tokio::test]
    async fn test_pusher_dropped() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        drop(rx);

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PushMessage { .. });
        assert!(!err.is_retryable());
    }

    // The operations already consumed more than the procedure timeout, so no
    // instruction is sent and `ExceededDeadline` is returned.
    #[tokio::test]
    async fn test_procedure_exceeded_deadline() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    // The datanode answers with a close-region reply instead of an upgrade reply.
    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    // A region reply carrying an error surfaces as a retryable `RetryLater` that keeps
    // the datanode's error message.
    #[tokio::test]
    async fn test_upgrade_region_failed() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            // ready = true, exists = true, but the reply carries an error.
            Ok(new_upgrade_region_reply(
                id,
                true,
                true,
                Some("test mocked".to_string()),
            ))
        });

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("test mocked"));
    }

    // The datanode reports that the candidate region doesn't exist
    // (ready = true, exists = false), which is a non-retryable `Unexpected` error.
    #[tokio::test]
    async fn test_upgrade_region_not_found() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, true, false, None))
        });

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
        assert!(err.to_string().contains("doesn't exist"));
    }

    // A not-ready reply (ready = false, exists = true) is a retryable error while
    // `require_ready` is set, and is accepted once `require_ready` is cleared.
    #[tokio::test]
    async fn test_upgrade_region_require_ready() {
        let mut state = UpgradeCandidateRegion {
            require_ready: true,
            ..Default::default()
        };

        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("still replaying the wal"));

        state.require_ready = false;
        // The same not-ready reply must now be accepted.

        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });

        state.upgrade_region(&mut ctx).await.unwrap();
    }

    // Two retryable failures (a mailbox timeout, then a not-ready reply) followed by a
    // ready reply on the third and last allowed attempt: the state moves to
    // `UpdateMetadata::Upgrade`.
    #[tokio::test]
    async fn test_upgrade_region_with_retry_ok() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            // Attempt 1: mailbox timeout (retryable).
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            // Attempt 2: region not ready yet (retryable).
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, true, None)),
                )
                .await
                .unwrap();

            // Attempt 3: region is ready.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, true, true, None)),
                )
                .await
                .unwrap();
        });

        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();

        assert_matches!(update_metadata, UpdateMetadata::Upgrade);
    }

    // Two retryable failures followed by a non-retryable "region doesn't exist" reply:
    // the state moves to `UpdateMetadata::Rollback`.
    #[tokio::test]
    async fn test_upgrade_region_with_retry_failed() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            // Attempt 1: mailbox timeout (retryable).
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            // Attempt 2: region not ready yet (retryable).
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, true, None)),
                )
                .await
                .unwrap();

            // Attempt 3: region doesn't exist (non-retryable).
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, false, None)),
                )
                .await
                .unwrap();
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }

    // The deadline is already exceeded before the first attempt, so `next` gives up
    // without retrying and moves to `UpdateMetadata::Rollback`.
    #[tokio::test]
    async fn test_upgrade_region_procedure_exceeded_deadline() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }
}