meta_srv/procedure/region_migration/downgrade_leader_region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::time::Duration;

use api::v1::meta::MailboxMessage;
use common_error::ext::BoxedError;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::instruction::{
    DowngradeRegion, DowngradeRegionReply, DowngradeRegionsReply, Instruction, InstructionReply,
};
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use tokio::time::{Instant, sleep};

use crate::discovery::utils::find_datanode_lease_value;
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::upgrade_candidate_region::UpgradeCandidateRegion;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;

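/// The state that downgrades the leader regions on the `from_peer` datanode
/// before the candidate regions on the `to_peer` datanode are upgraded.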
#[derive(Debug, Serialize, Deserialize)]
pub struct DowngradeLeaderRegion {
    // The number of optimistic retry attempts.
    optimistic_retry: usize,
    // The initial interval between retry attempts.
    retry_initial_interval: Duration,
}

impl Default for DowngradeLeaderRegion {
    fn default() -> Self {
        Self {
            optimistic_retry: 3,
            retry_initial_interval: Duration::from_millis(500),
        }
    }
}

#[async_trait::async_trait]
#[typetag::serde]
impl State for DowngradeLeaderRegion {
    async fn next(
        &mut self,
        ctx: &mut Context,
        _procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)> {
        let now = Instant::now();
        // Ensures the `leader_region_lease_deadline` exists after recovery.
        ctx.volatile_ctx
            .set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS));

        match self.downgrade_region_with_retry(ctx).await {
            Ok(_) => {
                // Nothing more to do.
                info!(
                    "Downgraded region leader successfully, region: {:?}",
                    ctx.persistent_ctx.region_ids
                );
            }
            Err(error::Error::ExceededDeadline { .. }) => {
                info!(
                    "Downgrade region leader exceeded deadline, region: {:?}",
                    ctx.persistent_ctx.region_ids
                );
                // Rolls back the metadata if the procedure has exceeded its deadline.
                return Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)));
            }
            Err(err) => {
                error!(err; "Non-retryable error occurred, region: {:?}", ctx.persistent_ctx.region_ids);
                if let Some(deadline) = ctx.volatile_ctx.leader_region_lease_deadline.as_ref() {
                    info!(
                        "Entering the downgrade region leader slow path, region: {:?}, sleeping until {:?}",
                        ctx.persistent_ctx.region_ids, deadline
                    );
                    tokio::time::sleep_until(*deadline).await;
                } else {
                    warn!(
                        "Leader region lease deadline is not set, region: {:?}",
                        ctx.persistent_ctx.region_ids
                    );
                }
            }
        }
        ctx.update_downgrade_leader_region_elapsed(now);

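        // Proceeds to upgrade the candidate region; on the slow path, the old
        // leader's lease has already been waited out above.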
        Ok((
            Box::new(UpgradeCandidateRegion::default()),
            Status::executing(false),
        ))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl DowngradeLeaderRegion {
    /// Builds the downgrade region instruction for all migrating regions.
    fn build_downgrade_region_instruction(
        &self,
        ctx: &Context,
        flush_timeout: Duration,
    ) -> Instruction {
        let region_ids = &ctx.persistent_ctx.region_ids;
        let mut downgrade_regions = Vec::with_capacity(region_ids.len());
        for region_id in region_ids {
            downgrade_regions.push(DowngradeRegion {
                region_id: *region_id,
                flush_timeout: Some(flush_timeout),
            });
        }

        Instruction::DowngradeRegions(downgrade_regions)
    }

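    /// Handles a single [`DowngradeRegionReply`]: returns a retryable error if the
    /// datanode reported a failure, otherwise records the returned last entry ids
    /// in the volatile context.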
    fn handle_downgrade_region_reply(
        &self,
        ctx: &mut Context,
        reply: &DowngradeRegionReply,
        now: &Instant,
    ) -> Result<()> {
        let leader = &ctx.persistent_ctx.from_peer;
        let DowngradeRegionReply {
            region_id,
            last_entry_id,
            metadata_last_entry_id,
            exists,
            error,
        } = reply;

        if error.is_some() {
            return error::RetryLaterSnafu {
                reason: format!(
                    "Failed to downgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
                    region_id, leader, error, now.elapsed()
                ),
            }
            .fail();
        }

        if !exists {
            warn!(
                "Trying to downgrade the region {} on datanode {:?}, but the region doesn't exist, elapsed: {:?}",
                region_id,
                leader,
                now.elapsed()
            );
        } else {
            info!(
                "Region {} leader is downgraded on datanode {:?}, last_entry_id: {:?}, metadata_last_entry_id: {:?}, elapsed: {:?}",
                region_id,
                leader,
                last_entry_id,
                metadata_last_entry_id,
                now.elapsed()
            );
        }

        if let Some(last_entry_id) = last_entry_id {
            debug!(
                "set last_entry_id: {:?}, region_id: {:?}",
                last_entry_id, region_id
            );
            ctx.volatile_ctx
                .set_last_entry_id(*region_id, *last_entry_id);
        }

        if let Some(metadata_last_entry_id) = metadata_last_entry_id {
            ctx.volatile_ctx
                .set_metadata_last_entry_id(*region_id, *metadata_last_entry_id);
        }

        Ok(())
    }

    /// Tries to downgrade the leader regions.
    ///
    /// Retry:
    /// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout.
    /// - Failed to downgrade region on the Datanode.
    ///
    /// Abort:
    /// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable.
    /// - [PushMessage](error::Error::PushMessage), The receiver is dropped.
    /// - [MailboxReceiver](error::Error::MailboxReceiver), The sender is dropped without sending (impossible).
    /// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply).
    /// - [ExceededDeadline](error::Error::ExceededDeadline).
    /// - Invalid JSON.
    async fn downgrade_region(&self, ctx: &mut Context) -> Result<()> {
        let region_ids = &ctx.persistent_ctx.region_ids;
        let operation_timeout =
            ctx.next_operation_timeout()
                .context(error::ExceededDeadlineSnafu {
                    operation: "Downgrade region",
                })?;
        let downgrade_instruction = self.build_downgrade_region_instruction(ctx, operation_timeout);

        let leader = &ctx.persistent_ctx.from_peer;
        let msg = MailboxMessage::json_message(
            &format!("Downgrade leader regions: {:?}", region_ids),
            &format!("Metasrv@{}", ctx.server_addr()),
            &format!("Datanode-{}@{}", leader.id, leader.addr),
            common_time::util::current_time_millis(),
            &downgrade_instruction,
        )
        .with_context(|_| error::SerializeToJsonSnafu {
            input: downgrade_instruction.to_string(),
        })?;

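        // Sends the instruction to the leader datanode through the heartbeat mailbox
        // and waits for the reply within the remaining operation timeout.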
        let ch = Channel::Datanode(leader.id);
        let now = Instant::now();
        let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;

        match receiver.await {
            Ok(msg) => {
                let reply = HeartbeatMailbox::json_reply(&msg)?;
                info!(
                    "Received downgrade region reply: {:?}, region: {:?}, elapsed: {:?}",
                    reply,
                    region_ids,
                    now.elapsed()
                );
                let InstructionReply::DowngradeRegions(DowngradeRegionsReply { replies }) = reply
                else {
                    return error::UnexpectedInstructionReplySnafu {
                        mailbox_message: msg.to_string(),
                        reason: "expect downgrade region reply",
                    }
                    .fail();
                };

                for reply in replies {
                    self.handle_downgrade_region_reply(ctx, &reply, &now)?;
                }
                Ok(())
            }
            Err(error::Error::MailboxTimeout { .. }) => {
                let reason = format!(
                    "Mailbox received timeout for downgrade leader region {region_ids:?} on datanode {:?}, elapsed: {:?}",
                    leader,
                    now.elapsed()
                );
                error::RetryLaterSnafu { reason }.fail()
            }
            Err(err) => Err(err),
        }
    }

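    /// Adjusts the leader region lease deadline based on the datanode's last
    /// heartbeat time: clears the deadline if the datanode has been disconnected
    /// for longer than the region lease, otherwise shortens the deadline by the
    /// time elapsed since the last connection.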
    async fn update_leader_region_lease_deadline(&self, ctx: &mut Context) {
        let leader = &ctx.persistent_ctx.from_peer;

        let last_connection_at = match find_datanode_lease_value(&ctx.in_memory, leader.id).await {
            Ok(lease_value) => lease_value.map(|lease_value| lease_value.timestamp_millis),
            Err(err) => {
                error!(err; "Failed to find datanode lease value for datanode: {}, during region migration, region: {:?}", leader, ctx.persistent_ctx.region_ids);
                return;
            }
        };

        if let Some(last_connection_at) = last_connection_at {
            let now = current_time_millis();
            let elapsed = now - last_connection_at;
            let region_lease = Duration::from_secs(REGION_LEASE_SECS);

            // It's safe to update the region leader lease deadline here because:
            // 1. The old region leader has already been marked as downgraded in metadata,
            //    which means any attempts to renew its lease will be rejected.
            // 2. The pusher disconnect time record only gets removed when the datanode (from_peer)
            //    establishes a new heartbeat connection stream.
            if elapsed >= (REGION_LEASE_SECS * 1000) as i64 {
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                info!(
                    "Datanode {}({}) has been disconnected for longer than the region lease period ({:?}), reset leader region lease deadline to None, region: {:?}",
                    leader, last_connection_at, region_lease, ctx.persistent_ctx.region_ids
                );
            } else if elapsed > 0 {
                // `now - last_connection_at` < REGION_LEASE_SECS * 1000
                let lease_timeout =
                    region_lease - Duration::from_millis((now - last_connection_at) as u64);
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                ctx.volatile_ctx
                    .set_leader_region_lease_deadline(lease_timeout);
                info!(
                    "Datanode {}({}) last connected {:?} ago, updated leader region lease deadline to {:?}, region: {:?}",
                    leader,
                    last_connection_at,
                    elapsed,
                    ctx.volatile_ctx.leader_region_lease_deadline,
                    ctx.persistent_ctx.region_ids
                );
            } else {
                warn!(
                    "Datanode {} has invalid last connection timestamp: {} (which is after current time: {}), region: {:?}",
                    leader, last_connection_at, now, ctx.persistent_ctx.region_ids
                )
            }
        } else {
            warn!(
                "Failed to find last connection time for datanode {}, unable to update region lease deadline, region: {:?}",
                leader, ctx.persistent_ctx.region_ids
            )
        }
    }

    /// Downgrades a leader region.
    ///
    /// Fast path:
    /// - Waits for the reply of the downgrade instruction.
    ///
    /// Slow path:
    /// - Waits for the lease of the leader region to expire.
    ///
    /// Abort:
    /// - ExceededDeadline
    async fn downgrade_region_with_retry(&self, ctx: &mut Context) -> Result<()> {
        let mut retry = 0;

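        // Retries retryable errors up to `optimistic_retry` attempts, sleeping
        // `retry_initial_interval` between attempts; other errors are returned to
        // the caller.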
        loop {
            let timer = Instant::now();
            if let Err(err) = self.downgrade_region(ctx).await {
                ctx.update_operations_elapsed(timer);
                retry += 1;
                // Returns the error immediately if the procedure exceeded the deadline.
                if matches!(err, error::Error::ExceededDeadline { .. }) {
                    error!(err; "Failed to downgrade region leader, regions: {:?}, exceeded deadline", ctx.persistent_ctx.region_ids);
                    return Err(err);
                } else if matches!(err, error::Error::PusherNotFound { .. }) {
                    // Returns the error immediately if the datanode is unreachable.
                    error!(err; "Failed to downgrade region leader, regions: {:?}, datanode({}) is unreachable(PusherNotFound)", ctx.persistent_ctx.region_ids, ctx.persistent_ctx.from_peer.id);
                    self.update_leader_region_lease_deadline(ctx).await;
                    return Err(err);
                } else if err.is_retryable() && retry < self.optimistic_retry {
                    error!(err; "Failed to downgrade region leader, regions: {:?}, retry later", ctx.persistent_ctx.region_ids);
                    sleep(self.retry_initial_interval).await;
                } else {
                    return Err(BoxedError::new(err)).context(error::DowngradeLeaderSnafu {
                        // TODO(weny): handle multiple regions.
                        region_id: ctx.persistent_ctx.region_ids[0],
                    })?;
                }
            } else {
                ctx.update_operations_elapsed(timer);
                // Resets the deadline.
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                break;
            }
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;
    use tokio::time::Instant;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::manager::RegionMigrationTriggerReason;
    use crate::procedure::region_migration::test_util::{TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_downgrade_region_reply, send_mock_reply,
    };

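    /// Builds a persistent context that migrates `RegionId::new(1024, 1)` from
    /// peer 1 to peer 2 with a 1000ms procedure timeout.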
    fn new_persistent_context() -> PersistentContext {
        PersistentContext {
            catalog: "greptime".into(),
            schema: "public".into(),
            from_peer: Peer::empty(1),
            to_peer: Peer::empty(2),
            region_ids: vec![RegionId::new(1024, 1)],
            timeout: Duration::from_millis(1000),
            trigger_reason: RegionMigrationTriggerReason::Manual,
        }
    }

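    /// Creates table metadata containing a single region route whose leader is
    /// `from_peer` and whose follower is `to_peer`.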
    async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
        let region_id = ctx.persistent_ctx.region_ids[0];
        let table_info = new_test_table_info(region_id.table_id(), vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
            follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
            ..Default::default()
        }];
        ctx.table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                wal_options,
            )
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PusherNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_pusher_dropped() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        drop(rx);

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PushMessage { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_procedure_exceeded_deadline() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());

        let err = state
            .downgrade_region_with_retry(&mut ctx)
            .await
            .unwrap_err();
        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        // Sends an incorrect reply.
        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        });

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
    }

    #[tokio::test]
    async fn test_downgrade_region_failed() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(
                id,
                None,
                false,
                Some("test mocked".to_string()),
            ))
        });

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("test mocked"), "err: {err:?}",);
    }

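    // Fast path: the first attempt times out, the second succeeds, and the returned
    // last entry id is recorded in the volatile context.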
    #[tokio::test]
    async fn test_downgrade_region_with_retry_fast_path() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            // retry: 0.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            // retry: 1.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_downgrade_region_reply(reply_id, Some(1), true, None)),
                )
                .await
                .unwrap();
        });

        state.downgrade_region_with_retry(&mut ctx).await.unwrap();
        assert_eq!(
            ctx.volatile_ctx
                .leader_region_last_entry_ids
                .get(&RegionId::new(0, 0))
                .cloned(),
            Some(1)
        );
        assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());
    }

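    // Slow path: every attempt times out, so the retry wrapper gives up and the
    // previously set lease deadline is left untouched.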
    #[tokio::test]
    async fn test_downgrade_region_with_retry_slow_path() {
        let state = DowngradeLeaderRegion {
            optimistic_retry: 3,
            retry_initial_interval: Duration::from_millis(100),
        };
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            for _ in 0..3 {
                let resp = rx.recv().await.unwrap().unwrap();
                let reply_id = resp.mailbox_message.unwrap().id;
                mailbox
                    .on_recv(
                        reply_id,
                        Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                    )
                    .await
                    .unwrap();
            }
        });

        ctx.volatile_ctx
            .set_leader_region_lease_deadline(Duration::from_secs(5));
        let expected_deadline = ctx.volatile_ctx.leader_region_lease_deadline.unwrap();
        let err = state
            .downgrade_region_with_retry(&mut ctx)
            .await
            .unwrap_err();
        assert_matches!(err, error::Error::DowngradeLeader { .. });
        // assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, None);
        // The lease deadline should remain unchanged.
        assert_eq!(
            ctx.volatile_ctx.leader_region_lease_deadline.unwrap(),
            expected_deadline
        )
    }

    #[tokio::test]
    async fn test_next_upgrade_candidate_state() {
        let mut state = Box::<DowngradeLeaderRegion>::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(id, Some(1), true, None))
        });

        let timer = Instant::now();
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let elapsed = timer.elapsed().as_secs();
        assert!(elapsed < REGION_LEASE_SECS / 2);
        assert_eq!(
            ctx.volatile_ctx
                .leader_region_last_entry_ids
                .get(&RegionId::new(0, 0))
                .cloned(),
            Some(1)
        );
        assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());

        let _ = next
            .as_any()
            .downcast_ref::<UpgradeCandidateRegion>()
            .unwrap();
    }

    #[tokio::test]
    async fn test_downgrade_region_procedure_exceeded_deadline() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(id, None, true, None))
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }
}
744}