use std::any::Any;
use std::time::Duration;

use api::v1::meta::MailboxMessage;
use common_error::ext::BoxedError;
use common_meta::distributed_time_constants::default_distributed_time_constants;
use common_meta::instruction::{
    DowngradeRegion, DowngradeRegionReply, DowngradeRegionsReply, Instruction, InstructionReply,
};
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use tokio::time::{Instant, sleep};

use crate::discovery::utils::find_datanode_lease_value;
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::upgrade_candidate_region::UpgradeCandidateRegion;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;

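/// The state that downgrades the leader regions on the `from_peer` datanode.
///
/// On success it transitions to [`UpgradeCandidateRegion`]; when the procedure
/// deadline is exceeded it transitions to [`UpdateMetadata::Rollback`].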
#[derive(Debug, Serialize, Deserialize)]
pub struct DowngradeLeaderRegion {
    /// The maximum number of retries for retryable errors before giving up.
    optimistic_retry: usize,
    /// The interval to sleep between retries.
    retry_initial_interval: Duration,
}

impl Default for DowngradeLeaderRegion {
    fn default() -> Self {
        Self {
            optimistic_retry: 3,
            retry_initial_interval: Duration::from_millis(500),
        }
    }
}

#[async_trait::async_trait]
#[typetag::serde]
impl State for DowngradeLeaderRegion {
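    /// Downgrades the leader regions and decides the next state.
    ///
    /// Before the downgrade starts, the leader region lease deadline is set to
    /// a full region lease period so that, if the downgrade fails with a
    /// non-retryable error, the procedure waits for the old leader's lease to
    /// expire before upgrading the candidate.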
    async fn next(
        &mut self,
        ctx: &mut Context,
        _procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)> {
        let now = Instant::now();
        ctx.volatile_ctx
            .set_leader_region_lease_deadline(default_distributed_time_constants().region_lease);

        match self.downgrade_region_with_retry(ctx).await {
            Ok(_) => {
                info!(
                    "Downgraded the leader regions successfully, regions: {:?}",
                    ctx.persistent_ctx.region_ids
                );
            }
            Err(error::Error::ExceededDeadline { .. }) => {
                info!(
                    "Downgrading the leader regions exceeded the deadline, regions: {:?}",
                    ctx.persistent_ctx.region_ids
                );
                // The procedure is out of time: roll back the metadata.
                return Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)));
            }
            Err(err) => {
                error!(err; "A non-retryable error occurred, regions: {:?}", ctx.persistent_ctx.region_ids);
                if let Some(deadline) = ctx.volatile_ctx.leader_region_lease_deadline.as_ref() {
                    info!(
                        "Running into the downgrade leader region slow path, regions: {:?}, sleep until {:?}",
                        ctx.persistent_ctx.region_ids, deadline
                    );
                    // Wait out the old leader's lease before upgrading the candidate.
                    tokio::time::sleep_until(*deadline).await;
                } else {
                    warn!(
                        "Leader region lease deadline is not set, regions: {:?}",
                        ctx.persistent_ctx.region_ids
                    );
                }
            }
        }
        ctx.update_downgrade_leader_region_elapsed(now);

        Ok((
            Box::new(UpgradeCandidateRegion::default()),
            Status::executing(false),
        ))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl DowngradeLeaderRegion {
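    /// Builds a [`Instruction::DowngradeRegions`] covering all migrating
    /// regions, each carrying the given flush timeout.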
    fn build_downgrade_region_instruction(
        &self,
        ctx: &Context,
        flush_timeout: Duration,
    ) -> Instruction {
        let downgrade_regions = ctx
            .persistent_ctx
            .region_ids
            .iter()
            .map(|region_id| DowngradeRegion {
                region_id: *region_id,
                flush_timeout: Some(flush_timeout),
            })
            .collect();

        Instruction::DowngradeRegions(downgrade_regions)
    }

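    /// Handles a single [`DowngradeRegionReply`].
    ///
    /// Records the reported `last_entry_id` and `metadata_last_entry_id` in
    /// the volatile context; returns a retry-later error if the reply carries
    /// an error.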
    fn handle_downgrade_region_reply(
        &self,
        ctx: &mut Context,
        reply: &DowngradeRegionReply,
        now: &Instant,
    ) -> Result<()> {
        let leader = &ctx.persistent_ctx.from_peer;
        let DowngradeRegionReply {
            region_id,
            last_entry_id,
            metadata_last_entry_id,
            exists,
            error,
        } = reply;

        if error.is_some() {
            return error::RetryLaterSnafu {
                reason: format!(
                    "Failed to downgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
                    region_id, leader, error, now.elapsed()
                ),
            }
            .fail();
        }

        if !exists {
            warn!(
                "Tried to downgrade region {} on datanode {:?}, but the region doesn't exist, elapsed: {:?}",
                region_id,
                leader,
                now.elapsed()
            );
        } else {
            info!(
                "Region {} leader is downgraded on datanode {:?}, last_entry_id: {:?}, metadata_last_entry_id: {:?}, elapsed: {:?}",
                region_id,
                leader,
                last_entry_id,
                metadata_last_entry_id,
                now.elapsed()
            );
        }

        if let Some(last_entry_id) = last_entry_id {
            debug!(
                "Sets last_entry_id: {:?}, region_id: {:?}",
                last_entry_id, region_id
            );
            ctx.volatile_ctx
                .set_last_entry_id(*region_id, *last_entry_id);
        }

        if let Some(metadata_last_entry_id) = metadata_last_entry_id {
            ctx.volatile_ctx
                .set_metadata_last_entry_id(*region_id, *metadata_last_entry_id);
        }

        Ok(())
    }

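    /// Sends the downgrade instruction to the leader datanode via the mailbox
    /// and waits for the reply within the remaining operation timeout.
    ///
    /// Returns a retry-later error on mailbox timeout and an
    /// `ExceededDeadline` error when the procedure has no time budget left.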
    async fn downgrade_region(&self, ctx: &mut Context) -> Result<()> {
        let region_ids = &ctx.persistent_ctx.region_ids;
        let operation_timeout =
            ctx.next_operation_timeout()
                .context(error::ExceededDeadlineSnafu {
                    operation: "Downgrade region",
                })?;
        let downgrade_instruction = self.build_downgrade_region_instruction(ctx, operation_timeout);

        let leader = &ctx.persistent_ctx.from_peer;
        let tracing_ctx = TracingContext::from_current_span();
        let msg = MailboxMessage::json_message(
            &format!("Downgrade leader regions: {:?}", region_ids),
            &format!("Metasrv@{}", ctx.server_addr()),
            &format!("Datanode-{}@{}", leader.id, leader.addr),
            current_time_millis(),
            &downgrade_instruction,
            Some(tracing_ctx.to_w3c()),
        )
        .with_context(|_| error::SerializeToJsonSnafu {
            input: downgrade_instruction.to_string(),
        })?;

        let ch = Channel::Datanode(leader.id);
        let now = Instant::now();
        let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;

        match receiver.await {
            Ok(msg) => {
                let reply = HeartbeatMailbox::json_reply(&msg)?;
                info!(
                    "Received downgrade region reply: {:?}, regions: {:?}, elapsed: {:?}",
                    reply,
                    region_ids,
                    now.elapsed()
                );
                let InstructionReply::DowngradeRegions(DowngradeRegionsReply { replies }) = reply
                else {
                    return error::UnexpectedInstructionReplySnafu {
                        mailbox_message: msg.to_string(),
                        reason: "expect downgrade region reply",
                    }
                    .fail();
                };

                for reply in replies {
                    self.handle_downgrade_region_reply(ctx, &reply, &now)?;
                }
                Ok(())
            }
            Err(error::Error::MailboxTimeout { .. }) => {
                let reason = format!(
                    "Mailbox timed out while downgrading the leader regions {region_ids:?} on datanode {:?}, elapsed: {:?}",
                    leader,
                    now.elapsed()
                );
                error::RetryLaterSnafu { reason }.fail()
            }
            Err(err) => Err(err),
        }
    }

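    /// Recomputes the leader region lease deadline from the datanode's last
    /// known connection time.
    ///
    /// If the datanode has been disconnected for longer than a full region
    /// lease, the lease has already expired, so the deadline is cleared and
    /// no further waiting is needed.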
    async fn update_leader_region_lease_deadline(&self, ctx: &mut Context) {
        let leader = &ctx.persistent_ctx.from_peer;

        let last_connection_at = match find_datanode_lease_value(&ctx.in_memory, leader.id).await {
            Ok(lease_value) => lease_value.map(|lease_value| lease_value.timestamp_millis),
            Err(err) => {
                error!(err; "Failed to find the datanode lease value for datanode: {} during region migration, regions: {:?}", leader, ctx.persistent_ctx.region_ids);
                return;
            }
        };

        if let Some(last_connection_at) = last_connection_at {
            let now = current_time_millis();
            let elapsed = now - last_connection_at;
            let region_lease = default_distributed_time_constants().region_lease;

            if elapsed >= region_lease.as_millis() as i64 {
                // The old leader's lease has already expired; no need to wait.
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                info!(
                    "Datanode {}({}) has been disconnected for longer than the region lease period ({:?}), reset the leader region lease deadline to None, regions: {:?}",
                    leader, last_connection_at, region_lease, ctx.persistent_ctx.region_ids
                );
            } else if elapsed > 0 {
                // Wait only for the remainder of the old leader's lease.
                let lease_timeout = region_lease - Duration::from_millis(elapsed as u64);
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                ctx.volatile_ctx
                    .set_leader_region_lease_deadline(lease_timeout);
                info!(
                    "Datanode {}({}) last connected {:?}ms ago, updated the leader region lease deadline to {:?}, regions: {:?}",
                    leader,
                    last_connection_at,
                    elapsed,
                    ctx.volatile_ctx.leader_region_lease_deadline,
                    ctx.persistent_ctx.region_ids
                );
            } else {
                warn!(
                    "Datanode {} has an invalid last connection timestamp: {} (after the current time: {}), regions: {:?}",
                    leader, last_connection_at, now, ctx.persistent_ctx.region_ids
                )
            }
        } else {
            warn!(
                "Failed to find the last connection time of datanode {}, unable to update the region lease deadline, regions: {:?}",
                leader, ctx.persistent_ctx.region_ids
            )
        }
    }

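    /// Downgrades the leader regions, retrying retryable errors up to
    /// `optimistic_retry` times with `retry_initial_interval` between attempts.
    ///
    /// `ExceededDeadline` and `PusherNotFound` errors abort immediately; any
    /// other non-retryable error is wrapped into a `DowngradeLeader` error.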
    async fn downgrade_region_with_retry(&self, ctx: &mut Context) -> Result<()> {
        let mut retry = 0;

        loop {
            let timer = Instant::now();
            if let Err(err) = self.downgrade_region(ctx).await {
                ctx.update_operations_elapsed(timer);
                retry += 1;
                if matches!(err, error::Error::ExceededDeadline { .. }) {
                    // The procedure is out of time; abort and let the caller roll back.
                    error!(err; "Failed to downgrade the leader regions: {:?}, exceeded deadline", ctx.persistent_ctx.region_ids);
                    return Err(err);
                } else if matches!(err, error::Error::PusherNotFound { .. }) {
                    // The datanode is unreachable; recompute how long the old
                    // leader's lease may still be valid before aborting.
                    error!(err; "Failed to downgrade the leader regions: {:?}, datanode({}) is unreachable (PusherNotFound)", ctx.persistent_ctx.region_ids, ctx.persistent_ctx.from_peer.id);
                    self.update_leader_region_lease_deadline(ctx).await;
                    return Err(err);
                } else if err.is_retryable() && retry < self.optimistic_retry {
                    error!(err; "Failed to downgrade the leader regions: {:?}, retry later", ctx.persistent_ctx.region_ids);
                    sleep(self.retry_initial_interval).await;
                } else {
                    return Err(BoxedError::new(err)).context(error::DowngradeLeaderSnafu {
                        region_id: ctx.persistent_ctx.region_ids[0],
                    })?;
                }
            } else {
                ctx.update_operations_elapsed(timer);
                // The leader regions are downgraded; there is no need to wait
                // out the old leader's lease anymore.
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                break;
            }
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;
    use tokio::time::Instant;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::manager::RegionMigrationTriggerReason;
    use crate::procedure::region_migration::test_util::{TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_downgrade_region_reply, send_mock_reply,
    };

    fn new_persistent_context() -> PersistentContext {
        PersistentContext::new(
            vec![("greptime".into(), "public".into())],
            Peer::empty(1),
            Peer::empty(2),
            vec![RegionId::new(1024, 1)],
            Duration::from_millis(1000),
            RegionMigrationTriggerReason::Manual,
        )
    }

    async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
        let region_id = ctx.persistent_ctx.region_ids[0];
        let table_info = new_test_table_info(region_id.table_id());
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
            follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
            ..Default::default()
        }];
        ctx.table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                wal_options,
            )
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PusherNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_pusher_dropped() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        drop(rx);

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PushMessage { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_procedure_exceeded_deadline() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());

        let err = state
            .downgrade_region_with_retry(&mut ctx)
            .await
            .unwrap_err();
        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        // Replies with a close-region reply instead of a downgrade-region reply.
        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        });

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
    }

    #[tokio::test]
    async fn test_downgrade_region_failed() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(
                id,
                None,
                false,
                Some("test mocked".to_string()),
            ))
        });

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("test mocked"), "err: {err:?}");
    }

    #[tokio::test]
    async fn test_downgrade_region_with_retry_fast_path() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            // The first attempt times out.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            // The retry succeeds.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_downgrade_region_reply(reply_id, Some(1), true, None)),
                )
                .await
                .unwrap();
        });

        state.downgrade_region_with_retry(&mut ctx).await.unwrap();
        assert_eq!(
            ctx.volatile_ctx
                .leader_region_last_entry_ids
                .get(&RegionId::new(0, 0))
                .cloned(),
            Some(1)
        );
        assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());
    }

    #[tokio::test]
    async fn test_downgrade_region_with_retry_slow_path() {
        let state = DowngradeLeaderRegion {
            optimistic_retry: 3,
            retry_initial_interval: Duration::from_millis(100),
        };
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            // All three attempts time out.
            for _ in 0..3 {
                let resp = rx.recv().await.unwrap().unwrap();
                let reply_id = resp.mailbox_message.unwrap().id;
                mailbox
                    .on_recv(
                        reply_id,
                        Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                    )
                    .await
                    .unwrap();
            }
        });

        ctx.volatile_ctx
            .set_leader_region_lease_deadline(Duration::from_secs(5));
        let expected_deadline = ctx.volatile_ctx.leader_region_lease_deadline.unwrap();
        let err = state
            .downgrade_region_with_retry(&mut ctx)
            .await
            .unwrap_err();
        assert_matches!(err, error::Error::DowngradeLeader { .. });
        // The leader region lease deadline should remain unchanged.
        assert_eq!(
            ctx.volatile_ctx.leader_region_lease_deadline.unwrap(),
            expected_deadline
        );
    }

    #[tokio::test]
    async fn test_next_upgrade_candidate_state() {
        let mut state = Box::<DowngradeLeaderRegion>::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(id, Some(1), true, None))
        });

        let timer = Instant::now();
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let elapsed = timer.elapsed().as_secs();
        // The fast path should not wait out the leader region lease.
        let region_lease = default_distributed_time_constants().region_lease.as_secs();
        assert!(elapsed < region_lease / 2);
        assert_eq!(
            ctx.volatile_ctx
                .leader_region_last_entry_ids
                .get(&RegionId::new(0, 0))
                .cloned(),
            Some(1)
        );
        assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());

        let _ = next
            .as_any()
            .downcast_ref::<UpgradeCandidateRegion>()
            .unwrap();
    }

    #[tokio::test]
    async fn test_downgrade_region_procedure_exceeded_deadline() {
        let mut state = Box::<DowngradeLeaderRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        // Exhausts the procedure's time budget before the downgrade starts.
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(id, None, true, None))
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }
}