Skip to main content

meta_srv/procedure/region_migration/
downgrade_leader_region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::any::Any;
16use std::time::Duration;
17
18use api::v1::meta::MailboxMessage;
19use common_error::ext::BoxedError;
20use common_meta::distributed_time_constants::default_distributed_time_constants;
21use common_meta::instruction::{
22    DowngradeRegion, DowngradeRegionReply, DowngradeRegionsReply, Instruction, InstructionReply,
23};
24use common_procedure::{Context as ProcedureContext, Status};
25use common_telemetry::tracing_context::TracingContext;
26use common_telemetry::{debug, error, info, warn};
27use common_time::util::current_time_millis;
28use serde::{Deserialize, Serialize};
29use snafu::{OptionExt, ResultExt};
30use tokio::time::{Instant, sleep};
31
32use crate::discovery::utils::find_datanode_lease_value;
33use crate::error::{self, Result};
34use crate::handler::HeartbeatMailbox;
35use crate::procedure::region_migration::update_metadata::UpdateMetadata;
36use crate::procedure::region_migration::upgrade_candidate_region::UpgradeCandidateRegion;
37use crate::procedure::region_migration::{Context, State};
38use crate::procedure::utils::instruction_error_result;
39use crate::service::mailbox::Channel;
40
41#[derive(Debug, Serialize, Deserialize)]
42pub struct DowngradeLeaderRegion {
43    // The optimistic retry times.
44    optimistic_retry: usize,
45    // The retry initial interval.
46    retry_initial_interval: Duration,
47}
48
49impl Default for DowngradeLeaderRegion {
50    fn default() -> Self {
51        Self {
52            optimistic_retry: 3,
53            retry_initial_interval: Duration::from_millis(500),
54        }
55    }
56}
57
58#[async_trait::async_trait]
59#[typetag::serde]
60impl State for DowngradeLeaderRegion {
61    async fn next(
62        &mut self,
63        ctx: &mut Context,
64        _procedure_ctx: &ProcedureContext,
65    ) -> Result<(Box<dyn State>, Status)> {
66        let now = Instant::now();
67        // Ensures the `leader_region_lease_deadline` must exist after recovering.
68        ctx.volatile_ctx
69            .set_leader_region_lease_deadline(default_distributed_time_constants().region_lease);
70
71        match self.downgrade_region_with_retry(ctx).await {
72            Ok(_) => {
73                // Do nothing
74                info!(
75                    "Downgraded region leader success, region: {:?}",
76                    ctx.persistent_ctx.region_ids
77                );
78            }
79            Err(error::Error::ExceededDeadline { .. }) => {
80                info!(
81                    "Downgrade region leader exceeded deadline, region: {:?}",
82                    ctx.persistent_ctx.region_ids
83                );
84                // Rollbacks the metadata if procedure is timeout
85                return Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)));
86            }
87            Err(err) => {
88                error!(err; "Occurs non-retryable error, region: {:?}", ctx.persistent_ctx.region_ids);
89                if let Some(deadline) = ctx.volatile_ctx.leader_region_lease_deadline.as_ref() {
90                    info!(
91                        "Running into the downgrade region leader slow path, region: {:?}, sleep until {:?}",
92                        ctx.persistent_ctx.region_ids, deadline
93                    );
94                    tokio::time::sleep_until(*deadline).await;
95                } else {
96                    warn!(
97                        "Leader region lease deadline is not set, region: {:?}",
98                        ctx.persistent_ctx.region_ids
99                    );
100                }
101            }
102        }
103        ctx.update_downgrade_leader_region_elapsed(now);
104
105        Ok((
106            Box::new(UpgradeCandidateRegion::default()),
107            Status::executing(false),
108        ))
109    }
110
111    fn as_any(&self) -> &dyn Any {
112        self
113    }
114}
115
116impl DowngradeLeaderRegion {
117    /// Builds downgrade region instruction.
118    fn build_downgrade_region_instruction(
119        &self,
120        ctx: &Context,
121        flush_timeout: Duration,
122    ) -> Instruction {
123        let region_ids = &ctx.persistent_ctx.region_ids;
124        let mut downgrade_regions = Vec::with_capacity(region_ids.len());
125        for region_id in region_ids {
126            downgrade_regions.push(DowngradeRegion {
127                region_id: *region_id,
128                flush_timeout: Some(flush_timeout),
129            });
130        }
131
132        Instruction::DowngradeRegions(downgrade_regions)
133    }
134
135    fn handle_downgrade_region_reply(
136        &self,
137        ctx: &mut Context,
138        reply: &DowngradeRegionReply,
139        now: &Instant,
140    ) -> Result<()> {
141        let leader = &ctx.persistent_ctx.from_peer;
142        let DowngradeRegionReply {
143            region_id,
144            last_entry_id,
145            metadata_last_entry_id,
146            exists,
147            error,
148        } = reply;
149
150        if let Some(error) = error {
151            return instruction_error_result(
152                error,
153                format!(
154                    "Failed to downgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
155                    region_id,
156                    leader,
157                    error,
158                    now.elapsed()
159                ),
160            );
161        }
162
163        if !exists {
164            warn!(
165                "Trying to downgrade the region {} on datanode {:?}, but region doesn't exist!, elapsed: {:?}",
166                region_id,
167                leader,
168                now.elapsed()
169            );
170        } else {
171            info!(
172                "Region {} leader is downgraded on datanode {:?}, last_entry_id: {:?}, metadata_last_entry_id: {:?}, elapsed: {:?}",
173                region_id,
174                leader,
175                last_entry_id,
176                metadata_last_entry_id,
177                now.elapsed()
178            );
179        }
180
181        if let Some(last_entry_id) = last_entry_id {
182            debug!(
183                "set last_entry_id: {:?}, region_id: {:?}",
184                last_entry_id, region_id
185            );
186            ctx.volatile_ctx
187                .set_last_entry_id(*region_id, *last_entry_id);
188        }
189
190        if let Some(metadata_last_entry_id) = metadata_last_entry_id {
191            ctx.volatile_ctx
192                .set_metadata_last_entry_id(*region_id, *metadata_last_entry_id);
193        }
194
195        Ok(())
196    }
197
198    /// Tries to downgrade a leader region.
199    ///
200    /// Retry:
201    /// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout.
202    /// - Failed to downgrade region on the Datanode.
203    ///
204    /// Abort:
205    /// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable.
206    /// - [PushMessage](error::Error::PushMessage), The receiver is dropped.
207    /// - [MailboxReceiver](error::Error::MailboxReceiver), The sender is dropped without sending (impossible).
208    /// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply).
209    /// - [ExceededDeadline](error::Error::ExceededDeadline)
210    /// - Invalid JSON.
211    async fn downgrade_region(&self, ctx: &mut Context) -> Result<()> {
212        let region_ids = &ctx.persistent_ctx.region_ids;
213        let operation_timeout =
214            ctx.next_operation_timeout()
215                .context(error::ExceededDeadlineSnafu {
216                    operation: "Downgrade region",
217                })?;
218        let downgrade_instruction = self.build_downgrade_region_instruction(ctx, operation_timeout);
219
220        let leader = &ctx.persistent_ctx.from_peer;
221        let tracing_ctx = TracingContext::from_current_span();
222        let msg = MailboxMessage::json_message(
223            &format!("Downgrade leader regions: {:?}", region_ids),
224            &format!("Metasrv@{}", ctx.server_addr()),
225            &format!("Datanode-{}@{}", leader.id, leader.addr),
226            common_time::util::current_time_millis(),
227            &downgrade_instruction,
228            Some(tracing_ctx.to_w3c()),
229        )
230        .with_context(|_| error::SerializeToJsonSnafu {
231            input: downgrade_instruction.to_string(),
232        })?;
233
234        let ch = Channel::Datanode(leader.id);
235        let now = Instant::now();
236        let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
237
238        match receiver.await {
239            Ok(msg) => {
240                let reply = HeartbeatMailbox::json_reply(&msg)?;
241                info!(
242                    "Received downgrade region reply: {:?}, region: {:?}, elapsed: {:?}",
243                    reply,
244                    region_ids,
245                    now.elapsed()
246                );
247                let InstructionReply::DowngradeRegions(DowngradeRegionsReply { replies }) = reply
248                else {
249                    return error::UnexpectedInstructionReplySnafu {
250                        mailbox_message: msg.to_string(),
251                        reason: "expect downgrade region reply",
252                    }
253                    .fail();
254                };
255
256                for reply in replies {
257                    self.handle_downgrade_region_reply(ctx, &reply, &now)?;
258                }
259                Ok(())
260            }
261            Err(error::Error::MailboxTimeout { .. }) => {
262                let reason = format!(
263                    "Mailbox received timeout for downgrade leader region {region_ids:?} on datanode {:?}, elapsed: {:?}",
264                    leader,
265                    now.elapsed()
266                );
267                error::RetryLaterSnafu { reason }.fail()
268            }
269            Err(err) => Err(err),
270        }
271    }
272
273    async fn update_leader_region_lease_deadline(&self, ctx: &mut Context) {
274        let leader = &ctx.persistent_ctx.from_peer;
275
276        let last_connection_at = match find_datanode_lease_value(&ctx.in_memory, leader.id).await {
277            Ok(lease_value) => lease_value.map(|lease_value| lease_value.timestamp_millis),
278            Err(err) => {
279                error!(err; "Failed to find datanode lease value for datanode: {}, during region migration, region: {:?}", leader, ctx.persistent_ctx.region_ids);
280                return;
281            }
282        };
283
284        if let Some(last_connection_at) = last_connection_at {
285            let now = current_time_millis();
286            let elapsed = now - last_connection_at;
287            let region_lease = default_distributed_time_constants().region_lease;
288
289            // It's safe to update the region leader lease deadline here because:
290            // 1. The old region leader has already been marked as downgraded in metadata,
291            //    which means any attempts to renew its lease will be rejected.
292            // 2. The pusher disconnect time record only gets removed when the datanode (from_peer)
293            //    establishes a new heartbeat connection stream.
294            if elapsed >= (region_lease.as_secs() * 1000) as i64 {
295                ctx.volatile_ctx.reset_leader_region_lease_deadline();
296                info!(
297                    "Datanode {}({}) has been disconnected for longer than the region lease period ({:?}), reset leader region lease deadline to None, region: {:?}",
298                    leader, last_connection_at, region_lease, ctx.persistent_ctx.region_ids
299                );
300            } else if elapsed > 0 {
301                // `now - last_connection_at` < REGION_LEASE_SECS * 1000
302                let lease_timeout =
303                    region_lease - Duration::from_millis((now - last_connection_at) as u64);
304                ctx.volatile_ctx.reset_leader_region_lease_deadline();
305                ctx.volatile_ctx
306                    .set_leader_region_lease_deadline(lease_timeout);
307                info!(
308                    "Datanode {}({}) last connected {:?} ago, updated leader region lease deadline to {:?}, region: {:?}",
309                    leader,
310                    last_connection_at,
311                    elapsed,
312                    ctx.volatile_ctx.leader_region_lease_deadline,
313                    ctx.persistent_ctx.region_ids
314                );
315            } else {
316                warn!(
317                    "Datanode {} has invalid last connection timestamp: {} (which is after current time: {}), region: {:?}",
318                    leader, last_connection_at, now, ctx.persistent_ctx.region_ids
319                )
320            }
321        } else {
322            warn!(
323                "Failed to find last connection time for datanode {}, unable to update region lease deadline, region: {:?}",
324                leader, ctx.persistent_ctx.region_ids
325            )
326        }
327    }
328
329    /// Downgrades a leader region.
330    ///
331    /// Fast path:
332    /// - Waits for the reply of downgrade instruction.
333    ///
334    /// Slow path:
335    /// - Waits for the lease of the leader region expired.
336    ///
337    /// Abort:
338    /// - ExceededDeadline
339    async fn downgrade_region_with_retry(&self, ctx: &mut Context) -> Result<()> {
340        let mut retry = 0;
341
342        loop {
343            let timer = Instant::now();
344            if let Err(err) = self.downgrade_region(ctx).await {
345                ctx.update_operations_elapsed(timer);
346                retry += 1;
347                // Throws the error immediately if the procedure exceeded the deadline.
348                if matches!(err, error::Error::ExceededDeadline { .. }) {
349                    error!(err; "Failed to downgrade region leader, regions: {:?}, exceeded deadline", ctx.persistent_ctx.region_ids);
350                    return Err(err);
351                } else if matches!(err, error::Error::PusherNotFound { .. }) {
352                    // Throws the error immediately if the datanode is unreachable.
353                    error!(err; "Failed to downgrade region leader, regions: {:?}, datanode({}) is unreachable(PusherNotFound)", ctx.persistent_ctx.region_ids, ctx.persistent_ctx.from_peer.id);
354                    self.update_leader_region_lease_deadline(ctx).await;
355                    return Err(err);
356                } else if err.is_retryable() && retry < self.optimistic_retry {
357                    error!(err; "Failed to downgrade region leader, regions: {:?}, retry later", ctx.persistent_ctx.region_ids);
358                    sleep(self.retry_initial_interval).await;
359                } else {
360                    return Err(BoxedError::new(err)).context(error::DowngradeLeaderSnafu {
361                        // TODO(weny): handle multiple regions.
362                        region_id: ctx.persistent_ctx.region_ids[0],
363                    })?;
364                }
365            } else {
366                ctx.update_operations_elapsed(timer);
367                // Resets the deadline.
368                ctx.volatile_ctx.reset_leader_region_lease_deadline();
369                break;
370            }
371        }
372
373        Ok(())
374    }
375}
376
#[cfg(test)]
mod tests {
    use std::assert_matches;
    use std::collections::HashMap;

    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use common_meta::wal_provider::RegionWalOptions;
    use store_api::storage::RegionId;
    use tokio::time::Instant;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::manager::RegionMigrationTriggerReason;
    use crate::procedure::region_migration::test_util::{TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_downgrade_region_reply, send_mock_reply,
    };

    /// Migrates region `(1024, 1)` from peer 1 to peer 2 with a 1s procedure timeout.
    fn new_persistent_context() -> PersistentContext {
        PersistentContext::new(
            vec![("greptime".into(), "public".into())],
            Peer::empty(1),
            Peer::empty(2),
            vec![RegionId::new(1024, 1)],
            Duration::from_millis(1000),
            RegionMigrationTriggerReason::Manual,
        )
    }

    /// Creates table metadata whose route has `from_peer` as leader and `to_peer` as follower.
    async fn prepare_table_metadata(ctx: &Context, wal_options: RegionWalOptions) {
        let region_id = ctx.persistent_ctx.region_ids[0];
        let table_info = new_test_table_info(region_id.table_id());
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
            follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
            ..Default::default()
        }];
        ctx.table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                wal_options,
            )
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PusherNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_pusher_dropped() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        drop(rx);

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PushMessage { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_procedure_exceeded_deadline() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());

        let err = state
            .downgrade_region_with_retry(&mut ctx)
            .await
            .unwrap_err();
        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        // Sends an incorrect reply.
        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        });

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
    }

    #[tokio::test]
    async fn test_downgrade_region_failed() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(
                id,
                None,
                false,
                Some("test mocked".to_string()),
            ))
        });

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("test mocked"), "err: {err:?}",);
    }

    #[tokio::test]
    async fn test_downgrade_region_with_retry_fast_path() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            // retry: 0.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            // retry: 1.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_downgrade_region_reply(reply_id, Some(1), true, None)),
                )
                .await
                .unwrap();
        });

        state.downgrade_region_with_retry(&mut ctx).await.unwrap();
        assert_eq!(
            ctx.volatile_ctx
                .leader_region_last_entry_ids
                .get(&RegionId::new(0, 0))
                .cloned(),
            Some(1)
        );
        assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());
    }

    #[tokio::test]
    async fn test_downgrade_region_with_retry_slow_path() {
        let state = DowngradeLeaderRegion {
            optimistic_retry: 3,
            retry_initial_interval: Duration::from_millis(100),
        };
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            for _ in 0..3 {
                let resp = rx.recv().await.unwrap().unwrap();
                let reply_id = resp.mailbox_message.unwrap().id;
                mailbox
                    .on_recv(
                        reply_id,
                        Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                    )
                    .await
                    .unwrap();
            }
        });

        ctx.volatile_ctx
            .set_leader_region_lease_deadline(Duration::from_secs(5));
        let expected_deadline = ctx.volatile_ctx.leader_region_lease_deadline.unwrap();
        let err = state
            .downgrade_region_with_retry(&mut ctx)
            .await
            .unwrap_err();
        assert_matches!(err, error::Error::DowngradeLeader { .. });
        // Every attempt timed out, so no last entry id may have been recorded.
        assert!(ctx.volatile_ctx.leader_region_last_entry_ids.is_empty());
        // Should remain no change.
        assert_eq!(
            ctx.volatile_ctx.leader_region_lease_deadline.unwrap(),
            expected_deadline
        )
    }

    #[tokio::test]
    async fn test_next_upgrade_candidate_state() {
        let mut state = Box::<DowngradeLeaderRegion>::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(id, Some(1), true, None))
        });

        let timer = Instant::now();
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let elapsed = timer.elapsed().as_secs();
        let region_lease = default_distributed_time_constants().region_lease.as_secs();
        assert!(elapsed < region_lease / 2);
        assert_eq!(
            ctx.volatile_ctx
                .leader_region_last_entry_ids
                .get(&RegionId::new(0, 0))
                .cloned(),
            Some(1)
        );
        assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());

        let _ = next
            .as_any()
            .downcast_ref::<UpgradeCandidateRegion>()
            .unwrap();
    }

    #[tokio::test]
    async fn test_downgrade_region_procedure_exceeded_deadline() {
        let mut state = Box::<DowngradeLeaderRegion>::default();
        let persistent_context = new_persistent_context();

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        // The deadline check fails before any instruction is sent, so no mailbox
        // receiver (or mocked reply) is needed.
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }
}