1use std::any::Any;
16use std::time::Duration;
17
18use api::v1::meta::MailboxMessage;
19use common_error::ext::BoxedError;
20use common_meta::distributed_time_constants::default_distributed_time_constants;
21use common_meta::instruction::{
22 DowngradeRegion, DowngradeRegionReply, DowngradeRegionsReply, Instruction, InstructionReply,
23};
24use common_procedure::{Context as ProcedureContext, Status};
25use common_telemetry::tracing_context::TracingContext;
26use common_telemetry::{debug, error, info, warn};
27use common_time::util::current_time_millis;
28use serde::{Deserialize, Serialize};
29use snafu::{OptionExt, ResultExt};
30use tokio::time::{Instant, sleep};
31
32use crate::discovery::utils::find_datanode_lease_value;
33use crate::error::{self, Result};
34use crate::handler::HeartbeatMailbox;
35use crate::procedure::region_migration::update_metadata::UpdateMetadata;
36use crate::procedure::region_migration::upgrade_candidate_region::UpgradeCandidateRegion;
37use crate::procedure::region_migration::{Context, State};
38use crate::procedure::utils::instruction_error_result;
39use crate::service::mailbox::Channel;
40
41#[derive(Debug, Serialize, Deserialize)]
42pub struct DowngradeLeaderRegion {
43 optimistic_retry: usize,
45 retry_initial_interval: Duration,
47}
48
49impl Default for DowngradeLeaderRegion {
50 fn default() -> Self {
51 Self {
52 optimistic_retry: 3,
53 retry_initial_interval: Duration::from_millis(500),
54 }
55 }
56}
57
58#[async_trait::async_trait]
59#[typetag::serde]
60impl State for DowngradeLeaderRegion {
61 async fn next(
62 &mut self,
63 ctx: &mut Context,
64 _procedure_ctx: &ProcedureContext,
65 ) -> Result<(Box<dyn State>, Status)> {
66 let now = Instant::now();
67 ctx.volatile_ctx
69 .set_leader_region_lease_deadline(default_distributed_time_constants().region_lease);
70
71 match self.downgrade_region_with_retry(ctx).await {
72 Ok(_) => {
73 info!(
75 "Downgraded region leader success, region: {:?}",
76 ctx.persistent_ctx.region_ids
77 );
78 }
79 Err(error::Error::ExceededDeadline { .. }) => {
80 info!(
81 "Downgrade region leader exceeded deadline, region: {:?}",
82 ctx.persistent_ctx.region_ids
83 );
84 return Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)));
86 }
87 Err(err) => {
88 error!(err; "Occurs non-retryable error, region: {:?}", ctx.persistent_ctx.region_ids);
89 if let Some(deadline) = ctx.volatile_ctx.leader_region_lease_deadline.as_ref() {
90 info!(
91 "Running into the downgrade region leader slow path, region: {:?}, sleep until {:?}",
92 ctx.persistent_ctx.region_ids, deadline
93 );
94 tokio::time::sleep_until(*deadline).await;
95 } else {
96 warn!(
97 "Leader region lease deadline is not set, region: {:?}",
98 ctx.persistent_ctx.region_ids
99 );
100 }
101 }
102 }
103 ctx.update_downgrade_leader_region_elapsed(now);
104
105 Ok((
106 Box::new(UpgradeCandidateRegion::default()),
107 Status::executing(false),
108 ))
109 }
110
111 fn as_any(&self) -> &dyn Any {
112 self
113 }
114}
115
116impl DowngradeLeaderRegion {
117 fn build_downgrade_region_instruction(
119 &self,
120 ctx: &Context,
121 flush_timeout: Duration,
122 ) -> Instruction {
123 let region_ids = &ctx.persistent_ctx.region_ids;
124 let mut downgrade_regions = Vec::with_capacity(region_ids.len());
125 for region_id in region_ids {
126 downgrade_regions.push(DowngradeRegion {
127 region_id: *region_id,
128 flush_timeout: Some(flush_timeout),
129 });
130 }
131
132 Instruction::DowngradeRegions(downgrade_regions)
133 }
134
135 fn handle_downgrade_region_reply(
136 &self,
137 ctx: &mut Context,
138 reply: &DowngradeRegionReply,
139 now: &Instant,
140 ) -> Result<()> {
141 let leader = &ctx.persistent_ctx.from_peer;
142 let DowngradeRegionReply {
143 region_id,
144 last_entry_id,
145 metadata_last_entry_id,
146 exists,
147 error,
148 } = reply;
149
150 if let Some(error) = error {
151 return instruction_error_result(
152 error,
153 format!(
154 "Failed to downgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
155 region_id,
156 leader,
157 error,
158 now.elapsed()
159 ),
160 );
161 }
162
163 if !exists {
164 warn!(
165 "Trying to downgrade the region {} on datanode {:?}, but region doesn't exist!, elapsed: {:?}",
166 region_id,
167 leader,
168 now.elapsed()
169 );
170 } else {
171 info!(
172 "Region {} leader is downgraded on datanode {:?}, last_entry_id: {:?}, metadata_last_entry_id: {:?}, elapsed: {:?}",
173 region_id,
174 leader,
175 last_entry_id,
176 metadata_last_entry_id,
177 now.elapsed()
178 );
179 }
180
181 if let Some(last_entry_id) = last_entry_id {
182 debug!(
183 "set last_entry_id: {:?}, region_id: {:?}",
184 last_entry_id, region_id
185 );
186 ctx.volatile_ctx
187 .set_last_entry_id(*region_id, *last_entry_id);
188 }
189
190 if let Some(metadata_last_entry_id) = metadata_last_entry_id {
191 ctx.volatile_ctx
192 .set_metadata_last_entry_id(*region_id, *metadata_last_entry_id);
193 }
194
195 Ok(())
196 }
197
198 async fn downgrade_region(&self, ctx: &mut Context) -> Result<()> {
212 let region_ids = &ctx.persistent_ctx.region_ids;
213 let operation_timeout =
214 ctx.next_operation_timeout()
215 .context(error::ExceededDeadlineSnafu {
216 operation: "Downgrade region",
217 })?;
218 let downgrade_instruction = self.build_downgrade_region_instruction(ctx, operation_timeout);
219
220 let leader = &ctx.persistent_ctx.from_peer;
221 let tracing_ctx = TracingContext::from_current_span();
222 let msg = MailboxMessage::json_message(
223 &format!("Downgrade leader regions: {:?}", region_ids),
224 &format!("Metasrv@{}", ctx.server_addr()),
225 &format!("Datanode-{}@{}", leader.id, leader.addr),
226 common_time::util::current_time_millis(),
227 &downgrade_instruction,
228 Some(tracing_ctx.to_w3c()),
229 )
230 .with_context(|_| error::SerializeToJsonSnafu {
231 input: downgrade_instruction.to_string(),
232 })?;
233
234 let ch = Channel::Datanode(leader.id);
235 let now = Instant::now();
236 let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
237
238 match receiver.await {
239 Ok(msg) => {
240 let reply = HeartbeatMailbox::json_reply(&msg)?;
241 info!(
242 "Received downgrade region reply: {:?}, region: {:?}, elapsed: {:?}",
243 reply,
244 region_ids,
245 now.elapsed()
246 );
247 let InstructionReply::DowngradeRegions(DowngradeRegionsReply { replies }) = reply
248 else {
249 return error::UnexpectedInstructionReplySnafu {
250 mailbox_message: msg.to_string(),
251 reason: "expect downgrade region reply",
252 }
253 .fail();
254 };
255
256 for reply in replies {
257 self.handle_downgrade_region_reply(ctx, &reply, &now)?;
258 }
259 Ok(())
260 }
261 Err(error::Error::MailboxTimeout { .. }) => {
262 let reason = format!(
263 "Mailbox received timeout for downgrade leader region {region_ids:?} on datanode {:?}, elapsed: {:?}",
264 leader,
265 now.elapsed()
266 );
267 error::RetryLaterSnafu { reason }.fail()
268 }
269 Err(err) => Err(err),
270 }
271 }
272
273 async fn update_leader_region_lease_deadline(&self, ctx: &mut Context) {
274 let leader = &ctx.persistent_ctx.from_peer;
275
276 let last_connection_at = match find_datanode_lease_value(&ctx.in_memory, leader.id).await {
277 Ok(lease_value) => lease_value.map(|lease_value| lease_value.timestamp_millis),
278 Err(err) => {
279 error!(err; "Failed to find datanode lease value for datanode: {}, during region migration, region: {:?}", leader, ctx.persistent_ctx.region_ids);
280 return;
281 }
282 };
283
284 if let Some(last_connection_at) = last_connection_at {
285 let now = current_time_millis();
286 let elapsed = now - last_connection_at;
287 let region_lease = default_distributed_time_constants().region_lease;
288
289 if elapsed >= (region_lease.as_secs() * 1000) as i64 {
295 ctx.volatile_ctx.reset_leader_region_lease_deadline();
296 info!(
297 "Datanode {}({}) has been disconnected for longer than the region lease period ({:?}), reset leader region lease deadline to None, region: {:?}",
298 leader, last_connection_at, region_lease, ctx.persistent_ctx.region_ids
299 );
300 } else if elapsed > 0 {
301 let lease_timeout =
303 region_lease - Duration::from_millis((now - last_connection_at) as u64);
304 ctx.volatile_ctx.reset_leader_region_lease_deadline();
305 ctx.volatile_ctx
306 .set_leader_region_lease_deadline(lease_timeout);
307 info!(
308 "Datanode {}({}) last connected {:?} ago, updated leader region lease deadline to {:?}, region: {:?}",
309 leader,
310 last_connection_at,
311 elapsed,
312 ctx.volatile_ctx.leader_region_lease_deadline,
313 ctx.persistent_ctx.region_ids
314 );
315 } else {
316 warn!(
317 "Datanode {} has invalid last connection timestamp: {} (which is after current time: {}), region: {:?}",
318 leader, last_connection_at, now, ctx.persistent_ctx.region_ids
319 )
320 }
321 } else {
322 warn!(
323 "Failed to find last connection time for datanode {}, unable to update region lease deadline, region: {:?}",
324 leader, ctx.persistent_ctx.region_ids
325 )
326 }
327 }
328
329 async fn downgrade_region_with_retry(&self, ctx: &mut Context) -> Result<()> {
340 let mut retry = 0;
341
342 loop {
343 let timer = Instant::now();
344 if let Err(err) = self.downgrade_region(ctx).await {
345 ctx.update_operations_elapsed(timer);
346 retry += 1;
347 if matches!(err, error::Error::ExceededDeadline { .. }) {
349 error!(err; "Failed to downgrade region leader, regions: {:?}, exceeded deadline", ctx.persistent_ctx.region_ids);
350 return Err(err);
351 } else if matches!(err, error::Error::PusherNotFound { .. }) {
352 error!(err; "Failed to downgrade region leader, regions: {:?}, datanode({}) is unreachable(PusherNotFound)", ctx.persistent_ctx.region_ids, ctx.persistent_ctx.from_peer.id);
354 self.update_leader_region_lease_deadline(ctx).await;
355 return Err(err);
356 } else if err.is_retryable() && retry < self.optimistic_retry {
357 error!(err; "Failed to downgrade region leader, regions: {:?}, retry later", ctx.persistent_ctx.region_ids);
358 sleep(self.retry_initial_interval).await;
359 } else {
360 return Err(BoxedError::new(err)).context(error::DowngradeLeaderSnafu {
361 region_id: ctx.persistent_ctx.region_ids[0],
363 })?;
364 }
365 } else {
366 ctx.update_operations_elapsed(timer);
367 ctx.volatile_ctx.reset_leader_region_lease_deadline();
369 break;
370 }
371 }
372
373 Ok(())
374 }
375}
376
377#[cfg(test)]
378mod tests {
379 use std::assert_matches;
380 use std::collections::HashMap;
381
382 use common_meta::key::table_route::TableRouteValue;
383 use common_meta::key::test_utils::new_test_table_info;
384 use common_meta::peer::Peer;
385 use common_meta::rpc::router::{Region, RegionRoute};
386 use common_meta::wal_provider::RegionWalOptions;
387 use store_api::storage::RegionId;
388 use tokio::time::Instant;
389
390 use super::*;
391 use crate::error::Error;
392 use crate::procedure::region_migration::manager::RegionMigrationTriggerReason;
393 use crate::procedure::region_migration::test_util::{TestingEnv, new_procedure_context};
394 use crate::procedure::region_migration::{ContextFactory, PersistentContext};
395 use crate::procedure::test_util::{
396 new_close_region_reply, new_downgrade_region_reply, send_mock_reply,
397 };
398
399 fn new_persistent_context() -> PersistentContext {
400 PersistentContext::new(
401 vec![("greptime".into(), "public".into())],
402 Peer::empty(1),
403 Peer::empty(2),
404 vec![RegionId::new(1024, 1)],
405 Duration::from_millis(1000),
406 RegionMigrationTriggerReason::Manual,
407 )
408 }
409
410 async fn prepare_table_metadata(ctx: &Context, wal_options: RegionWalOptions) {
411 let region_id = ctx.persistent_ctx.region_ids[0];
412 let table_info = new_test_table_info(region_id.table_id());
413 let region_routes = vec![RegionRoute {
414 region: Region::new_test(region_id),
415 leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
416 follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
417 ..Default::default()
418 }];
419 ctx.table_metadata_manager
420 .create_table_metadata(
421 table_info,
422 TableRouteValue::physical(region_routes),
423 wal_options,
424 )
425 .await
426 .unwrap();
427 }
428
429 #[tokio::test]
430 async fn test_datanode_is_unreachable() {
431 let state = DowngradeLeaderRegion::default();
432 let persistent_context = new_persistent_context();
433 let env = TestingEnv::new();
434 let mut ctx = env.context_factory().new_context(persistent_context);
435 prepare_table_metadata(&ctx, HashMap::default()).await;
436 let err = state.downgrade_region(&mut ctx).await.unwrap_err();
437
438 assert_matches!(err, Error::PusherNotFound { .. });
439 assert!(!err.is_retryable());
440 }
441
442 #[tokio::test]
443 async fn test_pusher_dropped() {
444 let state = DowngradeLeaderRegion::default();
445 let persistent_context = new_persistent_context();
446 let from_peer_id = persistent_context.from_peer.id;
447
448 let mut env = TestingEnv::new();
449 let mut ctx = env.context_factory().new_context(persistent_context);
450 prepare_table_metadata(&ctx, HashMap::default()).await;
451 let mailbox_ctx = env.mailbox_context();
452
453 let (tx, rx) = tokio::sync::mpsc::channel(1);
454
455 mailbox_ctx
456 .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
457 .await;
458
459 drop(rx);
460
461 let err = state.downgrade_region(&mut ctx).await.unwrap_err();
462
463 assert_matches!(err, Error::PushMessage { .. });
464 assert!(!err.is_retryable());
465 }
466
467 #[tokio::test]
468 async fn test_procedure_exceeded_deadline() {
469 let state = DowngradeLeaderRegion::default();
470 let persistent_context = new_persistent_context();
471 let env = TestingEnv::new();
472 let mut ctx = env.context_factory().new_context(persistent_context);
473 prepare_table_metadata(&ctx, HashMap::default()).await;
474 ctx.volatile_ctx.metrics.operations_elapsed =
475 ctx.persistent_ctx.timeout + Duration::from_secs(1);
476
477 let err = state.downgrade_region(&mut ctx).await.unwrap_err();
478
479 assert_matches!(err, Error::ExceededDeadline { .. });
480 assert!(!err.is_retryable());
481
482 let err = state
483 .downgrade_region_with_retry(&mut ctx)
484 .await
485 .unwrap_err();
486 assert_matches!(err, Error::ExceededDeadline { .. });
487 assert!(!err.is_retryable());
488 }
489
490 #[tokio::test]
491 async fn test_unexpected_instruction_reply() {
492 let state = DowngradeLeaderRegion::default();
493 let persistent_context = new_persistent_context();
494 let from_peer_id = persistent_context.from_peer.id;
495
496 let mut env = TestingEnv::new();
497 let mut ctx = env.context_factory().new_context(persistent_context);
498 prepare_table_metadata(&ctx, HashMap::default()).await;
499 let mailbox_ctx = env.mailbox_context();
500 let mailbox = mailbox_ctx.mailbox().clone();
501
502 let (tx, rx) = tokio::sync::mpsc::channel(1);
503
504 mailbox_ctx
505 .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
506 .await;
507
508 send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));
510
511 let err = state.downgrade_region(&mut ctx).await.unwrap_err();
512
513 assert_matches!(err, Error::UnexpectedInstructionReply { .. });
514 assert!(!err.is_retryable());
515 }
516
517 #[tokio::test]
518 async fn test_instruction_exceeded_deadline() {
519 let state = DowngradeLeaderRegion::default();
520 let persistent_context = new_persistent_context();
521 let from_peer_id = persistent_context.from_peer.id;
522
523 let mut env = TestingEnv::new();
524 let mut ctx = env.context_factory().new_context(persistent_context);
525 prepare_table_metadata(&ctx, HashMap::default()).await;
526 let mailbox_ctx = env.mailbox_context();
527 let mailbox = mailbox_ctx.mailbox().clone();
528
529 let (tx, rx) = tokio::sync::mpsc::channel(1);
530
531 mailbox_ctx
532 .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
533 .await;
534
535 send_mock_reply(mailbox, rx, |id| {
536 Err(error::MailboxTimeoutSnafu { id }.build())
537 });
538
539 let err = state.downgrade_region(&mut ctx).await.unwrap_err();
540
541 assert_matches!(err, Error::RetryLater { .. });
542 assert!(err.is_retryable());
543 }
544
545 #[tokio::test]
546 async fn test_downgrade_region_failed() {
547 let state = DowngradeLeaderRegion::default();
548 let persistent_context = new_persistent_context();
549 let from_peer_id = persistent_context.from_peer.id;
550
551 let mut env = TestingEnv::new();
552 let mut ctx = env.context_factory().new_context(persistent_context);
553 prepare_table_metadata(&ctx, HashMap::default()).await;
554 let mailbox_ctx = env.mailbox_context();
555 let mailbox = mailbox_ctx.mailbox().clone();
556
557 let (tx, rx) = tokio::sync::mpsc::channel(1);
558
559 mailbox_ctx
560 .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
561 .await;
562
563 send_mock_reply(mailbox, rx, |id| {
564 Ok(new_downgrade_region_reply(
565 id,
566 None,
567 false,
568 Some("test mocked".to_string()),
569 ))
570 });
571
572 let err = state.downgrade_region(&mut ctx).await.unwrap_err();
573
574 assert_matches!(err, Error::RetryLater { .. });
575 assert!(err.is_retryable());
576 assert!(format!("{err:?}").contains("test mocked"), "err: {err:?}",);
577 }
578
579 #[tokio::test]
580 async fn test_downgrade_region_with_retry_fast_path() {
581 let state = DowngradeLeaderRegion::default();
582 let persistent_context = new_persistent_context();
583 let from_peer_id = persistent_context.from_peer.id;
584
585 let mut env = TestingEnv::new();
586 let mut ctx = env.context_factory().new_context(persistent_context);
587 prepare_table_metadata(&ctx, HashMap::default()).await;
588 let mailbox_ctx = env.mailbox_context();
589 let mailbox = mailbox_ctx.mailbox().clone();
590
591 let (tx, mut rx) = tokio::sync::mpsc::channel(1);
592
593 mailbox_ctx
594 .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
595 .await;
596
597 common_runtime::spawn_global(async move {
598 let resp = rx.recv().await.unwrap().unwrap();
600 let reply_id = resp.mailbox_message.unwrap().id;
601 mailbox
602 .on_recv(
603 reply_id,
604 Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
605 )
606 .await
607 .unwrap();
608
609 let resp = rx.recv().await.unwrap().unwrap();
611 let reply_id = resp.mailbox_message.unwrap().id;
612 mailbox
613 .on_recv(
614 reply_id,
615 Ok(new_downgrade_region_reply(reply_id, Some(1), true, None)),
616 )
617 .await
618 .unwrap();
619 });
620
621 state.downgrade_region_with_retry(&mut ctx).await.unwrap();
622 assert_eq!(
623 ctx.volatile_ctx
624 .leader_region_last_entry_ids
625 .get(&RegionId::new(0, 0))
626 .cloned(),
627 Some(1)
628 );
629 assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());
630 }
631
632 #[tokio::test]
633 async fn test_downgrade_region_with_retry_slow_path() {
634 let state = DowngradeLeaderRegion {
635 optimistic_retry: 3,
636 retry_initial_interval: Duration::from_millis(100),
637 };
638 let persistent_context = new_persistent_context();
639 let from_peer_id = persistent_context.from_peer.id;
640
641 let mut env = TestingEnv::new();
642 let mut ctx = env.context_factory().new_context(persistent_context);
643 let mailbox_ctx = env.mailbox_context();
644 let mailbox = mailbox_ctx.mailbox().clone();
645
646 let (tx, mut rx) = tokio::sync::mpsc::channel(1);
647
648 mailbox_ctx
649 .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
650 .await;
651
652 common_runtime::spawn_global(async move {
653 for _ in 0..3 {
654 let resp = rx.recv().await.unwrap().unwrap();
655 let reply_id = resp.mailbox_message.unwrap().id;
656 mailbox
657 .on_recv(
658 reply_id,
659 Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
660 )
661 .await
662 .unwrap();
663 }
664 });
665
666 ctx.volatile_ctx
667 .set_leader_region_lease_deadline(Duration::from_secs(5));
668 let expected_deadline = ctx.volatile_ctx.leader_region_lease_deadline.unwrap();
669 let err = state
670 .downgrade_region_with_retry(&mut ctx)
671 .await
672 .unwrap_err();
673 assert_matches!(err, error::Error::DowngradeLeader { .. });
674 assert_eq!(
677 ctx.volatile_ctx.leader_region_lease_deadline.unwrap(),
678 expected_deadline
679 )
680 }
681
682 #[tokio::test]
683 async fn test_next_upgrade_candidate_state() {
684 let mut state = Box::<DowngradeLeaderRegion>::default();
685 let persistent_context = new_persistent_context();
686 let from_peer_id = persistent_context.from_peer.id;
687
688 let mut env = TestingEnv::new();
689 let mut ctx = env.context_factory().new_context(persistent_context);
690 prepare_table_metadata(&ctx, HashMap::default()).await;
691 let mailbox_ctx = env.mailbox_context();
692 let mailbox = mailbox_ctx.mailbox().clone();
693
694 let (tx, rx) = tokio::sync::mpsc::channel(1);
695
696 mailbox_ctx
697 .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
698 .await;
699
700 send_mock_reply(mailbox, rx, |id| {
701 Ok(new_downgrade_region_reply(id, Some(1), true, None))
702 });
703
704 let timer = Instant::now();
705 let procedure_ctx = new_procedure_context();
706 let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
707 let elapsed = timer.elapsed().as_secs();
708 let region_lease = default_distributed_time_constants().region_lease.as_secs();
709 assert!(elapsed < region_lease / 2);
710 assert_eq!(
711 ctx.volatile_ctx
712 .leader_region_last_entry_ids
713 .get(&RegionId::new(0, 0))
714 .cloned(),
715 Some(1)
716 );
717 assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());
718
719 let _ = next
720 .as_any()
721 .downcast_ref::<UpgradeCandidateRegion>()
722 .unwrap();
723 }
724
725 #[tokio::test]
726 async fn test_downgrade_region_procedure_exceeded_deadline() {
727 let mut state = Box::<UpgradeCandidateRegion>::default();
728 state.retry_initial_interval = Duration::from_millis(100);
729 let persistent_context = new_persistent_context();
730 let to_peer_id = persistent_context.to_peer.id;
731
732 let mut env = TestingEnv::new();
733 let mut ctx = env.context_factory().new_context(persistent_context);
734 let mailbox_ctx = env.mailbox_context();
735 let mailbox = mailbox_ctx.mailbox().clone();
736 ctx.volatile_ctx.metrics.operations_elapsed =
737 ctx.persistent_ctx.timeout + Duration::from_secs(1);
738
739 let (tx, rx) = tokio::sync::mpsc::channel(1);
740 mailbox_ctx
741 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
742 .await;
743
744 send_mock_reply(mailbox, rx, |id| {
745 Ok(new_downgrade_region_reply(id, None, true, None))
746 });
747 let procedure_ctx = new_procedure_context();
748 let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
749 let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
750 assert_matches!(update_metadata, UpdateMetadata::Rollback);
751 }
752}