meta_srv/procedure/region_migration/downgrade_leader_region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::time::Duration;

use api::v1::meta::MailboxMessage;
use common_error::ext::BoxedError;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::instruction::{
    DowngradeRegion, DowngradeRegionReply, DowngradeRegionsReply, Instruction, InstructionReply,
};
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{error, info, warn};
use common_time::util::current_time_millis;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use tokio::time::{Instant, sleep};

use crate::discovery::utils::find_datanode_lease_value;
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::upgrade_candidate_region::UpgradeCandidateRegion;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;

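/// State that downgrades the leader region on the `from_peer` datanode.
///
/// Sends a [`DowngradeRegion`] instruction to the current leader and, on success or a
/// non-retryable failure, transitions to [`UpgradeCandidateRegion`]; if the procedure
/// deadline is exceeded, the metadata is rolled back via [`UpdateMetadata::Rollback`].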
#[derive(Debug, Serialize, Deserialize)]
pub struct DowngradeLeaderRegion {
    // The maximum number of optimistic retries.
    optimistic_retry: usize,
    // The initial interval between retries.
    retry_initial_interval: Duration,
}

impl Default for DowngradeLeaderRegion {
    fn default() -> Self {
        Self {
            optimistic_retry: 3,
            retry_initial_interval: Duration::from_millis(500),
        }
    }
}

#[async_trait::async_trait]
#[typetag::serde]
impl State for DowngradeLeaderRegion {
    async fn next(
        &mut self,
        ctx: &mut Context,
        _procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)> {
        let now = Instant::now();
        // Ensures the `leader_region_lease_deadline` exists even after the procedure is recovered.
        ctx.volatile_ctx
            .set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS));

        match self.downgrade_region_with_retry(ctx).await {
            Ok(_) => {
                // Nothing to roll back; proceed to upgrade the candidate region.
                info!(
                    "Downgraded region leader successfully, region: {}",
                    ctx.persistent_ctx.region_id
                );
            }
            Err(error::Error::ExceededDeadline { .. }) => {
                info!(
                    "Downgrade region leader exceeded deadline, region: {}",
                    ctx.persistent_ctx.region_id
                );
                // Rolls back the metadata if the procedure has timed out.
                return Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)));
            }
            Err(err) => {
                error!(err; "Non-retryable error occurred, region: {}", ctx.persistent_ctx.region_id);
                if let Some(deadline) = ctx.volatile_ctx.leader_region_lease_deadline.as_ref() {
                    info!(
                        "Entering the downgrade region leader slow path, region: {}, sleeping until {:?}",
                        ctx.persistent_ctx.region_id, deadline
                    );
                    tokio::time::sleep_until(*deadline).await;
                } else {
                    warn!(
                        "Leader region lease deadline is not set, region: {}",
                        ctx.persistent_ctx.region_id
                    );
                }
            }
        }
        ctx.update_downgrade_leader_region_elapsed(now);

        Ok((
            Box::new(UpgradeCandidateRegion::default()),
            Status::executing(false),
        ))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl DowngradeLeaderRegion {
    /// Builds the downgrade region instruction.
    fn build_downgrade_region_instruction(
        &self,
        ctx: &Context,
        flush_timeout: Duration,
    ) -> Instruction {
        let pc = &ctx.persistent_ctx;
        let region_id = pc.region_id;
        Instruction::DowngradeRegions(vec![DowngradeRegion {
            region_id,
            flush_timeout: Some(flush_timeout),
        }])
    }

    /// Tries to downgrade a leader region.
    ///
    /// Retry:
    /// - [MailboxTimeout](error::Error::MailboxTimeout): the mailbox request timed out.
    /// - Failure to downgrade the region on the datanode.
    ///
    /// Abort:
    /// - [PusherNotFound](error::Error::PusherNotFound): the datanode is unreachable.
    /// - [PushMessage](error::Error::PushMessage): the receiver is dropped.
    /// - [MailboxReceiver](error::Error::MailboxReceiver): the sender is dropped without sending (impossible).
    /// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply).
    /// - [ExceededDeadline](error::Error::ExceededDeadline).
    /// - Invalid JSON.
    async fn downgrade_region(&self, ctx: &mut Context) -> Result<()> {
        let region_id = ctx.persistent_ctx.region_id;
        let operation_timeout =
            ctx.next_operation_timeout()
                .context(error::ExceededDeadlineSnafu {
                    operation: "Downgrade region",
                })?;
        let downgrade_instruction = self.build_downgrade_region_instruction(ctx, operation_timeout);

        let leader = &ctx.persistent_ctx.from_peer;
        let msg = MailboxMessage::json_message(
            &format!("Downgrade leader region: {}", region_id),
            &format!("Metasrv@{}", ctx.server_addr()),
            &format!("Datanode-{}@{}", leader.id, leader.addr),
            common_time::util::current_time_millis(),
            &downgrade_instruction,
        )
        .with_context(|_| error::SerializeToJsonSnafu {
            input: downgrade_instruction.to_string(),
        })?;

        let ch = Channel::Datanode(leader.id);
        let now = Instant::now();
        let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;

        match receiver.await {
            Ok(msg) => {
                let reply = HeartbeatMailbox::json_reply(&msg)?;
                info!(
                    "Received downgrade region reply: {:?}, region: {}, elapsed: {:?}",
                    reply,
                    region_id,
                    now.elapsed()
                );
                let InstructionReply::DowngradeRegions(DowngradeRegionsReply { replies }) = reply
                else {
                    return error::UnexpectedInstructionReplySnafu {
                        mailbox_message: msg.to_string(),
                        reason: "expect downgrade region reply",
                    }
                    .fail();
                };

                // TODO(weny): handle multiple replies.
                let DowngradeRegionReply {
                    region_id,
                    last_entry_id,
                    metadata_last_entry_id,
                    exists,
                    error,
                } = &replies[0];

                if error.is_some() {
                    return error::RetryLaterSnafu {
                        reason: format!(
                            "Failed to downgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
                            region_id, leader, error, now.elapsed()
                        ),
                    }
                    .fail();
                }

                if !exists {
                    warn!(
                        "Trying to downgrade the region {} on datanode {:?}, but the region doesn't exist, elapsed: {:?}",
                        region_id,
                        leader,
                        now.elapsed()
                    );
                } else {
                    info!(
                        "Region {} leader is downgraded on datanode {:?}, last_entry_id: {:?}, metadata_last_entry_id: {:?}, elapsed: {:?}",
                        region_id,
                        leader,
                        last_entry_id,
                        metadata_last_entry_id,
                        now.elapsed()
                    );
                }

                if let Some(last_entry_id) = last_entry_id {
                    ctx.volatile_ctx.set_last_entry_id(*last_entry_id);
                }

                if let Some(metadata_last_entry_id) = metadata_last_entry_id {
                    ctx.volatile_ctx
                        .set_metadata_last_entry_id(*metadata_last_entry_id);
                }

                Ok(())
            }
            Err(error::Error::MailboxTimeout { .. }) => {
                let reason = format!(
                    "Mailbox received timeout for downgrade leader region {region_id} on datanode {:?}, elapsed: {:?}",
                    leader,
                    now.elapsed()
                );
                error::RetryLaterSnafu { reason }.fail()
            }
            Err(err) => Err(err),
        }
    }

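    /// Adjusts the leader region lease deadline based on how long the `from_peer`
    /// datanode has been disconnected.
    ///
    /// If the datanode's last heartbeat is older than the region lease period, the
    /// deadline is cleared so the procedure doesn't wait; otherwise the deadline is
    /// shortened by the time already elapsed since the last connection.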
    async fn update_leader_region_lease_deadline(&self, ctx: &mut Context) {
        let leader = &ctx.persistent_ctx.from_peer;

        let last_connection_at = match find_datanode_lease_value(&ctx.in_memory, leader.id).await {
            Ok(lease_value) => lease_value.map(|lease_value| lease_value.timestamp_millis),
            Err(err) => {
                error!(err; "Failed to find datanode lease value for datanode: {}, during region migration, region: {}", leader, ctx.persistent_ctx.region_id);
                return;
            }
        };

        if let Some(last_connection_at) = last_connection_at {
            let now = current_time_millis();
            let elapsed = now - last_connection_at;
            let region_lease = Duration::from_secs(REGION_LEASE_SECS);

            // It's safe to update the region leader lease deadline here because:
            // 1. The old region leader has already been marked as downgraded in metadata,
            //    which means any attempts to renew its lease will be rejected.
            // 2. The pusher disconnect time record only gets removed when the datanode (from_peer)
            //    establishes a new heartbeat connection stream.
            if elapsed >= (REGION_LEASE_SECS * 1000) as i64 {
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                info!(
                    "Datanode {}({}) has been disconnected for longer than the region lease period ({:?}), reset leader region lease deadline to None, region: {}",
                    leader, last_connection_at, region_lease, ctx.persistent_ctx.region_id
                );
            } else if elapsed > 0 {
                // `now - last_connection_at` < REGION_LEASE_SECS * 1000
                let lease_timeout =
                    region_lease - Duration::from_millis((now - last_connection_at) as u64);
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                ctx.volatile_ctx
                    .set_leader_region_lease_deadline(lease_timeout);
                info!(
                    "Datanode {}({}) last connected {:?} ago, updated leader region lease deadline to {:?}, region: {}",
                    leader,
                    last_connection_at,
                    elapsed,
                    ctx.volatile_ctx.leader_region_lease_deadline,
                    ctx.persistent_ctx.region_id
                );
            } else {
                warn!(
                    "Datanode {} has invalid last connection timestamp: {} (which is after current time: {}), region: {}",
                    leader, last_connection_at, now, ctx.persistent_ctx.region_id
                )
            }
        } else {
            warn!(
                "Failed to find last connection time for datanode {}, unable to update region lease deadline, region: {}",
                leader, ctx.persistent_ctx.region_id
            )
        }
    }

    /// Downgrades a leader region.
    ///
    /// Fast path:
    /// - Waits for the reply of the downgrade instruction.
    ///
    /// Slow path:
    /// - Waits for the lease of the leader region to expire.
    ///
    /// Abort:
    /// - ExceededDeadline
    async fn downgrade_region_with_retry(&self, ctx: &mut Context) -> Result<()> {
        let mut retry = 0;

        loop {
            let timer = Instant::now();
            if let Err(err) = self.downgrade_region(ctx).await {
                ctx.update_operations_elapsed(timer);
                retry += 1;
                // Returns the error immediately if the procedure has exceeded the deadline.
                if matches!(err, error::Error::ExceededDeadline { .. }) {
                    error!(err; "Failed to downgrade region leader, region: {}, exceeded deadline", ctx.persistent_ctx.region_id);
                    return Err(err);
                } else if matches!(err, error::Error::PusherNotFound { .. }) {
                    // Returns the error immediately if the datanode is unreachable.
                    error!(err; "Failed to downgrade region leader, region: {}, datanode({}) is unreachable(PusherNotFound)", ctx.persistent_ctx.region_id, ctx.persistent_ctx.from_peer.id);
                    self.update_leader_region_lease_deadline(ctx).await;
                    return Err(err);
                } else if err.is_retryable() && retry < self.optimistic_retry {
                    error!(err; "Failed to downgrade region leader, region: {}, retry later", ctx.persistent_ctx.region_id);
                    sleep(self.retry_initial_interval).await;
                } else {
                    return Err(BoxedError::new(err)).context(error::DowngradeLeaderSnafu {
                        region_id: ctx.persistent_ctx.region_id,
                    })?;
                }
            } else {
                ctx.update_operations_elapsed(timer);
                // Resets the deadline.
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                break;
            }
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;
    use tokio::time::Instant;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::manager::RegionMigrationTriggerReason;
    use crate::procedure::region_migration::test_util::{TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_downgrade_region_reply, send_mock_reply,
    };

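    /// Builds a test `PersistentContext` that migrates region 1024/1 from peer 1 to peer 2
    /// with a 1s procedure timeout.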
    fn new_persistent_context() -> PersistentContext {
        PersistentContext {
            catalog: "greptime".into(),
            schema: "public".into(),
            from_peer: Peer::empty(1),
            to_peer: Peer::empty(2),
            region_id: RegionId::new(1024, 1),
            timeout: Duration::from_millis(1000),
            trigger_reason: RegionMigrationTriggerReason::Manual,
        }
    }

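    /// Creates table metadata with a single region route whose leader is `from_peer`
    /// and whose follower is `to_peer`.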
    async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
        let table_info =
            new_test_table_info(ctx.persistent_ctx.region_id.table_id(), vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(ctx.persistent_ctx.region_id),
            leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
            follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
            ..Default::default()
        }];
        ctx.table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                wal_options,
            )
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PusherNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_pusher_dropped() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        drop(rx);

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PushMessage { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_procedure_exceeded_deadline() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());

        let err = state
            .downgrade_region_with_retry(&mut ctx)
            .await
            .unwrap_err();
        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        // Sends an incorrect reply.
        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        });

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
    }

    #[tokio::test]
    async fn test_downgrade_region_failed() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(
                id,
                None,
                false,
                Some("test mocked".to_string()),
            ))
        });

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("test mocked"), "err: {err:?}");
    }

    #[tokio::test]
    async fn test_downgrade_region_with_retry_fast_path() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            // retry: 0.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            // retry: 1.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_downgrade_region_reply(reply_id, Some(1), true, None)),
                )
                .await
                .unwrap();
        });

        state.downgrade_region_with_retry(&mut ctx).await.unwrap();
        assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1));
        assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());
    }

    #[tokio::test]
    async fn test_downgrade_region_with_retry_slow_path() {
        let state = DowngradeLeaderRegion {
            optimistic_retry: 3,
            retry_initial_interval: Duration::from_millis(100),
        };
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            for _ in 0..3 {
                let resp = rx.recv().await.unwrap().unwrap();
                let reply_id = resp.mailbox_message.unwrap().id;
                mailbox
                    .on_recv(
                        reply_id,
                        Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                    )
                    .await
                    .unwrap();
            }
        });

        ctx.volatile_ctx
            .set_leader_region_lease_deadline(Duration::from_secs(5));
        let expected_deadline = ctx.volatile_ctx.leader_region_lease_deadline.unwrap();
        let err = state
            .downgrade_region_with_retry(&mut ctx)
            .await
            .unwrap_err();
        assert_matches!(err, error::Error::DowngradeLeader { .. });
        assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, None);
        // The deadline should remain unchanged.
        assert_eq!(
            ctx.volatile_ctx.leader_region_lease_deadline.unwrap(),
            expected_deadline
        );
    }

    #[tokio::test]
    async fn test_next_upgrade_candidate_state() {
        let mut state = Box::<DowngradeLeaderRegion>::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(id, Some(1), true, None))
        });

        let timer = Instant::now();
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let elapsed = timer.elapsed().as_secs();
        assert!(elapsed < REGION_LEASE_SECS / 2);
        assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1));
        assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());

        let _ = next
            .as_any()
            .downcast_ref::<UpgradeCandidateRegion>()
            .unwrap();
    }

    #[tokio::test]
    async fn test_downgrade_region_procedure_exceeded_deadline() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(id, None, true, None))
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }
}