meta_srv/procedure/region_migration/downgrade_leader_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::time::Duration;
17
18use api::v1::meta::MailboxMessage;
19use common_error::ext::BoxedError;
20use common_meta::distributed_time_constants::default_distributed_time_constants;
21use common_meta::instruction::{
22    DowngradeRegion, DowngradeRegionReply, DowngradeRegionsReply, Instruction, InstructionReply,
23};
24use common_procedure::{Context as ProcedureContext, Status};
25use common_telemetry::tracing_context::TracingContext;
26use common_telemetry::{debug, error, info, warn};
27use common_time::util::current_time_millis;
28use serde::{Deserialize, Serialize};
29use snafu::{OptionExt, ResultExt};
30use tokio::time::{Instant, sleep};
31
32use crate::discovery::utils::find_datanode_lease_value;
33use crate::error::{self, Result};
34use crate::handler::HeartbeatMailbox;
35use crate::procedure::region_migration::update_metadata::UpdateMetadata;
36use crate::procedure::region_migration::upgrade_candidate_region::UpgradeCandidateRegion;
37use crate::procedure::region_migration::{Context, State};
38use crate::service::mailbox::Channel;
39
/// The state that downgrades the leader regions on the `from_peer` datanode
/// before the candidate regions can be promoted to leaders.
///
/// Serialized/deserialized as part of the procedure state, so it must remain
/// backward-compatible across versions.
#[derive(Debug, Serialize, Deserialize)]
pub struct DowngradeLeaderRegion {
    // The number of optimistic (fast-path) retries before giving up and
    // falling back to the slow path (waiting out the leader lease).
    optimistic_retry: usize,
    // The initial interval to sleep between retries.
    retry_initial_interval: Duration,
}
47
48impl Default for DowngradeLeaderRegion {
49    fn default() -> Self {
50        Self {
51            optimistic_retry: 3,
52            retry_initial_interval: Duration::from_millis(500),
53        }
54    }
55}
56
#[async_trait::async_trait]
#[typetag::serde]
impl State for DowngradeLeaderRegion {
    /// Attempts to downgrade the leader regions, then transitions to the next
    /// state:
    /// - [`UpdateMetadata::Rollback`] if the procedure exceeded its deadline.
    /// - [`UpgradeCandidateRegion`] otherwise (after the slow path, if needed).
    async fn next(
        &mut self,
        ctx: &mut Context,
        _procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)> {
        let now = Instant::now();
        // Ensures the `leader_region_lease_deadline` must exist after recovering.
        ctx.volatile_ctx
            .set_leader_region_lease_deadline(default_distributed_time_constants().region_lease);

        match self.downgrade_region_with_retry(ctx).await {
            Ok(_) => {
                // Do nothing
                info!(
                    "Downgraded region leader success, region: {:?}",
                    ctx.persistent_ctx.region_ids
                );
            }
            Err(error::Error::ExceededDeadline { .. }) => {
                info!(
                    "Downgrade region leader exceeded deadline, region: {:?}",
                    ctx.persistent_ctx.region_ids
                );
                // Rollbacks the metadata if procedure is timeout
                return Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)));
            }
            Err(err) => {
                error!(err; "Occurs non-retryable error, region: {:?}", ctx.persistent_ctx.region_ids);
                // Slow path: the downgrade could not be confirmed, so wait until
                // the old leader's lease deadline passes before promoting the
                // candidate, ensuring the old leader has stopped serving.
                if let Some(deadline) = ctx.volatile_ctx.leader_region_lease_deadline.as_ref() {
                    info!(
                        "Running into the downgrade region leader slow path, region: {:?}, sleep until {:?}",
                        ctx.persistent_ctx.region_ids, deadline
                    );
                    tokio::time::sleep_until(*deadline).await;
                } else {
                    warn!(
                        "Leader region lease deadline is not set, region: {:?}",
                        ctx.persistent_ctx.region_ids
                    );
                }
            }
        }
        // Record how long the downgrade phase took for procedure metrics.
        ctx.update_downgrade_leader_region_elapsed(now);

        Ok((
            Box::new(UpgradeCandidateRegion::default()),
            Status::executing(false),
        ))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}
114
impl DowngradeLeaderRegion {
    /// Builds downgrade region instruction.
    ///
    /// One [`DowngradeRegion`] entry is built per migrating region, each with
    /// the same `flush_timeout` budget.
    fn build_downgrade_region_instruction(
        &self,
        ctx: &Context,
        flush_timeout: Duration,
    ) -> Instruction {
        let region_ids = &ctx.persistent_ctx.region_ids;
        let mut downgrade_regions = Vec::with_capacity(region_ids.len());
        for region_id in region_ids {
            downgrade_regions.push(DowngradeRegion {
                region_id: *region_id,
                flush_timeout: Some(flush_timeout),
            });
        }

        Instruction::DowngradeRegions(downgrade_regions)
    }

    /// Handles a single per-region downgrade reply from the datanode.
    ///
    /// Records `last_entry_id` / `metadata_last_entry_id` in the volatile
    /// context (used later when upgrading the candidate), and returns a
    /// retryable error if the datanode reported a failure.
    fn handle_downgrade_region_reply(
        &self,
        ctx: &mut Context,
        reply: &DowngradeRegionReply,
        now: &Instant,
    ) -> Result<()> {
        let leader = &ctx.persistent_ctx.from_peer;
        let DowngradeRegionReply {
            region_id,
            last_entry_id,
            metadata_last_entry_id,
            exists,
            error,
        } = reply;

        // A datanode-side error is considered transient; let the retry loop decide.
        if error.is_some() {
            return error::RetryLaterSnafu {
                reason: format!(
                    "Failed to downgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
                    region_id, leader, error, now.elapsed()
                ),
            }
            .fail();
        }

        // A missing region is logged but not treated as an error: there is
        // nothing left to downgrade on the old leader.
        if !exists {
            warn!(
                "Trying to downgrade the region {} on datanode {:?}, but region doesn't exist!, elapsed: {:?}",
                region_id,
                leader,
                now.elapsed()
            );
        } else {
            info!(
                "Region {} leader is downgraded on datanode {:?}, last_entry_id: {:?}, metadata_last_entry_id: {:?}, elapsed: {:?}",
                region_id,
                leader,
                last_entry_id,
                metadata_last_entry_id,
                now.elapsed()
            );
        }

        if let Some(last_entry_id) = last_entry_id {
            debug!(
                "set last_entry_id: {:?}, region_id: {:?}",
                last_entry_id, region_id
            );
            ctx.volatile_ctx
                .set_last_entry_id(*region_id, *last_entry_id);
        }

        if let Some(metadata_last_entry_id) = metadata_last_entry_id {
            ctx.volatile_ctx
                .set_metadata_last_entry_id(*region_id, *metadata_last_entry_id);
        }

        Ok(())
    }

    /// Tries to downgrade a leader region.
    ///
    /// Retry:
    /// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout.
    /// - Failed to downgrade region on the Datanode.
    ///
    /// Abort:
    /// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable.
    /// - [PushMessage](error::Error::PushMessage), The receiver is dropped.
    /// - [MailboxReceiver](error::Error::MailboxReceiver), The sender is dropped without sending (impossible).
    /// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply).
    /// - [ExceededDeadline](error::Error::ExceededDeadline)
    /// - Invalid JSON.
    async fn downgrade_region(&self, ctx: &mut Context) -> Result<()> {
        let region_ids = &ctx.persistent_ctx.region_ids;
        // Deducts the remaining procedure budget; aborts if already exhausted.
        let operation_timeout =
            ctx.next_operation_timeout()
                .context(error::ExceededDeadlineSnafu {
                    operation: "Downgrade region",
                })?;
        let downgrade_instruction = self.build_downgrade_region_instruction(ctx, operation_timeout);

        let leader = &ctx.persistent_ctx.from_peer;
        let tracing_ctx = TracingContext::from_current_span();
        let msg = MailboxMessage::json_message(
            &format!("Downgrade leader regions: {:?}", region_ids),
            &format!("Metasrv@{}", ctx.server_addr()),
            &format!("Datanode-{}@{}", leader.id, leader.addr),
            common_time::util::current_time_millis(),
            &downgrade_instruction,
            Some(tracing_ctx.to_w3c()),
        )
        .with_context(|_| error::SerializeToJsonSnafu {
            input: downgrade_instruction.to_string(),
        })?;

        let ch = Channel::Datanode(leader.id);
        let now = Instant::now();
        let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;

        match receiver.await {
            Ok(msg) => {
                let reply = HeartbeatMailbox::json_reply(&msg)?;
                info!(
                    "Received downgrade region reply: {:?}, region: {:?}, elapsed: {:?}",
                    reply,
                    region_ids,
                    now.elapsed()
                );
                let InstructionReply::DowngradeRegions(DowngradeRegionsReply { replies }) = reply
                else {
                    return error::UnexpectedInstructionReplySnafu {
                        mailbox_message: msg.to_string(),
                        reason: "expect downgrade region reply",
                    }
                    .fail();
                };

                // Fails fast on the first problematic per-region reply.
                for reply in replies {
                    self.handle_downgrade_region_reply(ctx, &reply, &now)?;
                }
                Ok(())
            }
            Err(error::Error::MailboxTimeout { .. }) => {
                // Mailbox timeouts are transient; convert to a retryable error.
                let reason = format!(
                    "Mailbox received timeout for downgrade leader region {region_ids:?} on datanode {:?}, elapsed: {:?}",
                    leader,
                    now.elapsed()
                );
                error::RetryLaterSnafu { reason }.fail()
            }
            Err(err) => Err(err),
        }
    }

    /// Recomputes the leader-region lease deadline based on how long the
    /// `from_peer` datanode has been disconnected, shortening the slow-path
    /// wait when the datanode has already been unreachable for a while.
    ///
    /// Best-effort: on lookup failure or missing lease value the deadline is
    /// left unchanged and a message is logged.
    async fn update_leader_region_lease_deadline(&self, ctx: &mut Context) {
        let leader = &ctx.persistent_ctx.from_peer;

        let last_connection_at = match find_datanode_lease_value(&ctx.in_memory, leader.id).await {
            Ok(lease_value) => lease_value.map(|lease_value| lease_value.timestamp_millis),
            Err(err) => {
                error!(err; "Failed to find datanode lease value for datanode: {}, during region migration, region: {:?}", leader, ctx.persistent_ctx.region_ids);
                return;
            }
        };

        if let Some(last_connection_at) = last_connection_at {
            let now = current_time_millis();
            let elapsed = now - last_connection_at;
            let region_lease = default_distributed_time_constants().region_lease;

            // It's safe to update the region leader lease deadline here because:
            // 1. The old region leader has already been marked as downgraded in metadata,
            //    which means any attempts to renew its lease will be rejected.
            // 2. The pusher disconnect time record only gets removed when the datanode (from_peer)
            //    establishes a new heartbeat connection stream.
            if elapsed >= (region_lease.as_secs() * 1000) as i64 {
                // The lease has already fully expired; no need to wait at all.
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                info!(
                    "Datanode {}({}) has been disconnected for longer than the region lease period ({:?}), reset leader region lease deadline to None, region: {:?}",
                    leader, last_connection_at, region_lease, ctx.persistent_ctx.region_ids
                );
            } else if elapsed > 0 {
                // `now - last_connection_at` < REGION_LEASE_SECS * 1000
                // Only the remaining part of the lease needs to be waited out.
                let lease_timeout =
                    region_lease - Duration::from_millis((now - last_connection_at) as u64);
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                ctx.volatile_ctx
                    .set_leader_region_lease_deadline(lease_timeout);
                info!(
                    "Datanode {}({}) last connected {:?} ago, updated leader region lease deadline to {:?}, region: {:?}",
                    leader,
                    last_connection_at,
                    elapsed,
                    ctx.volatile_ctx.leader_region_lease_deadline,
                    ctx.persistent_ctx.region_ids
                );
            } else {
                // A non-positive elapsed implies a timestamp from the future
                // (clock skew); keep the existing deadline untouched.
                warn!(
                    "Datanode {} has invalid last connection timestamp: {} (which is after current time: {}), region: {:?}",
                    leader, last_connection_at, now, ctx.persistent_ctx.region_ids
                )
            }
        } else {
            warn!(
                "Failed to find last connection time for datanode {}, unable to update region lease deadline, region: {:?}",
                leader, ctx.persistent_ctx.region_ids
            )
        }
    }

    /// Downgrades a leader region.
    ///
    /// Fast path:
    /// - Waits for the reply of downgrade instruction.
    ///
    /// Slow path:
    /// - Waits for the lease of the leader region expired.
    ///
    /// Abort:
    /// - ExceededDeadline
    async fn downgrade_region_with_retry(&self, ctx: &mut Context) -> Result<()> {
        let mut retry = 0;

        loop {
            let timer = Instant::now();
            if let Err(err) = self.downgrade_region(ctx).await {
                ctx.update_operations_elapsed(timer);
                retry += 1;
                // Throws the error immediately if the procedure exceeded the deadline.
                if matches!(err, error::Error::ExceededDeadline { .. }) {
                    error!(err; "Failed to downgrade region leader, regions: {:?}, exceeded deadline", ctx.persistent_ctx.region_ids);
                    return Err(err);
                } else if matches!(err, error::Error::PusherNotFound { .. }) {
                    // Throws the error immediately if the datanode is unreachable.
                    // Before bailing out, tighten the slow-path lease deadline
                    // based on how long the datanode has been disconnected.
                    error!(err; "Failed to downgrade region leader, regions: {:?}, datanode({}) is unreachable(PusherNotFound)", ctx.persistent_ctx.region_ids, ctx.persistent_ctx.from_peer.id);
                    self.update_leader_region_lease_deadline(ctx).await;
                    return Err(err);
                } else if err.is_retryable() && retry < self.optimistic_retry {
                    error!(err; "Failed to downgrade region leader, regions: {:?}, retry later", ctx.persistent_ctx.region_ids);
                    sleep(self.retry_initial_interval).await;
                } else {
                    // Non-retryable error or retries exhausted: wrap and abort.
                    return Err(BoxedError::new(err)).context(error::DowngradeLeaderSnafu {
                        // TODO(weny): handle multiple regions.
                        region_id: ctx.persistent_ctx.region_ids[0],
                    })?;
                }
            } else {
                ctx.update_operations_elapsed(timer);
                // Resets the deadline.
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                break;
            }
        }

        Ok(())
    }
}
372
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;
    use tokio::time::Instant;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::manager::RegionMigrationTriggerReason;
    use crate::procedure::region_migration::test_util::{TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_downgrade_region_reply, send_mock_reply,
    };

    /// Builds a persistent context migrating region 1024/1 from peer 1 to peer 2.
    fn new_persistent_context() -> PersistentContext {
        PersistentContext::new(
            vec![("greptime".into(), "public".into())],
            Peer::empty(1),
            Peer::empty(2),
            vec![RegionId::new(1024, 1)],
            Duration::from_millis(1000),
            RegionMigrationTriggerReason::Manual,
        )
    }

    /// Creates table metadata with a single region route (leader = from_peer,
    /// follower = to_peer) so the migration procedure has something to act on.
    async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
        let region_id = ctx.persistent_ctx.region_ids[0];
        let table_info = new_test_table_info(region_id.table_id());
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
            follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
            ..Default::default()
        }];
        ctx.table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                wal_options,
            )
            .await
            .unwrap();
    }

    // No heartbeat receiver registered: sending must fail with PusherNotFound.
    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PusherNotFound { .. });
        assert!(!err.is_retryable());
    }

    // Dropping the receiver end of the channel must surface as PushMessage.
    #[tokio::test]
    async fn test_pusher_dropped() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        drop(rx);

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PushMessage { .. });
        assert!(!err.is_retryable());
    }

    // An already-exhausted operation budget must abort with ExceededDeadline,
    // both in the single attempt and in the retry wrapper.
    #[tokio::test]
    async fn test_procedure_exceeded_deadline() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());

        let err = state
            .downgrade_region_with_retry(&mut ctx)
            .await
            .unwrap_err();
        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    // A reply of the wrong instruction type must abort (non-retryable).
    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        // Sends an incorrect reply.
        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    // A mailbox timeout must be translated into a retryable RetryLater error.
    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        });

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
    }

    // A datanode-side downgrade failure must be retryable and keep the
    // original error message.
    #[tokio::test]
    async fn test_downgrade_region_failed() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(
                id,
                None,
                false,
                Some("test mocked".to_string()),
            ))
        });

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("test mocked"), "err: {err:?}",);
    }

    // Fast path: first attempt times out, second succeeds; last_entry_id is
    // recorded and the lease deadline is cleared.
    #[tokio::test]
    async fn test_downgrade_region_with_retry_fast_path() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            // retry: 0.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            // retry: 1.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_downgrade_region_reply(reply_id, Some(1), true, None)),
                )
                .await
                .unwrap();
        });

        state.downgrade_region_with_retry(&mut ctx).await.unwrap();
        assert_eq!(
            ctx.volatile_ctx
                .leader_region_last_entry_ids
                .get(&RegionId::new(0, 0))
                .cloned(),
            Some(1)
        );
        assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());
    }

    // Slow path: all retries time out; the pre-set lease deadline must be
    // left untouched so the caller can wait it out.
    #[tokio::test]
    async fn test_downgrade_region_with_retry_slow_path() {
        let state = DowngradeLeaderRegion {
            optimistic_retry: 3,
            retry_initial_interval: Duration::from_millis(100),
        };
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            for _ in 0..3 {
                let resp = rx.recv().await.unwrap().unwrap();
                let reply_id = resp.mailbox_message.unwrap().id;
                mailbox
                    .on_recv(
                        reply_id,
                        Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                    )
                    .await
                    .unwrap();
            }
        });

        ctx.volatile_ctx
            .set_leader_region_lease_deadline(Duration::from_secs(5));
        let expected_deadline = ctx.volatile_ctx.leader_region_lease_deadline.unwrap();
        let err = state
            .downgrade_region_with_retry(&mut ctx)
            .await
            .unwrap_err();
        assert_matches!(err, error::Error::DowngradeLeader { .. });
        // assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, None);
        // Should remain no change.
        assert_eq!(
            ctx.volatile_ctx.leader_region_lease_deadline.unwrap(),
            expected_deadline
        )
    }

    // Successful downgrade via `next` must transition to UpgradeCandidateRegion
    // without waiting out the lease (fast path).
    #[tokio::test]
    async fn test_next_upgrade_candidate_state() {
        let mut state = Box::<DowngradeLeaderRegion>::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(id, Some(1), true, None))
        });

        let timer = Instant::now();
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let elapsed = timer.elapsed().as_secs();
        let region_lease = default_distributed_time_constants().region_lease.as_secs();
        // Fast path must not have slept for the lease duration.
        assert!(elapsed < region_lease / 2);
        assert_eq!(
            ctx.volatile_ctx
                .leader_region_last_entry_ids
                .get(&RegionId::new(0, 0))
                .cloned(),
            Some(1)
        );
        assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());

        let _ = next
            .as_any()
            .downcast_ref::<UpgradeCandidateRegion>()
            .unwrap();
    }

    // Exceeding the deadline in `next` must transition to UpdateMetadata::Rollback.
    #[tokio::test]
    async fn test_downgrade_region_procedure_exceeded_deadline() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(id, None, true, None))
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }
}
747}