use std::any::Any;
use std::time::Duration;

use api::v1::meta::MailboxMessage;
use common_error::ext::BoxedError;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::instruction::{
    DowngradeRegion, DowngradeRegionReply, DowngradeRegionsReply, Instruction, InstructionReply,
};
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use tokio::time::{Instant, sleep};

use crate::discovery::utils::find_datanode_lease_value;
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::upgrade_candidate_region::UpgradeCandidateRegion;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;

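/// The state that downgrades the leader regions on the `from_peer` datanode
/// before the candidate regions on the `to_peer` are upgraded.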
#[derive(Debug, Serialize, Deserialize)]
pub struct DowngradeLeaderRegion {
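    /// The maximum number of optimistic retries for retryable errors.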
    optimistic_retry: usize,
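    /// The interval to sleep before the next optimistic retry.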
    retry_initial_interval: Duration,
}

impl Default for DowngradeLeaderRegion {
    fn default() -> Self {
        Self {
            optimistic_retry: 3,
            retry_initial_interval: Duration::from_millis(500),
        }
    }
}

#[async_trait::async_trait]
#[typetag::serde]
impl State for DowngradeLeaderRegion {
    async fn next(
        &mut self,
        ctx: &mut Context,
        _procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)> {
        let now = Instant::now();
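        // Ensure a leader region lease deadline is set; if the downgrade fails with a
        // non-retryable error, the slow path below waits until this deadline has passed.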
        ctx.volatile_ctx
            .set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS));

        match self.downgrade_region_with_retry(ctx).await {
            Ok(_) => {
                info!(
                    "Downgraded leader region successfully, region: {:?}",
                    ctx.persistent_ctx.region_ids
                );
            }
            Err(error::Error::ExceededDeadline { .. }) => {
                info!(
                    "Downgrading the leader region exceeded the deadline, region: {:?}",
                    ctx.persistent_ctx.region_ids
                );
                return Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)));
            }
            Err(err) => {
                error!(err; "A non-retryable error occurred while downgrading the leader region, region: {:?}", ctx.persistent_ctx.region_ids);
                if let Some(deadline) = ctx.volatile_ctx.leader_region_lease_deadline.as_ref() {
                    info!(
                        "Entering the downgrade leader region slow path, region: {:?}, sleeping until {:?}",
                        ctx.persistent_ctx.region_ids, deadline
                    );
                    tokio::time::sleep_until(*deadline).await;
                } else {
                    warn!(
                        "Leader region lease deadline is not set, region: {:?}",
                        ctx.persistent_ctx.region_ids
                    );
                }
            }
        }
        ctx.update_downgrade_leader_region_elapsed(now);

        Ok((
            Box::new(UpgradeCandidateRegion::default()),
            Status::executing(false),
        ))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl DowngradeLeaderRegion {
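    /// Builds the [Instruction::DowngradeRegions] instruction covering every migrating
    /// region, each with the given flush timeout.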
    fn build_downgrade_region_instruction(
        &self,
        ctx: &Context,
        flush_timeout: Duration,
    ) -> Instruction {
        let region_ids = &ctx.persistent_ctx.region_ids;
        let mut downgrade_regions = Vec::with_capacity(region_ids.len());
        for region_id in region_ids {
            downgrade_regions.push(DowngradeRegion {
                region_id: *region_id,
                flush_timeout: Some(flush_timeout),
            });
        }

        Instruction::DowngradeRegions(downgrade_regions)
    }

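    /// Handles a single [DowngradeRegionReply]: asks for a retry if the datanode reported
    /// an error, otherwise records the returned last entry ids into the volatile context.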
    fn handle_downgrade_region_reply(
        &self,
        ctx: &mut Context,
        reply: &DowngradeRegionReply,
        now: &Instant,
    ) -> Result<()> {
        let leader = &ctx.persistent_ctx.from_peer;
        let DowngradeRegionReply {
            region_id,
            last_entry_id,
            metadata_last_entry_id,
            exists,
            error,
        } = reply;

        if error.is_some() {
            return error::RetryLaterSnafu {
                reason: format!(
                    "Failed to downgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
                    region_id, leader, error, now.elapsed()
                ),
            }
            .fail();
        }

        if !exists {
            warn!(
                "Tried to downgrade region {} on datanode {:?}, but the region does not exist, elapsed: {:?}",
                region_id,
                leader,
                now.elapsed()
            );
        } else {
            info!(
                "The leader of region {} is downgraded on datanode {:?}, last_entry_id: {:?}, metadata_last_entry_id: {:?}, elapsed: {:?}",
                region_id,
                leader,
                last_entry_id,
                metadata_last_entry_id,
                now.elapsed()
            );
        }

        if let Some(last_entry_id) = last_entry_id {
            debug!(
                "Setting last_entry_id: {:?}, region_id: {:?}",
                last_entry_id, region_id
            );
            ctx.volatile_ctx
                .set_last_entry_id(*region_id, *last_entry_id);
        }

        if let Some(metadata_last_entry_id) = metadata_last_entry_id {
            ctx.volatile_ctx
                .set_metadata_last_entry_id(*region_id, *metadata_last_entry_id);
        }

        Ok(())
    }

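    /// Tries to downgrade the leader regions on the `from_peer` datanode within the
    /// remaining operation timeout.
    ///
    /// A mailbox timeout is converted into a retryable error; an error reported in a reply
    /// is also surfaced as a retryable error by [Self::handle_downgrade_region_reply].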
    async fn downgrade_region(&self, ctx: &mut Context) -> Result<()> {
        let region_ids = &ctx.persistent_ctx.region_ids;
        let operation_timeout =
            ctx.next_operation_timeout()
                .context(error::ExceededDeadlineSnafu {
                    operation: "Downgrade region",
                })?;
        let downgrade_instruction = self.build_downgrade_region_instruction(ctx, operation_timeout);

        let leader = &ctx.persistent_ctx.from_peer;
        let msg = MailboxMessage::json_message(
            &format!("Downgrade leader regions: {:?}", region_ids),
            &format!("Metasrv@{}", ctx.server_addr()),
            &format!("Datanode-{}@{}", leader.id, leader.addr),
            current_time_millis(),
            &downgrade_instruction,
        )
        .with_context(|_| error::SerializeToJsonSnafu {
            input: downgrade_instruction.to_string(),
        })?;

        let ch = Channel::Datanode(leader.id);
        let now = Instant::now();
        let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;

        match receiver.await {
            Ok(msg) => {
                let reply = HeartbeatMailbox::json_reply(&msg)?;
                info!(
                    "Received downgrade region reply: {:?}, region: {:?}, elapsed: {:?}",
                    reply,
                    region_ids,
                    now.elapsed()
                );
                let InstructionReply::DowngradeRegions(DowngradeRegionsReply { replies }) = reply
                else {
                    return error::UnexpectedInstructionReplySnafu {
                        mailbox_message: msg.to_string(),
                        reason: "expect downgrade region reply",
                    }
                    .fail();
                };

                for reply in replies {
                    self.handle_downgrade_region_reply(ctx, &reply, &now)?;
                }
                Ok(())
            }
            Err(error::Error::MailboxTimeout { .. }) => {
                let reason = format!(
                    "Mailbox timed out while downgrading leader regions {region_ids:?} on datanode {:?}, elapsed: {:?}",
                    leader,
                    now.elapsed()
                );
                error::RetryLaterSnafu { reason }.fail()
            }
            Err(err) => Err(err),
        }
    }

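    /// Looks up the datanode's lease (last heartbeat) and adjusts the leader region lease
    /// deadline accordingly: if the datanode has already been disconnected for longer than
    /// the region lease period, the deadline is cleared so the procedure does not wait
    /// needlessly.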
    async fn update_leader_region_lease_deadline(&self, ctx: &mut Context) {
        let leader = &ctx.persistent_ctx.from_peer;

        let last_connection_at = match find_datanode_lease_value(&ctx.in_memory, leader.id).await {
            Ok(lease_value) => lease_value.map(|lease_value| lease_value.timestamp_millis),
            Err(err) => {
                error!(err; "Failed to find the datanode lease value for datanode {} during region migration, region: {:?}", leader, ctx.persistent_ctx.region_ids);
                return;
            }
        };

        if let Some(last_connection_at) = last_connection_at {
            let now = current_time_millis();
            let elapsed = now - last_connection_at;
            let region_lease = Duration::from_secs(REGION_LEASE_SECS);

            if elapsed >= region_lease.as_millis() as i64 {
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                info!(
                    "Datanode {} (last connected at {}) has been disconnected for longer than the region lease period ({:?}); resetting the leader region lease deadline, region: {:?}",
                    leader, last_connection_at, region_lease, ctx.persistent_ctx.region_ids
                );
            } else if elapsed > 0 {
                let lease_timeout = region_lease - Duration::from_millis(elapsed as u64);
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                ctx.volatile_ctx
                    .set_leader_region_lease_deadline(lease_timeout);
                info!(
                    "Datanode {} (last connected at {}) was seen {}ms ago; updated the leader region lease deadline to {:?}, region: {:?}",
                    leader,
                    last_connection_at,
                    elapsed,
                    ctx.volatile_ctx.leader_region_lease_deadline,
                    ctx.persistent_ctx.region_ids
                );
            } else {
                warn!(
                    "Datanode {} has an invalid last connection timestamp: {} (later than the current time: {}), region: {:?}",
                    leader, last_connection_at, now, ctx.persistent_ctx.region_ids
                )
            }
        } else {
            warn!(
                "Failed to find the last connection time for datanode {}, unable to update the region lease deadline, region: {:?}",
                leader, ctx.persistent_ctx.region_ids
            )
        }
    }

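    /// Downgrades the leader regions, retrying retryable errors up to `optimistic_retry`
    /// times.
    ///
    /// Returns immediately on `ExceededDeadline` and `PusherNotFound` errors; on success the
    /// leader region lease deadline is cleared so the next state does not have to wait for
    /// the old lease to expire.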
    async fn downgrade_region_with_retry(&self, ctx: &mut Context) -> Result<()> {
        let mut retry = 0;

        loop {
            let timer = Instant::now();
            if let Err(err) = self.downgrade_region(ctx).await {
                ctx.update_operations_elapsed(timer);
                retry += 1;
                if matches!(err, error::Error::ExceededDeadline { .. }) {
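                    // The procedure deadline has been exceeded; abort instead of retrying.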
                    error!(err; "Failed to downgrade the leader region, regions: {:?}, exceeded deadline", ctx.persistent_ctx.region_ids);
                    return Err(err);
                } else if matches!(err, error::Error::PusherNotFound { .. }) {
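                    // The datanode is unreachable; adjust the leader region lease deadline
                    // from its last heartbeat before bailing out, so the slow path in `next`
                    // only waits as long as the old lease may still be valid.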
                    error!(err; "Failed to downgrade the leader region, regions: {:?}, datanode {} is unreachable (PusherNotFound)", ctx.persistent_ctx.region_ids, ctx.persistent_ctx.from_peer.id);
                    self.update_leader_region_lease_deadline(ctx).await;
                    return Err(err);
                } else if err.is_retryable() && retry < self.optimistic_retry {
                    error!(err; "Failed to downgrade the leader region, regions: {:?}, will retry later", ctx.persistent_ctx.region_ids);
                    sleep(self.retry_initial_interval).await;
                } else {
                    return Err(BoxedError::new(err)).context(error::DowngradeLeaderSnafu {
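                        // Report the first migrating region id in the error for context.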
                        region_id: ctx.persistent_ctx.region_ids[0],
                    })?;
                }
            } else {
                ctx.update_operations_elapsed(timer);
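                // The leader regions were downgraded successfully; no need to wait for the
                // old leader's lease to expire before upgrading the candidate.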
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                break;
            }
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;
    use tokio::time::Instant;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::manager::RegionMigrationTriggerReason;
    use crate::procedure::region_migration::test_util::{TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_downgrade_region_reply, send_mock_reply,
    };

    fn new_persistent_context() -> PersistentContext {
        PersistentContext {
            catalog: "greptime".into(),
            schema: "public".into(),
            from_peer: Peer::empty(1),
            to_peer: Peer::empty(2),
            region_ids: vec![RegionId::new(1024, 1)],
            timeout: Duration::from_millis(1000),
            trigger_reason: RegionMigrationTriggerReason::Manual,
        }
    }

    async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
        let region_id = ctx.persistent_ctx.region_ids[0];
        let table_info = new_test_table_info(region_id.table_id(), vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
            follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
            ..Default::default()
        }];
        ctx.table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                wal_options,
            )
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PusherNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_pusher_dropped() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        drop(rx);

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PushMessage { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_procedure_exceeded_deadline() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());

        let err = state
            .downgrade_region_with_retry(&mut ctx)
            .await
            .unwrap_err();
        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

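        // Reply with an unrelated close-region reply to trigger the unexpected-reply error.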
        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        });

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
    }

    #[tokio::test]
    async fn test_downgrade_region_failed() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(
                id,
                None,
                false,
                Some("test mocked".to_string()),
            ))
        });

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("test mocked"), "err: {err:?}");
    }

    #[tokio::test]
    async fn test_downgrade_region_with_retry_fast_path() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        common_runtime::spawn_global(async move {
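            // The first attempt times out, forcing `downgrade_region_with_retry` to retry.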
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

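            // The second attempt succeeds and reports last_entry_id = 1.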
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_downgrade_region_reply(reply_id, Some(1), true, None)),
                )
                .await
                .unwrap();
        });

        state.downgrade_region_with_retry(&mut ctx).await.unwrap();
        assert_eq!(
            ctx.volatile_ctx
                .leader_region_last_entry_ids
                .get(&RegionId::new(0, 0))
                .cloned(),
            Some(1)
        );
        assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());
    }

    #[tokio::test]
    async fn test_downgrade_region_with_retry_slow_path() {
        let state = DowngradeLeaderRegion {
            optimistic_retry: 3,
            retry_initial_interval: Duration::from_millis(100),
        };
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            for _ in 0..3 {
                let resp = rx.recv().await.unwrap().unwrap();
                let reply_id = resp.mailbox_message.unwrap().id;
                mailbox
                    .on_recv(
                        reply_id,
                        Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                    )
                    .await
                    .unwrap();
            }
        });

        ctx.volatile_ctx
            .set_leader_region_lease_deadline(Duration::from_secs(5));
        let expected_deadline = ctx.volatile_ctx.leader_region_lease_deadline.unwrap();
        let err = state
            .downgrade_region_with_retry(&mut ctx)
            .await
            .unwrap_err();
        assert_matches!(err, error::Error::DowngradeLeader { .. });
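        // The lease deadline should remain untouched after the failed retries.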
        assert_eq!(
            ctx.volatile_ctx.leader_region_lease_deadline.unwrap(),
            expected_deadline
        )
    }

    #[tokio::test]
    async fn test_next_upgrade_candidate_state() {
        let mut state = Box::<DowngradeLeaderRegion>::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(id, Some(1), true, None))
        });

        let timer = Instant::now();
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let elapsed = timer.elapsed().as_secs();
        assert!(elapsed < REGION_LEASE_SECS / 2);
        assert_eq!(
            ctx.volatile_ctx
                .leader_region_last_entry_ids
                .get(&RegionId::new(0, 0))
                .cloned(),
            Some(1)
        );
        assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());

        let _ = next
            .as_any()
            .downcast_ref::<UpgradeCandidateRegion>()
            .unwrap();
    }

    #[tokio::test]
    async fn test_downgrade_region_procedure_exceeded_deadline() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(id, None, true, None))
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }
}