1use std::any::Any;
16use std::time::Duration;
17
18use api::v1::meta::MailboxMessage;
19use common_error::ext::BoxedError;
20use common_meta::distributed_time_constants::REGION_LEASE_SECS;
21use common_meta::instruction::{
22 DowngradeRegion, DowngradeRegionReply, DowngradeRegionsReply, Instruction, InstructionReply,
23};
24use common_procedure::{Context as ProcedureContext, Status};
25use common_telemetry::{debug, error, info, warn};
26use common_time::util::current_time_millis;
27use serde::{Deserialize, Serialize};
28use snafu::{OptionExt, ResultExt};
29use tokio::time::{Instant, sleep};
30
31use crate::discovery::utils::find_datanode_lease_value;
32use crate::error::{self, Result};
33use crate::handler::HeartbeatMailbox;
34use crate::procedure::region_migration::update_metadata::UpdateMetadata;
35use crate::procedure::region_migration::upgrade_candidate_region::UpgradeCandidateRegion;
36use crate::procedure::region_migration::{Context, State};
37use crate::service::mailbox::Channel;
38
39#[derive(Debug, Serialize, Deserialize)]
40pub struct DowngradeLeaderRegion {
41 optimistic_retry: usize,
43 retry_initial_interval: Duration,
45}
46
47impl Default for DowngradeLeaderRegion {
48 fn default() -> Self {
49 Self {
50 optimistic_retry: 3,
51 retry_initial_interval: Duration::from_millis(500),
52 }
53 }
54}
55
56#[async_trait::async_trait]
57#[typetag::serde]
58impl State for DowngradeLeaderRegion {
59 async fn next(
60 &mut self,
61 ctx: &mut Context,
62 _procedure_ctx: &ProcedureContext,
63 ) -> Result<(Box<dyn State>, Status)> {
64 let now = Instant::now();
65 ctx.volatile_ctx
67 .set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS));
68
69 match self.downgrade_region_with_retry(ctx).await {
70 Ok(_) => {
71 info!(
73 "Downgraded region leader success, region: {:?}",
74 ctx.persistent_ctx.region_ids
75 );
76 }
77 Err(error::Error::ExceededDeadline { .. }) => {
78 info!(
79 "Downgrade region leader exceeded deadline, region: {:?}",
80 ctx.persistent_ctx.region_ids
81 );
82 return Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)));
84 }
85 Err(err) => {
86 error!(err; "Occurs non-retryable error, region: {:?}", ctx.persistent_ctx.region_ids);
87 if let Some(deadline) = ctx.volatile_ctx.leader_region_lease_deadline.as_ref() {
88 info!(
89 "Running into the downgrade region leader slow path, region: {:?}, sleep until {:?}",
90 ctx.persistent_ctx.region_ids, deadline
91 );
92 tokio::time::sleep_until(*deadline).await;
93 } else {
94 warn!(
95 "Leader region lease deadline is not set, region: {:?}",
96 ctx.persistent_ctx.region_ids
97 );
98 }
99 }
100 }
101 ctx.update_downgrade_leader_region_elapsed(now);
102
103 Ok((
104 Box::new(UpgradeCandidateRegion::default()),
105 Status::executing(false),
106 ))
107 }
108
109 fn as_any(&self) -> &dyn Any {
110 self
111 }
112}
113
114impl DowngradeLeaderRegion {
115 fn build_downgrade_region_instruction(
117 &self,
118 ctx: &Context,
119 flush_timeout: Duration,
120 ) -> Instruction {
121 let region_ids = &ctx.persistent_ctx.region_ids;
122 let mut downgrade_regions = Vec::with_capacity(region_ids.len());
123 for region_id in region_ids {
124 downgrade_regions.push(DowngradeRegion {
125 region_id: *region_id,
126 flush_timeout: Some(flush_timeout),
127 });
128 }
129
130 Instruction::DowngradeRegions(downgrade_regions)
131 }
132
133 fn handle_downgrade_region_reply(
134 &self,
135 ctx: &mut Context,
136 reply: &DowngradeRegionReply,
137 now: &Instant,
138 ) -> Result<()> {
139 let leader = &ctx.persistent_ctx.from_peer;
140 let DowngradeRegionReply {
141 region_id,
142 last_entry_id,
143 metadata_last_entry_id,
144 exists,
145 error,
146 } = reply;
147
148 if error.is_some() {
149 return error::RetryLaterSnafu {
150 reason: format!(
151 "Failed to downgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
152 region_id, leader, error, now.elapsed()
153 ),
154 }
155 .fail();
156 }
157
158 if !exists {
159 warn!(
160 "Trying to downgrade the region {} on datanode {:?}, but region doesn't exist!, elapsed: {:?}",
161 region_id,
162 leader,
163 now.elapsed()
164 );
165 } else {
166 info!(
167 "Region {} leader is downgraded on datanode {:?}, last_entry_id: {:?}, metadata_last_entry_id: {:?}, elapsed: {:?}",
168 region_id,
169 leader,
170 last_entry_id,
171 metadata_last_entry_id,
172 now.elapsed()
173 );
174 }
175
176 if let Some(last_entry_id) = last_entry_id {
177 debug!(
178 "set last_entry_id: {:?}, region_id: {:?}",
179 last_entry_id, region_id
180 );
181 ctx.volatile_ctx
182 .set_last_entry_id(*region_id, *last_entry_id);
183 }
184
185 if let Some(metadata_last_entry_id) = metadata_last_entry_id {
186 ctx.volatile_ctx
187 .set_metadata_last_entry_id(*region_id, *metadata_last_entry_id);
188 }
189
190 Ok(())
191 }
192
193 async fn downgrade_region(&self, ctx: &mut Context) -> Result<()> {
207 let region_ids = &ctx.persistent_ctx.region_ids;
208 let operation_timeout =
209 ctx.next_operation_timeout()
210 .context(error::ExceededDeadlineSnafu {
211 operation: "Downgrade region",
212 })?;
213 let downgrade_instruction = self.build_downgrade_region_instruction(ctx, operation_timeout);
214
215 let leader = &ctx.persistent_ctx.from_peer;
216 let msg = MailboxMessage::json_message(
217 &format!("Downgrade leader regions: {:?}", region_ids),
218 &format!("Metasrv@{}", ctx.server_addr()),
219 &format!("Datanode-{}@{}", leader.id, leader.addr),
220 common_time::util::current_time_millis(),
221 &downgrade_instruction,
222 )
223 .with_context(|_| error::SerializeToJsonSnafu {
224 input: downgrade_instruction.to_string(),
225 })?;
226
227 let ch = Channel::Datanode(leader.id);
228 let now = Instant::now();
229 let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
230
231 match receiver.await {
232 Ok(msg) => {
233 let reply = HeartbeatMailbox::json_reply(&msg)?;
234 info!(
235 "Received downgrade region reply: {:?}, region: {:?}, elapsed: {:?}",
236 reply,
237 region_ids,
238 now.elapsed()
239 );
240 let InstructionReply::DowngradeRegions(DowngradeRegionsReply { replies }) = reply
241 else {
242 return error::UnexpectedInstructionReplySnafu {
243 mailbox_message: msg.to_string(),
244 reason: "expect downgrade region reply",
245 }
246 .fail();
247 };
248
249 for reply in replies {
250 self.handle_downgrade_region_reply(ctx, &reply, &now)?;
251 }
252 Ok(())
253 }
254 Err(error::Error::MailboxTimeout { .. }) => {
255 let reason = format!(
256 "Mailbox received timeout for downgrade leader region {region_ids:?} on datanode {:?}, elapsed: {:?}",
257 leader,
258 now.elapsed()
259 );
260 error::RetryLaterSnafu { reason }.fail()
261 }
262 Err(err) => Err(err),
263 }
264 }
265
266 async fn update_leader_region_lease_deadline(&self, ctx: &mut Context) {
267 let leader = &ctx.persistent_ctx.from_peer;
268
269 let last_connection_at = match find_datanode_lease_value(&ctx.in_memory, leader.id).await {
270 Ok(lease_value) => lease_value.map(|lease_value| lease_value.timestamp_millis),
271 Err(err) => {
272 error!(err; "Failed to find datanode lease value for datanode: {}, during region migration, region: {:?}", leader, ctx.persistent_ctx.region_ids);
273 return;
274 }
275 };
276
277 if let Some(last_connection_at) = last_connection_at {
278 let now = current_time_millis();
279 let elapsed = now - last_connection_at;
280 let region_lease = Duration::from_secs(REGION_LEASE_SECS);
281
282 if elapsed >= (REGION_LEASE_SECS * 1000) as i64 {
288 ctx.volatile_ctx.reset_leader_region_lease_deadline();
289 info!(
290 "Datanode {}({}) has been disconnected for longer than the region lease period ({:?}), reset leader region lease deadline to None, region: {:?}",
291 leader, last_connection_at, region_lease, ctx.persistent_ctx.region_ids
292 );
293 } else if elapsed > 0 {
294 let lease_timeout =
296 region_lease - Duration::from_millis((now - last_connection_at) as u64);
297 ctx.volatile_ctx.reset_leader_region_lease_deadline();
298 ctx.volatile_ctx
299 .set_leader_region_lease_deadline(lease_timeout);
300 info!(
301 "Datanode {}({}) last connected {:?} ago, updated leader region lease deadline to {:?}, region: {:?}",
302 leader,
303 last_connection_at,
304 elapsed,
305 ctx.volatile_ctx.leader_region_lease_deadline,
306 ctx.persistent_ctx.region_ids
307 );
308 } else {
309 warn!(
310 "Datanode {} has invalid last connection timestamp: {} (which is after current time: {}), region: {:?}",
311 leader, last_connection_at, now, ctx.persistent_ctx.region_ids
312 )
313 }
314 } else {
315 warn!(
316 "Failed to find last connection time for datanode {}, unable to update region lease deadline, region: {:?}",
317 leader, ctx.persistent_ctx.region_ids
318 )
319 }
320 }
321
322 async fn downgrade_region_with_retry(&self, ctx: &mut Context) -> Result<()> {
333 let mut retry = 0;
334
335 loop {
336 let timer = Instant::now();
337 if let Err(err) = self.downgrade_region(ctx).await {
338 ctx.update_operations_elapsed(timer);
339 retry += 1;
340 if matches!(err, error::Error::ExceededDeadline { .. }) {
342 error!(err; "Failed to downgrade region leader, regions: {:?}, exceeded deadline", ctx.persistent_ctx.region_ids);
343 return Err(err);
344 } else if matches!(err, error::Error::PusherNotFound { .. }) {
345 error!(err; "Failed to downgrade region leader, regions: {:?}, datanode({}) is unreachable(PusherNotFound)", ctx.persistent_ctx.region_ids, ctx.persistent_ctx.from_peer.id);
347 self.update_leader_region_lease_deadline(ctx).await;
348 return Err(err);
349 } else if err.is_retryable() && retry < self.optimistic_retry {
350 error!(err; "Failed to downgrade region leader, regions: {:?}, retry later", ctx.persistent_ctx.region_ids);
351 sleep(self.retry_initial_interval).await;
352 } else {
353 return Err(BoxedError::new(err)).context(error::DowngradeLeaderSnafu {
354 region_id: ctx.persistent_ctx.region_ids[0],
356 })?;
357 }
358 } else {
359 ctx.update_operations_elapsed(timer);
360 ctx.volatile_ctx.reset_leader_region_lease_deadline();
362 break;
363 }
364 }
365
366 Ok(())
367 }
368}
369
370#[cfg(test)]
371mod tests {
372 use std::assert_matches::assert_matches;
373 use std::collections::HashMap;
374
375 use common_meta::key::table_route::TableRouteValue;
376 use common_meta::key::test_utils::new_test_table_info;
377 use common_meta::peer::Peer;
378 use common_meta::rpc::router::{Region, RegionRoute};
379 use store_api::storage::RegionId;
380 use tokio::time::Instant;
381
382 use super::*;
383 use crate::error::Error;
384 use crate::procedure::region_migration::manager::RegionMigrationTriggerReason;
385 use crate::procedure::region_migration::test_util::{TestingEnv, new_procedure_context};
386 use crate::procedure::region_migration::{ContextFactory, PersistentContext};
387 use crate::procedure::test_util::{
388 new_close_region_reply, new_downgrade_region_reply, send_mock_reply,
389 };
390
391 fn new_persistent_context() -> PersistentContext {
392 PersistentContext::new(
393 vec![("greptime".into(), "public".into())],
394 Peer::empty(1),
395 Peer::empty(2),
396 vec![RegionId::new(1024, 1)],
397 Duration::from_millis(1000),
398 RegionMigrationTriggerReason::Manual,
399 )
400 }
401
402 async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
403 let region_id = ctx.persistent_ctx.region_ids[0];
404 let table_info = new_test_table_info(region_id.table_id(), vec![1]).into();
405 let region_routes = vec![RegionRoute {
406 region: Region::new_test(region_id),
407 leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
408 follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
409 ..Default::default()
410 }];
411 ctx.table_metadata_manager
412 .create_table_metadata(
413 table_info,
414 TableRouteValue::physical(region_routes),
415 wal_options,
416 )
417 .await
418 .unwrap();
419 }
420
421 #[tokio::test]
422 async fn test_datanode_is_unreachable() {
423 let state = DowngradeLeaderRegion::default();
424 let persistent_context = new_persistent_context();
425 let env = TestingEnv::new();
426 let mut ctx = env.context_factory().new_context(persistent_context);
427 prepare_table_metadata(&ctx, HashMap::default()).await;
428 let err = state.downgrade_region(&mut ctx).await.unwrap_err();
429
430 assert_matches!(err, Error::PusherNotFound { .. });
431 assert!(!err.is_retryable());
432 }
433
434 #[tokio::test]
435 async fn test_pusher_dropped() {
436 let state = DowngradeLeaderRegion::default();
437 let persistent_context = new_persistent_context();
438 let from_peer_id = persistent_context.from_peer.id;
439
440 let mut env = TestingEnv::new();
441 let mut ctx = env.context_factory().new_context(persistent_context);
442 prepare_table_metadata(&ctx, HashMap::default()).await;
443 let mailbox_ctx = env.mailbox_context();
444
445 let (tx, rx) = tokio::sync::mpsc::channel(1);
446
447 mailbox_ctx
448 .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
449 .await;
450
451 drop(rx);
452
453 let err = state.downgrade_region(&mut ctx).await.unwrap_err();
454
455 assert_matches!(err, Error::PushMessage { .. });
456 assert!(!err.is_retryable());
457 }
458
459 #[tokio::test]
460 async fn test_procedure_exceeded_deadline() {
461 let state = DowngradeLeaderRegion::default();
462 let persistent_context = new_persistent_context();
463 let env = TestingEnv::new();
464 let mut ctx = env.context_factory().new_context(persistent_context);
465 prepare_table_metadata(&ctx, HashMap::default()).await;
466 ctx.volatile_ctx.metrics.operations_elapsed =
467 ctx.persistent_ctx.timeout + Duration::from_secs(1);
468
469 let err = state.downgrade_region(&mut ctx).await.unwrap_err();
470
471 assert_matches!(err, Error::ExceededDeadline { .. });
472 assert!(!err.is_retryable());
473
474 let err = state
475 .downgrade_region_with_retry(&mut ctx)
476 .await
477 .unwrap_err();
478 assert_matches!(err, Error::ExceededDeadline { .. });
479 assert!(!err.is_retryable());
480 }
481
482 #[tokio::test]
483 async fn test_unexpected_instruction_reply() {
484 let state = DowngradeLeaderRegion::default();
485 let persistent_context = new_persistent_context();
486 let from_peer_id = persistent_context.from_peer.id;
487
488 let mut env = TestingEnv::new();
489 let mut ctx = env.context_factory().new_context(persistent_context);
490 prepare_table_metadata(&ctx, HashMap::default()).await;
491 let mailbox_ctx = env.mailbox_context();
492 let mailbox = mailbox_ctx.mailbox().clone();
493
494 let (tx, rx) = tokio::sync::mpsc::channel(1);
495
496 mailbox_ctx
497 .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
498 .await;
499
500 send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));
502
503 let err = state.downgrade_region(&mut ctx).await.unwrap_err();
504
505 assert_matches!(err, Error::UnexpectedInstructionReply { .. });
506 assert!(!err.is_retryable());
507 }
508
509 #[tokio::test]
510 async fn test_instruction_exceeded_deadline() {
511 let state = DowngradeLeaderRegion::default();
512 let persistent_context = new_persistent_context();
513 let from_peer_id = persistent_context.from_peer.id;
514
515 let mut env = TestingEnv::new();
516 let mut ctx = env.context_factory().new_context(persistent_context);
517 prepare_table_metadata(&ctx, HashMap::default()).await;
518 let mailbox_ctx = env.mailbox_context();
519 let mailbox = mailbox_ctx.mailbox().clone();
520
521 let (tx, rx) = tokio::sync::mpsc::channel(1);
522
523 mailbox_ctx
524 .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
525 .await;
526
527 send_mock_reply(mailbox, rx, |id| {
528 Err(error::MailboxTimeoutSnafu { id }.build())
529 });
530
531 let err = state.downgrade_region(&mut ctx).await.unwrap_err();
532
533 assert_matches!(err, Error::RetryLater { .. });
534 assert!(err.is_retryable());
535 }
536
537 #[tokio::test]
538 async fn test_downgrade_region_failed() {
539 let state = DowngradeLeaderRegion::default();
540 let persistent_context = new_persistent_context();
541 let from_peer_id = persistent_context.from_peer.id;
542
543 let mut env = TestingEnv::new();
544 let mut ctx = env.context_factory().new_context(persistent_context);
545 prepare_table_metadata(&ctx, HashMap::default()).await;
546 let mailbox_ctx = env.mailbox_context();
547 let mailbox = mailbox_ctx.mailbox().clone();
548
549 let (tx, rx) = tokio::sync::mpsc::channel(1);
550
551 mailbox_ctx
552 .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
553 .await;
554
555 send_mock_reply(mailbox, rx, |id| {
556 Ok(new_downgrade_region_reply(
557 id,
558 None,
559 false,
560 Some("test mocked".to_string()),
561 ))
562 });
563
564 let err = state.downgrade_region(&mut ctx).await.unwrap_err();
565
566 assert_matches!(err, Error::RetryLater { .. });
567 assert!(err.is_retryable());
568 assert!(format!("{err:?}").contains("test mocked"), "err: {err:?}",);
569 }
570
571 #[tokio::test]
572 async fn test_downgrade_region_with_retry_fast_path() {
573 let state = DowngradeLeaderRegion::default();
574 let persistent_context = new_persistent_context();
575 let from_peer_id = persistent_context.from_peer.id;
576
577 let mut env = TestingEnv::new();
578 let mut ctx = env.context_factory().new_context(persistent_context);
579 prepare_table_metadata(&ctx, HashMap::default()).await;
580 let mailbox_ctx = env.mailbox_context();
581 let mailbox = mailbox_ctx.mailbox().clone();
582
583 let (tx, mut rx) = tokio::sync::mpsc::channel(1);
584
585 mailbox_ctx
586 .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
587 .await;
588
589 common_runtime::spawn_global(async move {
590 let resp = rx.recv().await.unwrap().unwrap();
592 let reply_id = resp.mailbox_message.unwrap().id;
593 mailbox
594 .on_recv(
595 reply_id,
596 Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
597 )
598 .await
599 .unwrap();
600
601 let resp = rx.recv().await.unwrap().unwrap();
603 let reply_id = resp.mailbox_message.unwrap().id;
604 mailbox
605 .on_recv(
606 reply_id,
607 Ok(new_downgrade_region_reply(reply_id, Some(1), true, None)),
608 )
609 .await
610 .unwrap();
611 });
612
613 state.downgrade_region_with_retry(&mut ctx).await.unwrap();
614 assert_eq!(
615 ctx.volatile_ctx
616 .leader_region_last_entry_ids
617 .get(&RegionId::new(0, 0))
618 .cloned(),
619 Some(1)
620 );
621 assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());
622 }
623
624 #[tokio::test]
625 async fn test_downgrade_region_with_retry_slow_path() {
626 let state = DowngradeLeaderRegion {
627 optimistic_retry: 3,
628 retry_initial_interval: Duration::from_millis(100),
629 };
630 let persistent_context = new_persistent_context();
631 let from_peer_id = persistent_context.from_peer.id;
632
633 let mut env = TestingEnv::new();
634 let mut ctx = env.context_factory().new_context(persistent_context);
635 let mailbox_ctx = env.mailbox_context();
636 let mailbox = mailbox_ctx.mailbox().clone();
637
638 let (tx, mut rx) = tokio::sync::mpsc::channel(1);
639
640 mailbox_ctx
641 .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
642 .await;
643
644 common_runtime::spawn_global(async move {
645 for _ in 0..3 {
646 let resp = rx.recv().await.unwrap().unwrap();
647 let reply_id = resp.mailbox_message.unwrap().id;
648 mailbox
649 .on_recv(
650 reply_id,
651 Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
652 )
653 .await
654 .unwrap();
655 }
656 });
657
658 ctx.volatile_ctx
659 .set_leader_region_lease_deadline(Duration::from_secs(5));
660 let expected_deadline = ctx.volatile_ctx.leader_region_lease_deadline.unwrap();
661 let err = state
662 .downgrade_region_with_retry(&mut ctx)
663 .await
664 .unwrap_err();
665 assert_matches!(err, error::Error::DowngradeLeader { .. });
666 assert_eq!(
669 ctx.volatile_ctx.leader_region_lease_deadline.unwrap(),
670 expected_deadline
671 )
672 }
673
674 #[tokio::test]
675 async fn test_next_upgrade_candidate_state() {
676 let mut state = Box::<DowngradeLeaderRegion>::default();
677 let persistent_context = new_persistent_context();
678 let from_peer_id = persistent_context.from_peer.id;
679
680 let mut env = TestingEnv::new();
681 let mut ctx = env.context_factory().new_context(persistent_context);
682 prepare_table_metadata(&ctx, HashMap::default()).await;
683 let mailbox_ctx = env.mailbox_context();
684 let mailbox = mailbox_ctx.mailbox().clone();
685
686 let (tx, rx) = tokio::sync::mpsc::channel(1);
687
688 mailbox_ctx
689 .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
690 .await;
691
692 send_mock_reply(mailbox, rx, |id| {
693 Ok(new_downgrade_region_reply(id, Some(1), true, None))
694 });
695
696 let timer = Instant::now();
697 let procedure_ctx = new_procedure_context();
698 let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
699 let elapsed = timer.elapsed().as_secs();
700 assert!(elapsed < REGION_LEASE_SECS / 2);
701 assert_eq!(
702 ctx.volatile_ctx
703 .leader_region_last_entry_ids
704 .get(&RegionId::new(0, 0))
705 .cloned(),
706 Some(1)
707 );
708 assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());
709
710 let _ = next
711 .as_any()
712 .downcast_ref::<UpgradeCandidateRegion>()
713 .unwrap();
714 }
715
716 #[tokio::test]
717 async fn test_downgrade_region_procedure_exceeded_deadline() {
718 let mut state = Box::<UpgradeCandidateRegion>::default();
719 state.retry_initial_interval = Duration::from_millis(100);
720 let persistent_context = new_persistent_context();
721 let to_peer_id = persistent_context.to_peer.id;
722
723 let mut env = TestingEnv::new();
724 let mut ctx = env.context_factory().new_context(persistent_context);
725 let mailbox_ctx = env.mailbox_context();
726 let mailbox = mailbox_ctx.mailbox().clone();
727 ctx.volatile_ctx.metrics.operations_elapsed =
728 ctx.persistent_ctx.timeout + Duration::from_secs(1);
729
730 let (tx, rx) = tokio::sync::mpsc::channel(1);
731 mailbox_ctx
732 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
733 .await;
734
735 send_mock_reply(mailbox, rx, |id| {
736 Ok(new_downgrade_region_reply(id, None, true, None))
737 });
738 let procedure_ctx = new_procedure_context();
739 let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
740 let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
741 assert_matches!(update_metadata, UpdateMetadata::Rollback);
742 }
743}