meta_srv/procedure/region_migration/downgrade_leader_region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::time::Duration;

use api::v1::meta::MailboxMessage;
use common_error::ext::BoxedError;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::instruction::{
    DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply,
};
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{error, info, warn};
use common_time::util::current_time_millis;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use tokio::time::{sleep, Instant};

use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::lease::find_datanode_lease_value;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::upgrade_candidate_region::UpgradeCandidateRegion;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;

#[derive(Debug, Serialize, Deserialize)]
pub struct DowngradeLeaderRegion {
    /// The maximum number of optimistic retry attempts.
    optimistic_retry: usize,
    /// The initial interval between retry attempts.
    retry_initial_interval: Duration,
}

impl Default for DowngradeLeaderRegion {
    fn default() -> Self {
        Self {
            optimistic_retry: 3,
            retry_initial_interval: Duration::from_millis(500),
        }
    }
}
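
// A minimal sketch (illustrative, not part of the original file) of overriding
// the default retry policy; the values here are arbitrary:
//
//     let state = DowngradeLeaderRegion {
//         optimistic_retry: 5,
//         retry_initial_interval: Duration::from_millis(250),
//     };
//
// `Default::default()` yields 3 attempts with a fixed 500 ms interval.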

#[async_trait::async_trait]
#[typetag::serde]
impl State for DowngradeLeaderRegion {
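    // Transition summary (mirrors the `match` in `next` below):
    // - downgrade succeeded: proceed to `UpgradeCandidateRegion`;
    // - procedure deadline exceeded: roll back via `UpdateMetadata::Rollback`;
    // - any other error: wait out the leader region lease, then proceed to
    //   `UpgradeCandidateRegion` anyway.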
    async fn next(
        &mut self,
        ctx: &mut Context,
        _procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)> {
        let now = Instant::now();
        // Ensures the `leader_region_lease_deadline` exists after recovery.
        ctx.volatile_ctx
            .set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS));

        match self.downgrade_region_with_retry(ctx).await {
            Ok(_) => {
                info!(
                    "Downgraded region leader successfully, region: {}",
                    ctx.persistent_ctx.region_id
                );
            }
            Err(error::Error::ExceededDeadline { .. }) => {
                info!(
                    "Downgrade region leader exceeded deadline, region: {}",
                    ctx.persistent_ctx.region_id
                );
                // Rolls back the metadata if the procedure exceeded its deadline.
                return Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)));
            }
            Err(err) => {
                error!(err; "Encountered a non-retryable error, region: {}", ctx.persistent_ctx.region_id);
                if let Some(deadline) = ctx.volatile_ctx.leader_region_lease_deadline.as_ref() {
                    info!(
                        "Entering the downgrade region leader slow path, region: {}, sleeping until {:?}",
                        ctx.persistent_ctx.region_id, deadline
                    );
                    tokio::time::sleep_until(*deadline).await;
                } else {
                    warn!(
                        "Leader region lease deadline is not set, region: {}",
                        ctx.persistent_ctx.region_id
                    );
                }
            }
        }
        ctx.update_downgrade_leader_region_elapsed(now);

        Ok((
            Box::new(UpgradeCandidateRegion::default()),
            Status::executing(false),
        ))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl DowngradeLeaderRegion {
    /// Builds the downgrade-region instruction with the given flush timeout.
    fn build_downgrade_region_instruction(
        &self,
        ctx: &Context,
        flush_timeout: Duration,
    ) -> Instruction {
        let pc = &ctx.persistent_ctx;
        let region_id = pc.region_id;
        Instruction::DowngradeRegion(DowngradeRegion {
            region_id,
            flush_timeout: Some(flush_timeout),
        })
    }
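
    // Note: `flush_timeout` is forwarded to the datanode inside the instruction.
    // Its datanode-side interpretation (an upper bound on the region flush before
    // replying) is an assumption here, not something this file verifies.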

    /// Tries to downgrade a leader region.
    ///
    /// Retry:
    /// - [MailboxTimeout](error::Error::MailboxTimeout): the mailbox timed out.
    /// - The downgrade failed on the datanode.
    ///
    /// Abort:
    /// - [PusherNotFound](error::Error::PusherNotFound): the datanode is unreachable.
    /// - [PushMessage](error::Error::PushMessage): the receiver is dropped.
    /// - [MailboxReceiver](error::Error::MailboxReceiver): the sender is dropped without sending (impossible).
    /// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply).
    /// - [ExceededDeadline](error::Error::ExceededDeadline).
    /// - Invalid JSON.
    async fn downgrade_region(&self, ctx: &mut Context) -> Result<()> {
        let region_id = ctx.persistent_ctx.region_id;
        let operation_timeout =
            ctx.next_operation_timeout()
                .context(error::ExceededDeadlineSnafu {
                    operation: "Downgrade region",
                })?;
        let downgrade_instruction = self.build_downgrade_region_instruction(ctx, operation_timeout);
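        // The single `operation_timeout` budget is used twice below: as the
        // `flush_timeout` embedded in the instruction and as the mailbox
        // timeout passed to `ctx.mailbox.send`.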

        let leader = &ctx.persistent_ctx.from_peer;
        let msg = MailboxMessage::json_message(
            &format!("Downgrade leader region: {}", region_id),
            &format!("Metasrv@{}", ctx.server_addr()),
            &format!("Datanode-{}@{}", leader.id, leader.addr),
            current_time_millis(),
            &downgrade_instruction,
        )
        .with_context(|_| error::SerializeToJsonSnafu {
            input: downgrade_instruction.to_string(),
        })?;

        let ch = Channel::Datanode(leader.id);
        let now = Instant::now();
        let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
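        // The receiver resolves with the datanode's reply, or with a
        // `MailboxTimeout` error if no reply arrives within `operation_timeout`.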

        match receiver.await {
            Ok(msg) => {
                let reply = HeartbeatMailbox::json_reply(&msg)?;
                info!(
                    "Received downgrade region reply: {:?}, region: {}, elapsed: {:?}",
                    reply,
                    region_id,
                    now.elapsed()
                );
                let InstructionReply::DowngradeRegion(DowngradeRegionReply {
                    last_entry_id,
                    metadata_last_entry_id,
                    exists,
                    error,
                }) = reply
                else {
                    return error::UnexpectedInstructionReplySnafu {
                        mailbox_message: msg.to_string(),
                        reason: "expect downgrade region reply",
                    }
                    .fail();
                };

                if error.is_some() {
                    return error::RetryLaterSnafu {
                        reason: format!(
                            "Failed to downgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
                            region_id, leader, error, now.elapsed()
                        ),
                    }
                    .fail();
                }

                if !exists {
                    warn!(
                        "Trying to downgrade the region {} on datanode {:?}, but the region doesn't exist, elapsed: {:?}",
                        region_id, leader, now.elapsed()
                    );
                } else {
                    info!(
                        "Region {} leader is downgraded on datanode {:?}, last_entry_id: {:?}, metadata_last_entry_id: {:?}, elapsed: {:?}",
                        region_id,
                        leader,
                        last_entry_id,
                        metadata_last_entry_id,
                        now.elapsed()
                    );
                }

                if let Some(last_entry_id) = last_entry_id {
                    ctx.volatile_ctx.set_last_entry_id(last_entry_id);
                }

                if let Some(metadata_last_entry_id) = metadata_last_entry_id {
                    ctx.volatile_ctx
                        .set_metadata_last_entry_id(metadata_last_entry_id);
                }

                Ok(())
            }
            Err(error::Error::MailboxTimeout { .. }) => {
                let reason = format!(
                    "Mailbox received timeout for downgrade leader region {region_id} on datanode {:?}, elapsed: {:?}",
                    leader,
                    now.elapsed()
                );
                error::RetryLaterSnafu { reason }.fail()
            }
            Err(err) => Err(err),
        }
    }

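    /// Shrinks or clears the leader region lease deadline based on how long the
    /// old leader datanode (`from_peer`) has been disconnected; invoked when the
    /// datanode is unreachable (see `downgrade_region_with_retry`).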
    async fn update_leader_region_lease_deadline(&self, ctx: &mut Context) {
        let leader = &ctx.persistent_ctx.from_peer;

        let last_connection_at = match find_datanode_lease_value(leader.id, &ctx.in_memory).await {
            Ok(lease_value) => lease_value.map(|lease_value| lease_value.timestamp_millis),
            Err(err) => {
                error!(err; "Failed to find the datanode lease value for datanode {} during region migration, region: {}", leader, ctx.persistent_ctx.region_id);
                return;
            }
        };

        if let Some(last_connection_at) = last_connection_at {
            let now = current_time_millis();
            let elapsed = now - last_connection_at;
            let region_lease = Duration::from_secs(REGION_LEASE_SECS);

            // It's safe to update the region leader lease deadline here because:
            // 1. The old region leader has already been marked as downgraded in metadata,
            //    which means any attempts to renew its lease will be rejected.
            // 2. The pusher disconnect time record only gets removed when the datanode (from_peer)
            //    establishes a new heartbeat connection stream.
            if elapsed >= (REGION_LEASE_SECS * 1000) as i64 {
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                info!(
                    "Datanode {}({}) has been disconnected for longer than the region lease period ({:?}), reset leader region lease deadline to None, region: {}",
                    leader,
                    last_connection_at,
                    region_lease,
                    ctx.persistent_ctx.region_id
                );
            } else if elapsed > 0 {
                // `now - last_connection_at` < `REGION_LEASE_SECS * 1000`
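                // e.g. with a 60 s lease and a datanode last seen 20 s ago, the
                // remaining deadline would be 60 s - 20 s = 40 s (illustrative values).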
                let lease_timeout = region_lease - Duration::from_millis(elapsed as u64);
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                ctx.volatile_ctx
                    .set_leader_region_lease_deadline(lease_timeout);
                info!(
                    "Datanode {}({}) last connected {:?} ago, updated leader region lease deadline to {:?}, region: {}",
                    leader, last_connection_at, elapsed, ctx.volatile_ctx.leader_region_lease_deadline, ctx.persistent_ctx.region_id
                );
            } else {
                warn!(
                    "Datanode {} has an invalid last connection timestamp: {} (later than the current time: {}), region: {}",
                    leader, last_connection_at, now, ctx.persistent_ctx.region_id
                )
            }
        } else {
            warn!(
                "Failed to find the last connection time for datanode {}, unable to update the region lease deadline, region: {}",
                leader, ctx.persistent_ctx.region_id
            )
        }
    }

    /// Downgrades a leader region.
    ///
    /// Fast path:
    /// - Waits for the reply to the downgrade instruction.
    ///
    /// Slow path:
    /// - Waits for the leader region's lease to expire.
    ///
    /// Abort:
    /// - [ExceededDeadline](error::Error::ExceededDeadline).
    async fn downgrade_region_with_retry(&self, ctx: &mut Context) -> Result<()> {
        let mut retry = 0;

        loop {
            let timer = Instant::now();
            if let Err(err) = self.downgrade_region(ctx).await {
                ctx.update_operations_elapsed(timer);
                retry += 1;
                // Returns the error immediately if the procedure exceeded the deadline.
                if matches!(err, error::Error::ExceededDeadline { .. }) {
                    error!(err; "Failed to downgrade region leader, region: {}, exceeded deadline", ctx.persistent_ctx.region_id);
                    return Err(err);
                } else if matches!(err, error::Error::PusherNotFound { .. }) {
                    // Returns the error immediately if the datanode is unreachable.
                    error!(err; "Failed to downgrade region leader, region: {}, datanode({}) is unreachable (PusherNotFound)", ctx.persistent_ctx.region_id, ctx.persistent_ctx.from_peer.id);
                    self.update_leader_region_lease_deadline(ctx).await;
                    return Err(err);
                } else if err.is_retryable() && retry < self.optimistic_retry {
                    error!(err; "Failed to downgrade region leader, region: {}, retry later", ctx.persistent_ctx.region_id);
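                    // The sleep below uses a constant interval; despite the field
                    // name, no exponential backoff is applied between attempts.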
                    sleep(self.retry_initial_interval).await;
                } else {
                    return Err(BoxedError::new(err)).context(error::DowngradeLeaderSnafu {
                        region_id: ctx.persistent_ctx.region_id,
                    })?;
                }
            } else {
                ctx.update_operations_elapsed(timer);
                // Resets the deadline.
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                break;
            }
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;
    use tokio::time::Instant;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::test_util::{new_procedure_context, TestingEnv};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_downgrade_region_reply, send_mock_reply,
    };

    fn new_persistent_context() -> PersistentContext {
        PersistentContext {
            catalog: "greptime".into(),
            schema: "public".into(),
            from_peer: Peer::empty(1),
            to_peer: Peer::empty(2),
            region_id: RegionId::new(1024, 1),
            timeout: Duration::from_millis(1000),
        }
    }
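
    // Peer 1 (`from_peer`) acts as the current leader and peer 2 (`to_peer`) as
    // the migration candidate throughout these tests; `prepare_table_metadata`
    // wires them into the region route accordingly.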

    async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
        let table_info =
            new_test_table_info(ctx.persistent_ctx.region_id.table_id(), vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(ctx.persistent_ctx.region_id),
            leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
            follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
            ..Default::default()
        }];
        ctx.table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                wal_options,
            )
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PusherNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_pusher_dropped() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        drop(rx);

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PushMessage { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_procedure_exceeded_deadline() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());

        let err = state
            .downgrade_region_with_retry(&mut ctx)
            .await
            .unwrap_err();
        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        // Sends an incorrect reply.
        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        });

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
    }

    #[tokio::test]
    async fn test_downgrade_region_failed() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(
                id,
                None,
                false,
                Some("test mocked".to_string()),
            ))
        });

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("test mocked"), "err: {err:?}");
    }

    #[tokio::test]
    async fn test_downgrade_region_with_retry_fast_path() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            // retry: 0.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            // retry: 1.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_downgrade_region_reply(reply_id, Some(1), true, None)),
                )
                .await
                .unwrap();
        });

        state.downgrade_region_with_retry(&mut ctx).await.unwrap();
        assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1));
        assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());
    }

    #[tokio::test]
    async fn test_downgrade_region_with_retry_slow_path() {
        let state = DowngradeLeaderRegion {
            optimistic_retry: 3,
            retry_initial_interval: Duration::from_millis(100),
        };
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            for _ in 0..3 {
                let resp = rx.recv().await.unwrap().unwrap();
                let reply_id = resp.mailbox_message.unwrap().id;
                mailbox
                    .on_recv(
                        reply_id,
                        Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                    )
                    .await
                    .unwrap();
            }
        });

        ctx.volatile_ctx
            .set_leader_region_lease_deadline(Duration::from_secs(5));
        let expected_deadline = ctx.volatile_ctx.leader_region_lease_deadline.unwrap();
        let err = state
            .downgrade_region_with_retry(&mut ctx)
            .await
            .unwrap_err();
        assert_matches!(err, error::Error::DowngradeLeader { .. });
        assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, None);
        // Should remain unchanged.
        assert_eq!(
            ctx.volatile_ctx.leader_region_lease_deadline.unwrap(),
            expected_deadline
        )
    }

    #[tokio::test]
    async fn test_next_upgrade_candidate_state() {
        let mut state = Box::<DowngradeLeaderRegion>::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(id, Some(1), true, None))
        });

        let timer = Instant::now();
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let elapsed = timer.elapsed().as_secs();
        assert!(elapsed < REGION_LEASE_SECS / 2);
        assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1));
        assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());

        let _ = next
            .as_any()
            .downcast_ref::<UpgradeCandidateRegion>()
            .unwrap();
    }

    #[tokio::test]
    async fn test_downgrade_region_procedure_exceeded_deadline() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(id, None, true, None))
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }
}