meta_srv/procedure/region_migration/downgrade_leader_region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::time::Duration;

use api::v1::meta::MailboxMessage;
use common_error::ext::BoxedError;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::instruction::{
    DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply,
};
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{error, info, warn};
use common_time::util::current_time_millis;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use tokio::time::{sleep, Instant};

use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::lease::find_datanode_lease_value;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::upgrade_candidate_region::UpgradeCandidateRegion;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;

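/// State that downgrades the leader region on the `from_peer` datanode before the
/// candidate region on the `to_peer` datanode is upgraded.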
#[derive(Debug, Serialize, Deserialize)]
pub struct DowngradeLeaderRegion {
    // The maximum number of optimistic retries.
    optimistic_retry: usize,
    // The initial retry interval.
    retry_initial_interval: Duration,
}

impl Default for DowngradeLeaderRegion {
    fn default() -> Self {
        Self {
            optimistic_retry: 3,
            retry_initial_interval: Duration::from_millis(500),
        }
    }
}

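// State transition: on success, proceed to `UpgradeCandidateRegion`; if the procedure
// deadline is exceeded, roll back via `UpdateMetadata::Rollback`; on any other error,
// wait out the leader region lease before moving on.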
#[async_trait::async_trait]
#[typetag::serde]
impl State for DowngradeLeaderRegion {
    async fn next(
        &mut self,
        ctx: &mut Context,
        _procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)> {
        let now = Instant::now();
        // Ensures the `leader_region_lease_deadline` exists after recovery.
        ctx.volatile_ctx
            .set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS));

        match self.downgrade_region_with_retry(ctx).await {
            Ok(_) => {
                // Do nothing
                info!(
                    "Downgraded region leader successfully, region: {}",
                    ctx.persistent_ctx.region_id
                );
            }
            Err(error::Error::ExceededDeadline { .. }) => {
                info!(
                    "Downgrade region leader exceeded deadline, region: {}",
                    ctx.persistent_ctx.region_id
                );
                // Rolls back the metadata if the procedure has timed out.
                return Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)));
            }
            Err(err) => {
                error!(err; "Non-retryable error occurred, region: {}", ctx.persistent_ctx.region_id);
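                // Slow path: wait for the leader region's lease to expire before
                // upgrading the candidate region.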
                if let Some(deadline) = ctx.volatile_ctx.leader_region_lease_deadline.as_ref() {
                    info!(
                        "Running into the downgrade region leader slow path, region: {}, sleeping until {:?}",
                        ctx.persistent_ctx.region_id, deadline
                    );
                    tokio::time::sleep_until(*deadline).await;
                } else {
                    warn!(
                        "Leader region lease deadline is not set, region: {}",
                        ctx.persistent_ctx.region_id
                    );
                }
            }
        }
        ctx.update_downgrade_leader_region_elapsed(now);

        Ok((
            Box::new(UpgradeCandidateRegion::default()),
            Status::executing(false),
        ))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl DowngradeLeaderRegion {
    /// Builds the downgrade region instruction.
    fn build_downgrade_region_instruction(
        &self,
        ctx: &Context,
        flush_timeout: Duration,
    ) -> Instruction {
        let pc = &ctx.persistent_ctx;
        let region_id = pc.region_id;
        Instruction::DowngradeRegion(DowngradeRegion {
            region_id,
            flush_timeout: Some(flush_timeout),
        })
    }

    /// Tries to downgrade a leader region.
    ///
    /// Retry:
    /// - [MailboxTimeout](error::Error::MailboxTimeout): the mailbox timed out.
    /// - Failed to downgrade the region on the Datanode.
    ///
    /// Abort:
    /// - [PusherNotFound](error::Error::PusherNotFound): the datanode is unreachable.
    /// - [PushMessage](error::Error::PushMessage): the receiver is dropped.
    /// - [MailboxReceiver](error::Error::MailboxReceiver): the sender is dropped without sending (impossible).
    /// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply).
    /// - [ExceededDeadline](error::Error::ExceededDeadline).
    /// - Invalid JSON.
    async fn downgrade_region(&self, ctx: &mut Context) -> Result<()> {
        let region_id = ctx.persistent_ctx.region_id;
        let operation_timeout =
            ctx.next_operation_timeout()
                .context(error::ExceededDeadlineSnafu {
                    operation: "Downgrade region",
                })?;
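        // The remaining operation time budget bounds both the datanode-side flush and
        // the mailbox round trip below.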
        let downgrade_instruction = self.build_downgrade_region_instruction(ctx, operation_timeout);

        let leader = &ctx.persistent_ctx.from_peer;
        let msg = MailboxMessage::json_message(
            &format!("Downgrade leader region: {}", region_id),
            &format!("Metasrv@{}", ctx.server_addr()),
            &format!("Datanode-{}@{}", leader.id, leader.addr),
            current_time_millis(),
            &downgrade_instruction,
        )
        .with_context(|_| error::SerializeToJsonSnafu {
            input: downgrade_instruction.to_string(),
        })?;

        let ch = Channel::Datanode(leader.id);
        let now = Instant::now();
        let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;

        match receiver.await {
            Ok(msg) => {
                let reply = HeartbeatMailbox::json_reply(&msg)?;
                info!(
                    "Received downgrade region reply: {:?}, region: {}, elapsed: {:?}",
                    reply,
                    region_id,
                    now.elapsed()
                );
                let InstructionReply::DowngradeRegion(DowngradeRegionReply {
                    last_entry_id,
                    metadata_last_entry_id,
                    exists,
                    error,
                }) = reply
                else {
                    return error::UnexpectedInstructionReplySnafu {
                        mailbox_message: msg.to_string(),
                        reason: "expect downgrade region reply",
                    }
                    .fail();
                };

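                // The datanode reported an error while downgrading the region;
                // treat it as retryable and try again later.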
                if error.is_some() {
                    return error::RetryLaterSnafu {
                        reason: format!(
                            "Failed to downgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
                            region_id, leader, error, now.elapsed()
                        ),
                    }
                    .fail();
                }

                if !exists {
                    warn!(
                        "Trying to downgrade the region {} on datanode {:?}, but the region doesn't exist, elapsed: {:?}",
                        region_id, leader, now.elapsed()
                    );
                } else {
                    info!(
                        "Region {} leader is downgraded on datanode {:?}, last_entry_id: {:?}, metadata_last_entry_id: {:?}, elapsed: {:?}",
                        region_id,
                        leader,
                        last_entry_id,
                        metadata_last_entry_id,
                        now.elapsed()
                    );
                }

                if let Some(last_entry_id) = last_entry_id {
                    ctx.volatile_ctx.set_last_entry_id(last_entry_id);
                }

                if let Some(metadata_last_entry_id) = metadata_last_entry_id {
                    ctx.volatile_ctx
                        .set_metadata_last_entry_id(metadata_last_entry_id);
                }

                Ok(())
            }
            Err(error::Error::MailboxTimeout { .. }) => {
                let reason = format!(
                    "Mailbox timed out while downgrading leader region {region_id} on datanode {:?}, elapsed: {:?}",
                    leader,
                    now.elapsed()
                );
                error::RetryLaterSnafu { reason }.fail()
            }
            Err(err) => Err(err),
        }
    }

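    /// Adjusts the leader region lease deadline according to the `from_peer` datanode's
    /// last heartbeat connection time: clears the deadline if the datanode has already
    /// been disconnected for longer than the region lease, otherwise shortens it to the
    /// remaining lease time.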
    async fn update_leader_region_lease_deadline(&self, ctx: &mut Context) {
        let leader = &ctx.persistent_ctx.from_peer;

        let last_connection_at = match find_datanode_lease_value(leader.id, &ctx.in_memory).await {
            Ok(lease_value) => lease_value.map(|lease_value| lease_value.timestamp_millis),
            Err(err) => {
                error!(err; "Failed to find datanode lease value for datanode: {}, during region migration, region: {}", leader, ctx.persistent_ctx.region_id);
                return;
            }
        };

        if let Some(last_connection_at) = last_connection_at {
            let now = current_time_millis();
            let elapsed = now - last_connection_at;
            let region_lease = Duration::from_secs(REGION_LEASE_SECS);

            // It's safe to update the region leader lease deadline here because:
            // 1. The old region leader has already been marked as downgraded in metadata,
            //    which means any attempts to renew its lease will be rejected.
            // 2. The pusher disconnect time record only gets removed when the datanode (from_peer)
            //    establishes a new heartbeat connection stream.
            if elapsed >= (REGION_LEASE_SECS * 1000) as i64 {
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                info!(
                    "Datanode {}({}) has been disconnected for longer than the region lease period ({:?}), reset leader region lease deadline to None, region: {}",
                    leader,
                    last_connection_at,
                    region_lease,
                    ctx.persistent_ctx.region_id
                );
            } else if elapsed > 0 {
                // `now - last_connection_at` < REGION_LEASE_SECS * 1000
                let lease_timeout =
                    region_lease - Duration::from_millis((now - last_connection_at) as u64);
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                ctx.volatile_ctx
                    .set_leader_region_lease_deadline(lease_timeout);
                info!(
                    "Datanode {}({}) last connected {} ms ago, updated leader region lease deadline to {:?}, region: {}",
                    leader, last_connection_at, elapsed, ctx.volatile_ctx.leader_region_lease_deadline, ctx.persistent_ctx.region_id
                );
            } else {
                warn!(
                    "Datanode {} has invalid last connection timestamp: {} (which is after current time: {}), region: {}",
                    leader, last_connection_at, now, ctx.persistent_ctx.region_id
                )
            }
        } else {
            warn!(
                "Failed to find last connection time for datanode {}, unable to update region lease deadline, region: {}",
                leader, ctx.persistent_ctx.region_id
            )
        }
    }

    /// Downgrades a leader region.
    ///
    /// Fast path:
    /// - Waits for the reply to the downgrade instruction.
    ///
    /// Slow path:
    /// - Waits for the leader region's lease to expire.
    ///
    /// Abort:
    /// - ExceededDeadline.
    async fn downgrade_region_with_retry(&self, ctx: &mut Context) -> Result<()> {
        let mut retry = 0;

        loop {
            let timer = Instant::now();
            if let Err(err) = self.downgrade_region(ctx).await {
                ctx.update_operations_elapsed(timer);
                retry += 1;
                // Returns the error immediately if the procedure has exceeded the deadline.
                if matches!(err, error::Error::ExceededDeadline { .. }) {
                    error!(err; "Failed to downgrade region leader, region: {}, exceeded deadline", ctx.persistent_ctx.region_id);
                    return Err(err);
                } else if matches!(err, error::Error::PusherNotFound { .. }) {
                    // Returns the error immediately if the datanode is unreachable.
                    error!(err; "Failed to downgrade region leader, region: {}, datanode({}) is unreachable (PusherNotFound)", ctx.persistent_ctx.region_id, ctx.persistent_ctx.from_peer.id);
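                    // Recomputes how long the leader lease deadline still needs to be
                    // honored, based on the datanode's last heartbeat connection time.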
                    self.update_leader_region_lease_deadline(ctx).await;
                    return Err(err);
                } else if err.is_retryable() && retry < self.optimistic_retry {
                    error!(err; "Failed to downgrade region leader, region: {}, retry later", ctx.persistent_ctx.region_id);
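                    // Sleeps for a fixed interval between attempts; no backoff is applied here.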
                    sleep(self.retry_initial_interval).await;
                } else {
                    return Err(BoxedError::new(err)).context(error::DowngradeLeaderSnafu {
                        region_id: ctx.persistent_ctx.region_id,
                    })?;
                }
            } else {
                ctx.update_operations_elapsed(timer);
                // Resets the deadline.
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                break;
            }
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;
    use tokio::time::Instant;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::manager::RegionMigrationTriggerReason;
    use crate::procedure::region_migration::test_util::{new_procedure_context, TestingEnv};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_downgrade_region_reply, send_mock_reply,
    };

    fn new_persistent_context() -> PersistentContext {
        PersistentContext {
            catalog: "greptime".into(),
            schema: "public".into(),
            from_peer: Peer::empty(1),
            to_peer: Peer::empty(2),
            region_id: RegionId::new(1024, 1),
            timeout: Duration::from_millis(1000),
            trigger_reason: RegionMigrationTriggerReason::Manual,
        }
    }

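    /// Creates table metadata with `from_peer` as the region leader and `to_peer` as a follower.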
    async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
        let table_info =
            new_test_table_info(ctx.persistent_ctx.region_id.table_id(), vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(ctx.persistent_ctx.region_id),
            leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
            follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
            ..Default::default()
        }];
        ctx.table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                wal_options,
            )
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PusherNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_pusher_dropped() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        drop(rx);

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PushMessage { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_procedure_exceeded_deadline() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());

        let err = state
            .downgrade_region_with_retry(&mut ctx)
            .await
            .unwrap_err();
        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        // Sends an incorrect reply.
        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        });

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
    }

    #[tokio::test]
    async fn test_downgrade_region_failed() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(
                id,
                None,
                false,
                Some("test mocked".to_string()),
            ))
        });

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("test mocked"), "err: {err:?}",);
    }

    #[tokio::test]
    async fn test_downgrade_region_with_retry_fast_path() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            // retry: 0.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            // retry: 1.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_downgrade_region_reply(reply_id, Some(1), true, None)),
                )
                .await
                .unwrap();
        });

        state.downgrade_region_with_retry(&mut ctx).await.unwrap();
        assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1));
        assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());
    }

    #[tokio::test]
    async fn test_downgrade_region_with_retry_slow_path() {
        let state = DowngradeLeaderRegion {
            optimistic_retry: 3,
            retry_initial_interval: Duration::from_millis(100),
        };
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

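        // Replies with a mailbox timeout to every attempt, exhausting all optimistic retries.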
        common_runtime::spawn_global(async move {
            for _ in 0..3 {
                let resp = rx.recv().await.unwrap().unwrap();
                let reply_id = resp.mailbox_message.unwrap().id;
                mailbox
                    .on_recv(
                        reply_id,
                        Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                    )
                    .await
                    .unwrap();
            }
        });

        ctx.volatile_ctx
            .set_leader_region_lease_deadline(Duration::from_secs(5));
        let expected_deadline = ctx.volatile_ctx.leader_region_lease_deadline.unwrap();
        let err = state
            .downgrade_region_with_retry(&mut ctx)
            .await
            .unwrap_err();
        assert_matches!(err, error::Error::DowngradeLeader { .. });
        assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, None);
        // Should remain unchanged.
        assert_eq!(
            ctx.volatile_ctx.leader_region_lease_deadline.unwrap(),
            expected_deadline
        )
    }

    #[tokio::test]
    async fn test_next_upgrade_candidate_state() {
        let mut state = Box::<DowngradeLeaderRegion>::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(id, Some(1), true, None))
        });

        let timer = Instant::now();
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let elapsed = timer.elapsed().as_secs();
        assert!(elapsed < REGION_LEASE_SECS / 2);
        assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1));
        assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());

        let _ = next
            .as_any()
            .downcast_ref::<UpgradeCandidateRegion>()
            .unwrap();
    }

    #[tokio::test]
    async fn test_downgrade_region_procedure_exceeded_deadline() {
        let mut state = Box::<DowngradeLeaderRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(id, None, true, None))
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }
}