meta_srv/procedure/region_migration/downgrade_leader_region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::time::Duration;

use api::v1::meta::MailboxMessage;
use common_error::ext::BoxedError;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::instruction::{
    DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply,
};
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{error, info, warn};
use common_time::util::current_time_millis;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use tokio::time::{Instant, sleep};

use crate::discovery::utils::find_datanode_lease_value;
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::upgrade_candidate_region::UpgradeCandidateRegion;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;
#[derive(Debug, Serialize, Deserialize)]
pub struct DowngradeLeaderRegion {
    // The number of optimistic retries.
    optimistic_retry: usize,
    // The initial retry interval.
    retry_initial_interval: Duration,
}

impl Default for DowngradeLeaderRegion {
    fn default() -> Self {
        Self {
            optimistic_retry: 3,
            retry_initial_interval: Duration::from_millis(500),
        }
    }
}

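// State transition: a successful downgrade (or waiting out the old leader's lease on a
// non-retryable error) proceeds to `UpgradeCandidateRegion`; exceeding the procedure
// deadline rolls the metadata back via `UpdateMetadata::Rollback`.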
#[async_trait::async_trait]
#[typetag::serde]
impl State for DowngradeLeaderRegion {
    async fn next(
        &mut self,
        ctx: &mut Context,
        _procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)> {
        let now = Instant::now();
        // Ensures the `leader_region_lease_deadline` exists after recovering.
        ctx.volatile_ctx
            .set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS));

        match self.downgrade_region_with_retry(ctx).await {
            Ok(_) => {
                info!(
                    "Downgraded region leader successfully, region: {}",
                    ctx.persistent_ctx.region_id
                );
            }
            Err(error::Error::ExceededDeadline { .. }) => {
                info!(
                    "Downgrading region leader exceeded the deadline, region: {}",
                    ctx.persistent_ctx.region_id
                );
                // Rolls back the metadata if the procedure has timed out.
                return Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)));
            }
            Err(err) => {
                error!(err; "A non-retryable error occurred, region: {}", ctx.persistent_ctx.region_id);
                if let Some(deadline) = ctx.volatile_ctx.leader_region_lease_deadline.as_ref() {
                    info!(
                        "Entering the downgrade region leader slow path, region: {}, sleeping until {:?}",
                        ctx.persistent_ctx.region_id, deadline
                    );
                    tokio::time::sleep_until(*deadline).await;
                } else {
                    warn!(
                        "Leader region lease deadline is not set, region: {}",
                        ctx.persistent_ctx.region_id
                    );
                }
            }
        }
        ctx.update_downgrade_leader_region_elapsed(now);

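        // Both the success path and the slow path end up upgrading the candidate region.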
        Ok((
            Box::new(UpgradeCandidateRegion::default()),
            Status::executing(false),
        ))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl DowngradeLeaderRegion {
    /// Builds the downgrade region instruction.
    fn build_downgrade_region_instruction(
        &self,
        ctx: &Context,
        flush_timeout: Duration,
    ) -> Instruction {
        let pc = &ctx.persistent_ctx;
        let region_id = pc.region_id;
        Instruction::DowngradeRegion(DowngradeRegion {
            region_id,
            flush_timeout: Some(flush_timeout),
        })
    }

    /// Tries to downgrade a leader region.
    ///
    /// Retry:
    /// - [MailboxTimeout](error::Error::MailboxTimeout): the request timed out.
    /// - Failed to downgrade the region on the datanode.
    ///
    /// Abort:
    /// - [PusherNotFound](error::Error::PusherNotFound): the datanode is unreachable.
    /// - [PushMessage](error::Error::PushMessage): the receiver is dropped.
    /// - [MailboxReceiver](error::Error::MailboxReceiver): the sender is dropped without sending (impossible).
    /// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply).
    /// - [ExceededDeadline](error::Error::ExceededDeadline).
    /// - Invalid JSON.
    async fn downgrade_region(&self, ctx: &mut Context) -> Result<()> {
        let region_id = ctx.persistent_ctx.region_id;
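        // Uses the remaining operation time budget as both the mailbox timeout and the
        // flush timeout sent to the datanode; `None` means the procedure deadline has
        // already been exceeded.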
        let operation_timeout =
            ctx.next_operation_timeout()
                .context(error::ExceededDeadlineSnafu {
                    operation: "Downgrade region",
                })?;
        let downgrade_instruction = self.build_downgrade_region_instruction(ctx, operation_timeout);

        let leader = &ctx.persistent_ctx.from_peer;
        let msg = MailboxMessage::json_message(
            &format!("Downgrade leader region: {}", region_id),
            &format!("Metasrv@{}", ctx.server_addr()),
            &format!("Datanode-{}@{}", leader.id, leader.addr),
            common_time::util::current_time_millis(),
            &downgrade_instruction,
        )
        .with_context(|_| error::SerializeToJsonSnafu {
            input: downgrade_instruction.to_string(),
        })?;

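        // Sends the instruction over the leader datanode's heartbeat channel and waits
        // for the reply within the remaining operation timeout.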
        let ch = Channel::Datanode(leader.id);
        let now = Instant::now();
        let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;

        match receiver.await {
            Ok(msg) => {
                let reply = HeartbeatMailbox::json_reply(&msg)?;
                info!(
                    "Received downgrade region reply: {:?}, region: {}, elapsed: {:?}",
                    reply,
                    region_id,
                    now.elapsed()
                );
                let InstructionReply::DowngradeRegion(DowngradeRegionReply {
                    last_entry_id,
                    metadata_last_entry_id,
                    exists,
                    error,
                }) = reply
                else {
                    return error::UnexpectedInstructionReplySnafu {
                        mailbox_message: msg.to_string(),
                        reason: "expect downgrade region reply",
                    }
                    .fail();
                };

                if error.is_some() {
                    return error::RetryLaterSnafu {
                        reason: format!(
                            "Failed to downgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
                            region_id, leader, error, now.elapsed()
                        ),
                    }
                    .fail();
                }

                if !exists {
                    warn!(
                        "Trying to downgrade the region {} on datanode {:?}, but the region doesn't exist, elapsed: {:?}",
                        region_id,
                        leader,
                        now.elapsed()
                    );
                } else {
                    info!(
                        "Region {} leader is downgraded on datanode {:?}, last_entry_id: {:?}, metadata_last_entry_id: {:?}, elapsed: {:?}",
                        region_id,
                        leader,
                        last_entry_id,
                        metadata_last_entry_id,
                        now.elapsed()
                    );
                }

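                // Records the entry ids reported by the old leader in the volatile
                // context so that subsequent steps of the migration can use them.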
                if let Some(last_entry_id) = last_entry_id {
                    ctx.volatile_ctx.set_last_entry_id(last_entry_id);
                }

                if let Some(metadata_last_entry_id) = metadata_last_entry_id {
                    ctx.volatile_ctx
                        .set_metadata_last_entry_id(metadata_last_entry_id);
                }

                Ok(())
            }
            Err(error::Error::MailboxTimeout { .. }) => {
                let reason = format!(
                    "Mailbox received timeout for downgrading leader region {region_id} on datanode {:?}, elapsed: {:?}",
                    leader,
                    now.elapsed()
                );
                error::RetryLaterSnafu { reason }.fail()
            }
            Err(err) => Err(err),
        }
    }

    async fn update_leader_region_lease_deadline(&self, ctx: &mut Context) {
        let leader = &ctx.persistent_ctx.from_peer;

        let last_connection_at = match find_datanode_lease_value(&ctx.in_memory, leader.id).await {
            Ok(lease_value) => lease_value.map(|lease_value| lease_value.timestamp_millis),
            Err(err) => {
                error!(err; "Failed to find datanode lease value for datanode: {} during region migration, region: {}", leader, ctx.persistent_ctx.region_id);
                return;
            }
        };

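        // Adjusts the lease deadline according to how long the old leader datanode has
        // been disconnected: if the disconnection already exceeds the region lease there
        // is nothing left to wait for; otherwise only the remaining portion of the lease
        // needs to be waited out.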
        if let Some(last_connection_at) = last_connection_at {
            let now = current_time_millis();
            let elapsed = now - last_connection_at;
            let region_lease = Duration::from_secs(REGION_LEASE_SECS);

            // It's safe to update the region leader lease deadline here because:
            // 1. The old region leader has already been marked as downgraded in metadata,
            //    which means any attempts to renew its lease will be rejected.
            // 2. The pusher disconnect time record only gets removed when the datanode (from_peer)
            //    establishes a new heartbeat connection stream.
            if elapsed >= (REGION_LEASE_SECS * 1000) as i64 {
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                info!(
                    "Datanode {}({}) has been disconnected for longer than the region lease period ({:?}), reset leader region lease deadline to None, region: {}",
                    leader, last_connection_at, region_lease, ctx.persistent_ctx.region_id
                );
            } else if elapsed > 0 {
                // `now - last_connection_at` < REGION_LEASE_SECS * 1000
                let lease_timeout =
                    region_lease - Duration::from_millis((now - last_connection_at) as u64);
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                ctx.volatile_ctx
                    .set_leader_region_lease_deadline(lease_timeout);
                info!(
                    "Datanode {}({}) last connected {:?} ago, updated leader region lease deadline to {:?}, region: {}",
                    leader,
                    last_connection_at,
                    elapsed,
                    ctx.volatile_ctx.leader_region_lease_deadline,
                    ctx.persistent_ctx.region_id
                );
            } else {
                warn!(
                    "Datanode {} has invalid last connection timestamp: {} (which is after current time: {}), region: {}",
                    leader, last_connection_at, now, ctx.persistent_ctx.region_id
                )
            }
        } else {
            warn!(
                "Failed to find last connection time for datanode {}, unable to update region lease deadline, region: {}",
                leader, ctx.persistent_ctx.region_id
            )
        }
    }

    /// Downgrades a leader region.
    ///
    /// Fast path:
    /// - Waits for the reply of the downgrade instruction.
    ///
    /// Slow path:
    /// - Waits for the lease of the leader region to expire.
    ///
    /// Abort:
    /// - ExceededDeadline
    async fn downgrade_region_with_retry(&self, ctx: &mut Context) -> Result<()> {
        let mut retry = 0;

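        // Retries retryable errors up to `optimistic_retry` times, sleeping
        // `retry_initial_interval` between attempts. Exceeding the procedure deadline or
        // an unreachable datanode (`PusherNotFound`) aborts immediately.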
        loop {
            let timer = Instant::now();
            if let Err(err) = self.downgrade_region(ctx).await {
                ctx.update_operations_elapsed(timer);
                retry += 1;
                // Returns the error immediately if the procedure has exceeded the deadline.
                if matches!(err, error::Error::ExceededDeadline { .. }) {
                    error!(err; "Failed to downgrade region leader, region: {}, exceeded deadline", ctx.persistent_ctx.region_id);
                    return Err(err);
                } else if matches!(err, error::Error::PusherNotFound { .. }) {
                    // Returns the error immediately if the datanode is unreachable.
                    error!(err; "Failed to downgrade region leader, region: {}, datanode({}) is unreachable(PusherNotFound)", ctx.persistent_ctx.region_id, ctx.persistent_ctx.from_peer.id);
                    self.update_leader_region_lease_deadline(ctx).await;
                    return Err(err);
                } else if err.is_retryable() && retry < self.optimistic_retry {
                    error!(err; "Failed to downgrade region leader, region: {}, retry later", ctx.persistent_ctx.region_id);
                    sleep(self.retry_initial_interval).await;
                } else {
                    return Err(BoxedError::new(err)).context(error::DowngradeLeaderSnafu {
                        region_id: ctx.persistent_ctx.region_id,
                    })?;
                }
            } else {
                ctx.update_operations_elapsed(timer);
                // Resets the deadline.
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                break;
            }
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;
    use tokio::time::Instant;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::manager::RegionMigrationTriggerReason;
    use crate::procedure::region_migration::test_util::{TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_downgrade_region_reply, send_mock_reply,
    };

    fn new_persistent_context() -> PersistentContext {
        PersistentContext {
            catalog: "greptime".into(),
            schema: "public".into(),
            from_peer: Peer::empty(1),
            to_peer: Peer::empty(2),
            region_id: RegionId::new(1024, 1),
            timeout: Duration::from_millis(1000),
            trigger_reason: RegionMigrationTriggerReason::Manual,
        }
    }

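    // Creates table metadata for the test region with `from_peer` as the leader and
    // `to_peer` as a follower.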
    async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
        let table_info =
            new_test_table_info(ctx.persistent_ctx.region_id.table_id(), vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(ctx.persistent_ctx.region_id),
            leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
            follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
            ..Default::default()
        }];
        ctx.table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                wal_options,
            )
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PusherNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_pusher_dropped() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        drop(rx);

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PushMessage { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_procedure_exceeded_deadline() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());

        let err = state
            .downgrade_region_with_retry(&mut ctx)
            .await
            .unwrap_err();
        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        // Sends an incorrect reply.
        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        });

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
    }

    #[tokio::test]
    async fn test_downgrade_region_failed() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(
                id,
                None,
                false,
                Some("test mocked".to_string()),
            ))
        });

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("test mocked"), "err: {err:?}",);
    }

    #[tokio::test]
    async fn test_downgrade_region_with_retry_fast_path() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            // retry: 0.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            // retry: 1.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_downgrade_region_reply(reply_id, Some(1), true, None)),
                )
                .await
                .unwrap();
        });

        state.downgrade_region_with_retry(&mut ctx).await.unwrap();
        assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1));
        assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());
    }

    #[tokio::test]
    async fn test_downgrade_region_with_retry_slow_path() {
        let state = DowngradeLeaderRegion {
            optimistic_retry: 3,
            retry_initial_interval: Duration::from_millis(100),
        };
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            for _ in 0..3 {
                let resp = rx.recv().await.unwrap().unwrap();
                let reply_id = resp.mailbox_message.unwrap().id;
                mailbox
                    .on_recv(
                        reply_id,
                        Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                    )
                    .await
                    .unwrap();
            }
        });

        ctx.volatile_ctx
            .set_leader_region_lease_deadline(Duration::from_secs(5));
        let expected_deadline = ctx.volatile_ctx.leader_region_lease_deadline.unwrap();
        let err = state
            .downgrade_region_with_retry(&mut ctx)
            .await
            .unwrap_err();
        assert_matches!(err, error::Error::DowngradeLeader { .. });
        assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, None);
        // Should remain unchanged.
        assert_eq!(
            ctx.volatile_ctx.leader_region_lease_deadline.unwrap(),
            expected_deadline
        )
    }

    #[tokio::test]
    async fn test_next_upgrade_candidate_state() {
        let mut state = Box::<DowngradeLeaderRegion>::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(id, Some(1), true, None))
        });

        let timer = Instant::now();
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let elapsed = timer.elapsed().as_secs();
        assert!(elapsed < REGION_LEASE_SECS / 2);
        assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1));
        assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());

        let _ = next
            .as_any()
            .downcast_ref::<UpgradeCandidateRegion>()
            .unwrap();
    }

    #[tokio::test]
    async fn test_downgrade_region_procedure_exceeded_deadline() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(id, None, true, None))
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }
}
706}