meta_srv/procedure/region_migration/downgrade_leader_region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::time::Duration;

use api::v1::meta::MailboxMessage;
use common_error::ext::BoxedError;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::instruction::{
    DowngradeRegion, DowngradeRegionReply, DowngradeRegionsReply, Instruction, InstructionReply,
};
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use tokio::time::{Instant, sleep};

use crate::discovery::utils::find_datanode_lease_value;
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::upgrade_candidate_region::UpgradeCandidateRegion;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;

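/// State that downgrades the leader regions on the `from_peer` datanode before the candidate
/// regions on the `to_peer` datanode are upgraded.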
#[derive(Debug, Serialize, Deserialize)]
pub struct DowngradeLeaderRegion {
    // The number of optimistic retries.
    optimistic_retry: usize,
    // The initial retry interval.
    retry_initial_interval: Duration,
}

impl Default for DowngradeLeaderRegion {
    fn default() -> Self {
        Self {
            optimistic_retry: 3,
            retry_initial_interval: Duration::from_millis(500),
        }
    }
}

#[async_trait::async_trait]
#[typetag::serde]
impl State for DowngradeLeaderRegion {
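    /// Downgrades the leader regions on the `from_peer` datanode, then transitions to
    /// [`UpgradeCandidateRegion`], or to [`UpdateMetadata::Rollback`] if the procedure has
    /// exceeded its deadline.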
    async fn next(
        &mut self,
        ctx: &mut Context,
        _procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)> {
        let now = Instant::now();
        // Ensures the `leader_region_lease_deadline` exists after recovery.
        ctx.volatile_ctx
            .set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS));

        match self.downgrade_region_with_retry(ctx).await {
            Ok(_) => {
                // Nothing else to do before upgrading the candidate region.
                info!(
                    "Downgraded the leader regions successfully, regions: {:?}",
                    ctx.persistent_ctx.region_ids
                );
            }
            Err(error::Error::ExceededDeadline { .. }) => {
                info!(
                    "Downgrading the leader regions exceeded the deadline, regions: {:?}",
                    ctx.persistent_ctx.region_ids
                );
                // Rolls back the metadata if the procedure exceeded the deadline.
                return Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)));
            }
            Err(err) => {
                error!(err; "Encountered a non-retryable error, regions: {:?}", ctx.persistent_ctx.region_ids);
                if let Some(deadline) = ctx.volatile_ctx.leader_region_lease_deadline.as_ref() {
                    info!(
                        "Entering the downgrade leader region slow path, regions: {:?}, sleeping until {:?}",
                        ctx.persistent_ctx.region_ids, deadline
                    );
                    tokio::time::sleep_until(*deadline).await;
                } else {
                    warn!(
                        "Leader region lease deadline is not set, regions: {:?}",
                        ctx.persistent_ctx.region_ids
                    );
                }
            }
        }
        ctx.update_downgrade_leader_region_elapsed(now);

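        // Proceeds to upgrade the candidate region: either the leader acknowledged the
        // downgrade above, or its lease deadline has been waited out.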
        Ok((
            Box::new(UpgradeCandidateRegion::default()),
            Status::executing(false),
        ))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl DowngradeLeaderRegion {
    /// Builds the downgrade regions instruction.
    fn build_downgrade_region_instruction(
        &self,
        ctx: &Context,
        flush_timeout: Duration,
    ) -> Instruction {
        let region_ids = &ctx.persistent_ctx.region_ids;
        let mut downgrade_regions = Vec::with_capacity(region_ids.len());
        for region_id in region_ids {
            downgrade_regions.push(DowngradeRegion {
                region_id: *region_id,
                flush_timeout: Some(flush_timeout),
            });
        }

        Instruction::DowngradeRegions(downgrade_regions)
    }

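    /// Handles a single [`DowngradeRegionReply`]: returns a retryable error if the datanode
    /// reported a failure, warns if the region does not exist on the datanode, and records the
    /// replied `last_entry_id` and `metadata_last_entry_id` in the volatile context.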
    fn handle_downgrade_region_reply(
        &self,
        ctx: &mut Context,
        reply: &DowngradeRegionReply,
        now: &Instant,
    ) -> Result<()> {
        let leader = &ctx.persistent_ctx.from_peer;
        let DowngradeRegionReply {
            region_id,
            last_entry_id,
            metadata_last_entry_id,
            exists,
            error,
        } = reply;

        if error.is_some() {
            return error::RetryLaterSnafu {
                reason: format!(
                    "Failed to downgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
                    region_id, leader, error, now.elapsed()
                ),
            }
            .fail();
        }

        if !exists {
            warn!(
                "Trying to downgrade the region {} on datanode {:?}, but the region doesn't exist, elapsed: {:?}",
                region_id,
                leader,
                now.elapsed()
            );
        } else {
            info!(
                "Region {} leader is downgraded on datanode {:?}, last_entry_id: {:?}, metadata_last_entry_id: {:?}, elapsed: {:?}",
                region_id,
                leader,
                last_entry_id,
                metadata_last_entry_id,
                now.elapsed()
            );
        }

        if let Some(last_entry_id) = last_entry_id {
            debug!(
                "set last_entry_id: {:?}, region_id: {:?}",
                last_entry_id, region_id
            );
            ctx.volatile_ctx
                .set_last_entry_id(*region_id, *last_entry_id);
        }

        if let Some(metadata_last_entry_id) = metadata_last_entry_id {
            ctx.volatile_ctx
                .set_metadata_last_entry_id(*region_id, *metadata_last_entry_id);
        }

        Ok(())
    }

    /// Tries to downgrade the leader regions on the `from_peer` datanode.
    ///
    /// Retry:
    /// - [MailboxTimeout](error::Error::MailboxTimeout): the reply timed out.
    /// - The datanode failed to downgrade a region.
    ///
    /// Abort:
    /// - [PusherNotFound](error::Error::PusherNotFound): the datanode is unreachable.
    /// - [PushMessage](error::Error::PushMessage): the receiver is dropped.
    /// - [MailboxReceiver](error::Error::MailboxReceiver): the sender is dropped without sending (should not happen).
    /// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply): the reply is not a downgrade regions reply.
    /// - [ExceededDeadline](error::Error::ExceededDeadline): the procedure exceeded its deadline.
    /// - Invalid JSON.
    async fn downgrade_region(&self, ctx: &mut Context) -> Result<()> {
        let region_ids = &ctx.persistent_ctx.region_ids;
        let operation_timeout =
            ctx.next_operation_timeout()
                .context(error::ExceededDeadlineSnafu {
                    operation: "Downgrade region",
                })?;
        let downgrade_instruction = self.build_downgrade_region_instruction(ctx, operation_timeout);

        let leader = &ctx.persistent_ctx.from_peer;
        let msg = MailboxMessage::json_message(
            &format!("Downgrade leader regions: {:?}", region_ids),
            &format!("Metasrv@{}", ctx.server_addr()),
            &format!("Datanode-{}@{}", leader.id, leader.addr),
            common_time::util::current_time_millis(),
            &downgrade_instruction,
        )
        .with_context(|_| error::SerializeToJsonSnafu {
            input: downgrade_instruction.to_string(),
        })?;

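        // Sends the downgrade instruction to the leader datanode and waits for its reply
        // within the remaining operation timeout.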
        let ch = Channel::Datanode(leader.id);
        let now = Instant::now();
        let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;

        match receiver.await {
            Ok(msg) => {
                let reply = HeartbeatMailbox::json_reply(&msg)?;
                info!(
                    "Received downgrade region reply: {:?}, region: {:?}, elapsed: {:?}",
                    reply,
                    region_ids,
                    now.elapsed()
                );
                let InstructionReply::DowngradeRegions(DowngradeRegionsReply { replies }) = reply
                else {
                    return error::UnexpectedInstructionReplySnafu {
                        mailbox_message: msg.to_string(),
                        reason: "expect downgrade region reply",
                    }
                    .fail();
                };

                for reply in replies {
                    self.handle_downgrade_region_reply(ctx, &reply, &now)?;
                }
                Ok(())
            }
            Err(error::Error::MailboxTimeout { .. }) => {
                let reason = format!(
                    "Mailbox received timeout for downgrade leader region {region_ids:?} on datanode {:?}, elapsed: {:?}",
                    leader,
                    now.elapsed()
                );
                error::RetryLaterSnafu { reason }.fail()
            }
            Err(err) => Err(err),
        }
    }

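    /// Updates the leader region lease deadline based on the last time the `from_peer` datanode
    /// was connected:
    /// - if the datanode has been disconnected for longer than the region lease period, the
    ///   deadline is cleared so there is no need to wait for it;
    /// - otherwise, the deadline is shortened to the remaining lease time.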
    async fn update_leader_region_lease_deadline(&self, ctx: &mut Context) {
        let leader = &ctx.persistent_ctx.from_peer;

        let last_connection_at = match find_datanode_lease_value(&ctx.in_memory, leader.id).await {
            Ok(lease_value) => lease_value.map(|lease_value| lease_value.timestamp_millis),
            Err(err) => {
                error!(err; "Failed to find the datanode lease value for datanode {} during region migration, regions: {:?}", leader, ctx.persistent_ctx.region_ids);
                return;
            }
        };

        if let Some(last_connection_at) = last_connection_at {
            let now = current_time_millis();
            let elapsed = now - last_connection_at;
            let region_lease = Duration::from_secs(REGION_LEASE_SECS);

            // It's safe to update the region leader lease deadline here because:
            // 1. The old region leader has already been marked as downgraded in metadata,
            //    which means any attempts to renew its lease will be rejected.
            // 2. The pusher disconnect time record only gets removed when the datanode (from_peer)
            //    establishes a new heartbeat connection stream.
            if elapsed >= (REGION_LEASE_SECS * 1000) as i64 {
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                info!(
                    "Datanode {}({}) has been disconnected for longer than the region lease period ({:?}), reset leader region lease deadline to None, region: {:?}",
                    leader, last_connection_at, region_lease, ctx.persistent_ctx.region_ids
                );
            } else if elapsed > 0 {
                // `now - last_connection_at` < REGION_LEASE_SECS * 1000
                let lease_timeout =
                    region_lease - Duration::from_millis((now - last_connection_at) as u64);
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                ctx.volatile_ctx
                    .set_leader_region_lease_deadline(lease_timeout);
                info!(
                    "Datanode {}({}) last connected {:?} ago, updated leader region lease deadline to {:?}, region: {:?}",
                    leader,
                    last_connection_at,
                    elapsed,
                    ctx.volatile_ctx.leader_region_lease_deadline,
                    ctx.persistent_ctx.region_ids
                );
            } else {
                warn!(
                    "Datanode {} has invalid last connection timestamp: {} (which is after current time: {}), region: {:?}",
                    leader, last_connection_at, now, ctx.persistent_ctx.region_ids
                )
            }
        } else {
            warn!(
                "Failed to find last connection time for datanode {}, unable to update region lease deadline, region: {:?}",
                leader, ctx.persistent_ctx.region_ids
            )
        }
    }

    /// Downgrades the leader regions, retrying on retryable errors.
    ///
    /// Fast path:
    /// - Waits for the reply to the downgrade instruction.
    ///
    /// Slow path:
    /// - Waits for the lease of the leader region to expire.
    ///
    /// Abort:
    /// - [ExceededDeadline](error::Error::ExceededDeadline).
    async fn downgrade_region_with_retry(&self, ctx: &mut Context) -> Result<()> {
        let mut retry = 0;

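        // Retries the downgrade until it succeeds, a non-retryable error occurs, or the
        // retry budget is exhausted.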
        loop {
            let timer = Instant::now();
            if let Err(err) = self.downgrade_region(ctx).await {
                ctx.update_operations_elapsed(timer);
                retry += 1;
                // Returns the error immediately if the procedure exceeded the deadline.
                if matches!(err, error::Error::ExceededDeadline { .. }) {
                    error!(err; "Failed to downgrade region leader, regions: {:?}, exceeded deadline", ctx.persistent_ctx.region_ids);
                    return Err(err);
                } else if matches!(err, error::Error::PusherNotFound { .. }) {
                    // Returns the error immediately if the datanode is unreachable.
                    error!(err; "Failed to downgrade region leader, regions: {:?}, datanode({}) is unreachable (PusherNotFound)", ctx.persistent_ctx.region_ids, ctx.persistent_ctx.from_peer.id);
                    self.update_leader_region_lease_deadline(ctx).await;
                    return Err(err);
                } else if err.is_retryable() && retry < self.optimistic_retry {
                    error!(err; "Failed to downgrade region leader, regions: {:?}, retry later", ctx.persistent_ctx.region_ids);
                    sleep(self.retry_initial_interval).await;
                } else {
                    return Err(BoxedError::new(err)).context(error::DowngradeLeaderSnafu {
                        // TODO(weny): handle multiple regions.
                        region_id: ctx.persistent_ctx.region_ids[0],
                    })?;
                }
            } else {
                ctx.update_operations_elapsed(timer);
                // Resets the deadline.
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                break;
            }
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;
    use tokio::time::Instant;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::manager::RegionMigrationTriggerReason;
    use crate::procedure::region_migration::test_util::{TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_downgrade_region_reply, send_mock_reply,
    };

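    /// Builds a [`PersistentContext`] that migrates one region of table 1024 from peer 1 to
    /// peer 2 with a 1s timeout.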
    fn new_persistent_context() -> PersistentContext {
        PersistentContext::new(
            vec![("greptime".into(), "public".into())],
            Peer::empty(1),
            Peer::empty(2),
            vec![RegionId::new(1024, 1)],
            Duration::from_millis(1000),
            RegionMigrationTriggerReason::Manual,
        )
    }

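    /// Creates the table metadata for the test region, with `from_peer` as the leader and
    /// `to_peer` as a follower.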
    async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
        let region_id = ctx.persistent_ctx.region_ids[0];
        let table_info = new_test_table_info(region_id.table_id(), vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
            follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
            ..Default::default()
        }];
        ctx.table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                wal_options,
            )
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PusherNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_pusher_dropped() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        drop(rx);

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PushMessage { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_procedure_exceeded_deadline() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());

        let err = state
            .downgrade_region_with_retry(&mut ctx)
            .await
            .unwrap_err();
        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        // Sends an incorrect reply.
        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        });

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
    }

    #[tokio::test]
    async fn test_downgrade_region_failed() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(
                id,
                None,
                false,
                Some("test mocked".to_string()),
            ))
        });

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("test mocked"), "err: {err:?}");
    }

    #[tokio::test]
    async fn test_downgrade_region_with_retry_fast_path() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            // retry: 0.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            // retry: 1.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_downgrade_region_reply(reply_id, Some(1), true, None)),
                )
                .await
                .unwrap();
        });

        state.downgrade_region_with_retry(&mut ctx).await.unwrap();
        assert_eq!(
            ctx.volatile_ctx
                .leader_region_last_entry_ids
                .get(&RegionId::new(0, 0))
                .cloned(),
            Some(1)
        );
        assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());
    }

    #[tokio::test]
    async fn test_downgrade_region_with_retry_slow_path() {
        let state = DowngradeLeaderRegion {
            optimistic_retry: 3,
            retry_initial_interval: Duration::from_millis(100),
        };
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            for _ in 0..3 {
                let resp = rx.recv().await.unwrap().unwrap();
                let reply_id = resp.mailbox_message.unwrap().id;
                mailbox
                    .on_recv(
                        reply_id,
                        Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                    )
                    .await
                    .unwrap();
            }
        });

        ctx.volatile_ctx
            .set_leader_region_lease_deadline(Duration::from_secs(5));
        let expected_deadline = ctx.volatile_ctx.leader_region_lease_deadline.unwrap();
        let err = state
            .downgrade_region_with_retry(&mut ctx)
            .await
            .unwrap_err();
        assert_matches!(err, error::Error::DowngradeLeader { .. });
        // The lease deadline should remain unchanged.
        assert_eq!(
            ctx.volatile_ctx.leader_region_lease_deadline.unwrap(),
            expected_deadline
        )
    }

    #[tokio::test]
    async fn test_next_upgrade_candidate_state() {
        let mut state = Box::<DowngradeLeaderRegion>::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(id, Some(1), true, None))
        });

        let timer = Instant::now();
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let elapsed = timer.elapsed().as_secs();
        assert!(elapsed < REGION_LEASE_SECS / 2);
        assert_eq!(
            ctx.volatile_ctx
                .leader_region_last_entry_ids
                .get(&RegionId::new(0, 0))
                .cloned(),
            Some(1)
        );
        assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());

        let _ = next
            .as_any()
            .downcast_ref::<UpgradeCandidateRegion>()
            .unwrap();
    }

    #[tokio::test]
    async fn test_downgrade_region_procedure_exceeded_deadline() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(id, None, true, None))
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }
}