meta_srv/procedure/region_migration/upgrade_candidate_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::time::Duration;
17
18use api::v1::meta::MailboxMessage;
19use common_meta::ddl::utils::parse_region_wal_options;
20use common_meta::instruction::{
21    Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply, UpgradeRegionsReply,
22};
23use common_meta::lock_key::RemoteWalLock;
24use common_meta::wal_options_allocator::extract_topic_from_wal_options;
25use common_procedure::{Context as ProcedureContext, Status};
26use common_telemetry::{error, warn};
27use common_wal::options::WalOptions;
28use serde::{Deserialize, Serialize};
29use snafu::{OptionExt, ResultExt, ensure};
30use tokio::time::{Instant, sleep};
31
32use crate::error::{self, Result};
33use crate::handler::HeartbeatMailbox;
34use crate::procedure::region_migration::update_metadata::UpdateMetadata;
35use crate::procedure::region_migration::{Context, State};
36use crate::service::mailbox::Channel;
37
/// Region-migration state that asks the candidate datanode (`to_peer`) to
/// upgrade its copy of the migrating region.
///
/// On success the migration proceeds to `UpdateMetadata::Upgrade`; otherwise it
/// proceeds to `UpdateMetadata::Rollback`.
#[derive(Debug, Serialize, Deserialize)]
pub struct UpgradeCandidateRegion {
    /// The optimistic retry times: the retry budget for retryable upgrade failures.
    pub(crate) optimistic_retry: usize,
    /// The retry initial interval: the delay slept between upgrade attempts.
    pub(crate) retry_initial_interval: Duration,
    /// If it's true, the candidate region MUST replay the WAL to the latest entry id.
    /// Otherwise, the migration rolls back to the old leader region.
    pub(crate) require_ready: bool,
}
48
49impl Default for UpgradeCandidateRegion {
50    fn default() -> Self {
51        Self {
52            optimistic_retry: 3,
53            retry_initial_interval: Duration::from_millis(500),
54            require_ready: true,
55        }
56    }
57}
58
59#[async_trait::async_trait]
60#[typetag::serde]
61impl State for UpgradeCandidateRegion {
62    async fn next(
63        &mut self,
64        ctx: &mut Context,
65        procedure_ctx: &ProcedureContext,
66    ) -> Result<(Box<dyn State>, Status)> {
67        let now = Instant::now();
68
69        let region_wal_option = self.get_region_wal_option(ctx).await?;
70        let region_id = ctx.persistent_ctx.region_id;
71        if region_wal_option.is_none() {
72            warn!(
73                "Region {} wal options not found, during upgrade candidate region",
74                region_id
75            );
76        }
77
78        if self
79            .upgrade_region_with_retry(ctx, procedure_ctx, region_wal_option.as_ref())
80            .await
81        {
82            ctx.update_upgrade_candidate_region_elapsed(now);
83            Ok((Box::new(UpdateMetadata::Upgrade), Status::executing(false)))
84        } else {
85            ctx.update_upgrade_candidate_region_elapsed(now);
86            Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)))
87        }
88    }
89
90    fn as_any(&self) -> &dyn Any {
91        self
92    }
93}
94
95impl UpgradeCandidateRegion {
96    async fn get_region_wal_option(&self, ctx: &mut Context) -> Result<Option<WalOptions>> {
97        let region_id = ctx.persistent_ctx.region_id;
98        match ctx.get_from_peer_datanode_table_value().await {
99            Ok(datanode_table_value) => {
100                let region_wal_options =
101                    parse_region_wal_options(&datanode_table_value.region_info.region_wal_options)
102                        .context(error::ParseWalOptionsSnafu)?;
103                Ok(region_wal_options.get(&region_id.region_number()).cloned())
104            }
105            Err(error::Error::DatanodeTableNotFound { datanode_id, .. }) => {
106                warn!(
107                    "Datanode table not found, during upgrade candidate region, the target region might already been migrated, region_id: {}, datanode_id: {}",
108                    region_id, datanode_id
109                );
110                Ok(None)
111            }
112            Err(e) => Err(e),
113        }
114    }
115
116    /// Builds upgrade region instruction.
117    async fn build_upgrade_region_instruction(
118        &self,
119        ctx: &mut Context,
120        replay_timeout: Duration,
121    ) -> Result<Instruction> {
122        let pc = &ctx.persistent_ctx;
123        let region_id = pc.region_id;
124        let last_entry_id = ctx.volatile_ctx.leader_region_last_entry_id;
125        let metadata_last_entry_id = ctx.volatile_ctx.leader_region_metadata_last_entry_id;
126        // Try our best to retrieve replay checkpoint.
127        let datanode_table_value = ctx.get_from_peer_datanode_table_value().await.ok();
128        let checkpoint = if let Some(topic) = datanode_table_value.as_ref().and_then(|v| {
129            extract_topic_from_wal_options(region_id, &v.region_info.region_wal_options)
130        }) {
131            ctx.fetch_replay_checkpoint(&topic).await.ok().flatten()
132        } else {
133            None
134        };
135
136        let upgrade_instruction = Instruction::UpgradeRegions(vec![
137            UpgradeRegion {
138                region_id,
139                last_entry_id,
140                metadata_last_entry_id,
141                replay_timeout,
142                location_id: Some(ctx.persistent_ctx.from_peer.id),
143                replay_entry_id: None,
144                metadata_replay_entry_id: None,
145            }
146            .with_replay_entry_id(checkpoint.map(|c| c.entry_id))
147            .with_metadata_replay_entry_id(checkpoint.and_then(|c| c.metadata_entry_id)),
148        ]);
149
150        Ok(upgrade_instruction)
151    }
152
153    /// Tries to upgrade a candidate region.
154    ///
155    /// Retry:
156    /// - If `require_ready` is true, but the candidate region returns `ready` is false.
157    /// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout.
158    ///
159    /// Abort:
160    /// - The candidate region doesn't exist.
161    /// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable.
162    /// - [PushMessage](error::Error::PushMessage), The receiver is dropped.
163    /// - [MailboxReceiver](error::Error::MailboxReceiver), The sender is dropped without sending (impossible).
164    /// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply) (impossible).
165    /// - [ExceededDeadline](error::Error::ExceededDeadline)
166    /// - Invalid JSON (impossible).
167    async fn upgrade_region(&self, ctx: &mut Context) -> Result<()> {
168        let operation_timeout =
169            ctx.next_operation_timeout()
170                .context(error::ExceededDeadlineSnafu {
171                    operation: "Upgrade region",
172                })?;
173        let upgrade_instruction = self
174            .build_upgrade_region_instruction(ctx, operation_timeout)
175            .await?;
176
177        let pc = &ctx.persistent_ctx;
178        let region_id = pc.region_id;
179        let candidate = &pc.to_peer;
180
181        let msg = MailboxMessage::json_message(
182            &format!("Upgrade candidate region: {}", region_id),
183            &format!("Metasrv@{}", ctx.server_addr()),
184            &format!("Datanode-{}@{}", candidate.id, candidate.addr),
185            common_time::util::current_time_millis(),
186            &upgrade_instruction,
187        )
188        .with_context(|_| error::SerializeToJsonSnafu {
189            input: upgrade_instruction.to_string(),
190        })?;
191
192        let ch = Channel::Datanode(candidate.id);
193        let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
194
195        match receiver.await {
196            Ok(msg) => {
197                let reply = HeartbeatMailbox::json_reply(&msg)?;
198                let InstructionReply::UpgradeRegions(UpgradeRegionsReply { replies }) = reply
199                else {
200                    return error::UnexpectedInstructionReplySnafu {
201                        mailbox_message: msg.to_string(),
202                        reason: "Unexpected reply of the upgrade region instruction",
203                    }
204                    .fail();
205                };
206                // TODO(weny): handle multiple replies.
207                let UpgradeRegionReply {
208                    ready,
209                    exists,
210                    error,
211                    ..
212                } = &replies[0];
213
214                // Notes: The order of handling is important.
215                if error.is_some() {
216                    return error::RetryLaterSnafu {
217                        reason: format!(
218                            "Failed to upgrade the region {} on datanode {:?}, error: {:?}",
219                            region_id, candidate, error
220                        ),
221                    }
222                    .fail();
223                }
224
225                ensure!(
226                    exists,
227                    error::UnexpectedSnafu {
228                        violated: format!(
229                            "Candidate region {} doesn't exist on datanode {:?}",
230                            region_id, candidate
231                        )
232                    }
233                );
234
235                if self.require_ready && !ready {
236                    return error::RetryLaterSnafu {
237                        reason: format!(
238                            "Candidate region {} still replaying the wal on datanode {:?}",
239                            region_id, candidate
240                        ),
241                    }
242                    .fail();
243                }
244
245                Ok(())
246            }
247            Err(error::Error::MailboxTimeout { .. }) => {
248                let reason = format!(
249                    "Mailbox received timeout for upgrade candidate region {region_id} on datanode {:?}",
250                    candidate,
251                );
252                error::RetryLaterSnafu { reason }.fail()
253            }
254            Err(err) => Err(err),
255        }
256    }
257
258    /// Upgrades a candidate region.
259    ///
260    /// Returns true if the candidate region is upgraded successfully.
261    async fn upgrade_region_with_retry(
262        &self,
263        ctx: &mut Context,
264        procedure_ctx: &ProcedureContext,
265        wal_options: Option<&WalOptions>,
266    ) -> bool {
267        let mut retry = 0;
268        let mut upgraded = false;
269
270        loop {
271            let timer = Instant::now();
272            // If using Kafka WAL, acquire a read lock on the topic to prevent WAL pruning during the upgrade.
273            let _guard = if let Some(WalOptions::Kafka(kafka_wal_options)) = wal_options {
274                Some(
275                    procedure_ctx
276                        .provider
277                        .acquire_lock(
278                            &(RemoteWalLock::Read(kafka_wal_options.topic.clone()).into()),
279                        )
280                        .await,
281                )
282            } else {
283                None
284            };
285            if let Err(err) = self.upgrade_region(ctx).await {
286                retry += 1;
287                ctx.update_operations_elapsed(timer);
288                if matches!(err, error::Error::ExceededDeadline { .. }) {
289                    error!("Failed to upgrade region, exceeded deadline");
290                    break;
291                } else if err.is_retryable() && retry < self.optimistic_retry {
292                    error!("Failed to upgrade region, error: {err:?}, retry later");
293                    sleep(self.retry_initial_interval).await;
294                } else {
295                    error!("Failed to upgrade region, error: {err:?}");
296                    break;
297                }
298            } else {
299                ctx.update_operations_elapsed(timer);
300                upgraded = true;
301                break;
302            }
303        }
304
305        upgraded
306    }
307}
308
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::manager::RegionMigrationTriggerReason;
    use crate::procedure::region_migration::test_util::{TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_upgrade_region_reply, send_mock_reply,
    };

    // NOTE(review): judging by the expectations in the tests below,
    // `new_upgrade_region_reply(id, ready, exists, error)` takes its flags in
    // that order. Confirm against `procedure::test_util`.

    /// Builds a persistent context that migrates region `RegionId::new(1024, 1)`
    /// from peer 1 (`from_peer`) to peer 2 (`to_peer`) with a 1s timeout.
    fn new_persistent_context() -> PersistentContext {
        PersistentContext {
            catalog: "greptime".into(),
            schema: "public".into(),
            from_peer: Peer::empty(1),
            to_peer: Peer::empty(2),
            region_id: RegionId::new(1024, 1),
            timeout: Duration::from_millis(1000),
            trigger_reason: RegionMigrationTriggerReason::Manual,
        }
    }

    /// Creates table metadata with a single region route for the migrating
    /// region. `from_peer` is the leader and `to_peer` is a follower.
    /// `wal_options` is passed through to `create_table_metadata` (presumably
    /// the per-region WAL options; confirm).
    async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
        let table_info =
            new_test_table_info(ctx.persistent_ctx.region_id.table_id(), vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(ctx.persistent_ctx.region_id),
            leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
            follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
            ..Default::default()
        }];
        ctx.table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                wal_options,
            )
            .await
            .unwrap();
    }

    // No heartbeat receiver is registered for the candidate datanode, so the
    // send fails with a non-retryable `PusherNotFound`.
    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PusherNotFound { .. });
        assert!(!err.is_retryable());
    }

    // A receiver is registered but its other end is dropped, so pushing the
    // message fails with a non-retryable `PushMessage`.
    #[tokio::test]
    async fn test_pusher_dropped() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        drop(rx);

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PushMessage { .. });
        assert!(!err.is_retryable());
    }

    // The elapsed time already exceeds the procedure timeout, so no operation
    // timeout is left and `ExceededDeadline` is returned.
    #[tokio::test]
    async fn test_procedure_exceeded_deadline() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    // The datanode answers with a close-region reply instead of an
    // upgrade-regions reply.
    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    // A reply carrying an error becomes a retryable `RetryLater` that includes
    // the datanode's error text.
    #[tokio::test]
    async fn test_upgrade_region_failed() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // A reply contains an error.
        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(
                id,
                true,
                true,
                Some("test mocked".to_string()),
            ))
        });

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("test mocked"));
    }

    // The candidate reports that the region doesn't exist, which is a
    // non-retryable `Unexpected` error.
    #[tokio::test]
    async fn test_upgrade_region_not_found() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, true, false, None))
        });

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
        assert!(err.to_string().contains("doesn't exist"));
    }

    // A not-ready reply is retried when `require_ready` is set, and accepted
    // once `require_ready` is cleared.
    #[tokio::test]
    async fn test_upgrade_region_require_ready() {
        let mut state = UpgradeCandidateRegion {
            require_ready: true,
            ..Default::default()
        };

        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("still replaying the wal"));

        // Sets the `require_ready` to false.
        state.require_ready = false;

        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });

        state.upgrade_region(&mut ctx).await.unwrap();
    }

    // The first attempt times out, the second is not ready, and the third
    // succeeds, so the state moves to `UpdateMetadata::Upgrade`.
    #[tokio::test]
    async fn test_upgrade_region_with_retry_ok() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // Mock datanode: answers each pushed instruction in order.
        common_runtime::spawn_global(async move {
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            // retry: 1
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, true, None)),
                )
                .await
                .unwrap();

            // retry: 2
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, true, true, None)),
                )
                .await
                .unwrap();
        });

        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();

        assert_matches!(update_metadata, UpdateMetadata::Upgrade);
    }

    // The first attempt times out, the second is not ready, and the third
    // reports the region missing (non-retryable), so the state moves to
    // `UpdateMetadata::Rollback`.
    #[tokio::test]
    async fn test_upgrade_region_with_retry_failed() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // Mock datanode: answers each pushed instruction in order.
        common_runtime::spawn_global(async move {
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            // retry: 1
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, true, None)),
                )
                .await
                .unwrap();

            // retry: 2
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, false, None)),
                )
                .await
                .unwrap();
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }

    // The deadline is already exceeded before the first attempt, so the retry
    // loop stops at once and the state moves to `UpdateMetadata::Rollback`.
    #[tokio::test]
    async fn test_upgrade_region_procedure_exceeded_deadline() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }
}