meta_srv/procedure/region_migration/
upgrade_candidate_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashSet;
17use std::time::Duration;
18
19use api::v1::meta::MailboxMessage;
20use common_meta::ddl::utils::parse_region_wal_options;
21use common_meta::instruction::{
22    Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply, UpgradeRegionsReply,
23};
24use common_meta::key::topic_region::TopicRegionKey;
25use common_meta::lock_key::RemoteWalLock;
26use common_meta::wal_provider::extract_topic_from_wal_options;
27use common_procedure::{Context as ProcedureContext, Status};
28use common_telemetry::tracing_context::TracingContext;
29use common_telemetry::{error, info};
30use common_wal::options::WalOptions;
31use serde::{Deserialize, Serialize};
32use snafu::{OptionExt, ResultExt, ensure};
33use tokio::time::{Instant, sleep};
34
35use crate::error::{self, Result};
36use crate::handler::HeartbeatMailbox;
37use crate::procedure::region_migration::update_metadata::UpdateMetadata;
38use crate::procedure::region_migration::{Context, State};
39use crate::service::mailbox::Channel;
40
41#[derive(Debug, Serialize, Deserialize)]
42pub struct UpgradeCandidateRegion {
43    // The optimistic retry times.
44    pub(crate) optimistic_retry: usize,
45    // The retry initial interval.
46    pub(crate) retry_initial_interval: Duration,
47    // If it's true it requires the candidate region MUST replay the WAL to the latest entry id.
48    // Otherwise, it will rollback to the old leader region.
49    pub(crate) require_ready: bool,
50}
51
52impl Default for UpgradeCandidateRegion {
53    fn default() -> Self {
54        Self {
55            optimistic_retry: 3,
56            retry_initial_interval: Duration::from_millis(500),
57            require_ready: true,
58        }
59    }
60}
61
62#[async_trait::async_trait]
63#[typetag::serde]
64impl State for UpgradeCandidateRegion {
65    async fn next(
66        &mut self,
67        ctx: &mut Context,
68        procedure_ctx: &ProcedureContext,
69    ) -> Result<(Box<dyn State>, Status)> {
70        let now = Instant::now();
71
72        let topics = self.get_kafka_topics(ctx).await?;
73        if self
74            .upgrade_region_with_retry(ctx, procedure_ctx, topics)
75            .await
76        {
77            ctx.update_upgrade_candidate_region_elapsed(now);
78            Ok((Box::new(UpdateMetadata::Upgrade), Status::executing(false)))
79        } else {
80            ctx.update_upgrade_candidate_region_elapsed(now);
81            Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)))
82        }
83    }
84
85    fn as_any(&self) -> &dyn Any {
86        self
87    }
88}
89
90impl UpgradeCandidateRegion {
91    async fn get_kafka_topics(&self, ctx: &mut Context) -> Result<HashSet<String>> {
92        let table_regions = ctx.persistent_ctx.table_regions();
93        let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
94        let mut topics = HashSet::new();
95        for (table_id, regions) in table_regions {
96            let Some(datanode_table_value) = datanode_table_values.get(&table_id) else {
97                continue;
98            };
99
100            let region_wal_options =
101                parse_region_wal_options(&datanode_table_value.region_info.region_wal_options)
102                    .context(error::ParseWalOptionsSnafu)?;
103
104            for region_id in regions {
105                let Some(WalOptions::Kafka(kafka_wal_options)) =
106                    region_wal_options.get(&region_id.region_number())
107                else {
108                    continue;
109                };
110                if !topics.contains(&kafka_wal_options.topic) {
111                    topics.insert(kafka_wal_options.topic.clone());
112                }
113            }
114        }
115
116        Ok(topics)
117    }
118
119    /// Builds upgrade region instruction.
120    async fn build_upgrade_region_instruction(
121        &self,
122        ctx: &mut Context,
123        replay_timeout: Duration,
124    ) -> Result<Instruction> {
125        let region_ids = ctx.persistent_ctx.region_ids.clone();
126        let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
127        let mut region_topic = Vec::with_capacity(region_ids.len());
128        for region_id in region_ids.iter() {
129            let table_id = region_id.table_id();
130            if let Some(datanode_table_value) = datanode_table_values.get(&table_id)
131                && let Some(topic) = extract_topic_from_wal_options(
132                    *region_id,
133                    &datanode_table_value.region_info.region_wal_options,
134                )
135            {
136                region_topic.push((*region_id, topic));
137            }
138        }
139
140        let replay_checkpoints = ctx
141            .get_replay_checkpoints(
142                region_topic
143                    .iter()
144                    .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
145                    .collect(),
146            )
147            .await?;
148        // Build upgrade regions instruction.
149        let mut upgrade_regions = Vec::with_capacity(region_ids.len());
150        for region_id in region_ids {
151            let last_entry_id = ctx
152                .volatile_ctx
153                .leader_region_last_entry_ids
154                .get(&region_id)
155                .copied();
156            let metadata_last_entry_id = ctx
157                .volatile_ctx
158                .leader_region_metadata_last_entry_ids
159                .get(&region_id)
160                .copied();
161            let checkpoint = replay_checkpoints.get(&region_id).copied();
162            upgrade_regions.push(UpgradeRegion {
163                region_id,
164                last_entry_id,
165                metadata_last_entry_id,
166                replay_timeout,
167                location_id: Some(ctx.persistent_ctx.from_peer.id),
168                replay_entry_id: checkpoint.map(|c| c.entry_id),
169                metadata_replay_entry_id: checkpoint.and_then(|c| c.metadata_entry_id),
170            });
171        }
172
173        Ok(Instruction::UpgradeRegions(upgrade_regions))
174    }
175
176    fn handle_upgrade_region_reply(
177        &self,
178        ctx: &mut Context,
179        UpgradeRegionReply {
180            region_id,
181            ready,
182            exists,
183            error,
184        }: &UpgradeRegionReply,
185        now: &Instant,
186    ) -> Result<()> {
187        let candidate = &ctx.persistent_ctx.to_peer;
188        if error.is_some() {
189            return error::RetryLaterSnafu {
190                reason: format!(
191                    "Failed to upgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
192                    region_id,
193                    candidate,
194                    error,
195                    now.elapsed()
196                ),
197            }
198            .fail();
199        }
200
201        ensure!(
202            exists,
203            error::UnexpectedSnafu {
204                violated: format!(
205                    "Candidate region {} doesn't exist on datanode {:?}",
206                    region_id, candidate
207                )
208            }
209        );
210
211        if self.require_ready && !ready {
212            return error::RetryLaterSnafu {
213                reason: format!(
214                    "Candidate region {} still replaying the wal on datanode {:?}, elapsed: {:?}",
215                    region_id,
216                    candidate,
217                    now.elapsed()
218                ),
219            }
220            .fail();
221        }
222
223        Ok(())
224    }
225
226    /// Tries to upgrade a candidate region.
227    ///
228    /// Retry:
229    /// - If `require_ready` is true, but the candidate region returns `ready` is false.
230    /// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout.
231    ///
232    /// Abort:
233    /// - The candidate region doesn't exist.
234    /// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable.
235    /// - [PushMessage](error::Error::PushMessage), The receiver is dropped.
236    /// - [MailboxReceiver](error::Error::MailboxReceiver), The sender is dropped without sending (impossible).
237    /// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply) (impossible).
238    /// - [ExceededDeadline](error::Error::ExceededDeadline)
239    /// - Invalid JSON (impossible).
240    async fn upgrade_region(&self, ctx: &mut Context) -> Result<()> {
241        let operation_timeout =
242            ctx.next_operation_timeout()
243                .context(error::ExceededDeadlineSnafu {
244                    operation: "Upgrade region",
245                })?;
246        let upgrade_instruction = self
247            .build_upgrade_region_instruction(ctx, operation_timeout)
248            .await?;
249
250        let pc = &ctx.persistent_ctx;
251        let region_ids = &pc.region_ids;
252        let candidate = &pc.to_peer;
253
254        let tracing_ctx = TracingContext::from_current_span();
255        let msg = MailboxMessage::json_message(
256            &format!("Upgrade candidate regions: {:?}", region_ids),
257            &format!("Metasrv@{}", ctx.server_addr()),
258            &format!("Datanode-{}@{}", candidate.id, candidate.addr),
259            common_time::util::current_time_millis(),
260            &upgrade_instruction,
261            Some(tracing_ctx.to_w3c()),
262        )
263        .with_context(|_| error::SerializeToJsonSnafu {
264            input: upgrade_instruction.to_string(),
265        })?;
266
267        let ch = Channel::Datanode(candidate.id);
268        let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
269
270        let now = Instant::now();
271        match receiver.await {
272            Ok(msg) => {
273                let reply = HeartbeatMailbox::json_reply(&msg)?;
274                info!(
275                    "Received upgrade region reply: {:?}, regions: {:?}, elapsed: {:?}",
276                    reply,
277                    region_ids,
278                    now.elapsed()
279                );
280                let InstructionReply::UpgradeRegions(UpgradeRegionsReply { replies }) = reply
281                else {
282                    return error::UnexpectedInstructionReplySnafu {
283                        mailbox_message: msg.to_string(),
284                        reason: "Unexpected reply of the upgrade region instruction",
285                    }
286                    .fail();
287                };
288                for reply in replies {
289                    self.handle_upgrade_region_reply(ctx, &reply, &now)?;
290                }
291                Ok(())
292            }
293            Err(error::Error::MailboxTimeout { .. }) => {
294                let reason = format!(
295                    "Mailbox received timeout for upgrade candidate regions {region_ids:?} on datanode {:?}, elapsed: {:?}",
296                    candidate,
297                    now.elapsed()
298                );
299                error::RetryLaterSnafu { reason }.fail()
300            }
301            Err(err) => Err(err),
302        }
303    }
304
305    /// Upgrades a candidate region.
306    ///
307    /// Returns true if the candidate region is upgraded successfully.
308    async fn upgrade_region_with_retry(
309        &self,
310        ctx: &mut Context,
311        procedure_ctx: &ProcedureContext,
312        topics: HashSet<String>,
313    ) -> bool {
314        let mut retry = 0;
315        let mut upgraded = false;
316
317        let mut guards = Vec::with_capacity(topics.len());
318        loop {
319            let timer = Instant::now();
320            // If using Kafka WAL, acquire a read lock on the topic to prevent WAL pruning during the upgrade.
321            for topic in &topics {
322                guards.push(
323                    procedure_ctx
324                        .provider
325                        .acquire_lock(&(RemoteWalLock::Read(topic.clone()).into()))
326                        .await,
327                );
328            }
329
330            if let Err(err) = self.upgrade_region(ctx).await {
331                retry += 1;
332                ctx.update_operations_elapsed(timer);
333                if matches!(err, error::Error::ExceededDeadline { .. }) {
334                    error!("Failed to upgrade region, exceeded deadline");
335                    break;
336                } else if err.is_retryable() && retry < self.optimistic_retry {
337                    error!("Failed to upgrade region, error: {err:?}, retry later");
338                    sleep(self.retry_initial_interval).await;
339                } else {
340                    error!("Failed to upgrade region, error: {err:?}");
341                    break;
342                }
343            } else {
344                ctx.update_operations_elapsed(timer);
345                upgraded = true;
346                break;
347            }
348        }
349
350        upgraded
351    }
352}
353
#[cfg(test)]
mod tests {
    // Unit tests for `UpgradeCandidateRegion`. Most tests register a mock
    // heartbeat receiver for the candidate datanode and script the mailbox replies.
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::manager::RegionMigrationTriggerReason;
    use crate::procedure::region_migration::test_util::{TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_upgrade_region_reply, send_mock_reply,
    };

    /// Builds a persistent context for the single region `RegionId::new(1024, 1)`
    /// between `Peer::empty(1)` and `Peer::empty(2)`. These are assumed to be the
    /// source and candidate peers, in that order; the tests read the candidate
    /// via `to_peer`.
    fn new_persistent_context() -> PersistentContext {
        PersistentContext::new(
            vec![("greptime".into(), "public".into())],
            Peer::empty(1),
            Peer::empty(2),
            vec![RegionId::new(1024, 1)],
            Duration::from_millis(1000),
            RegionMigrationTriggerReason::Manual,
        )
    }

    /// Creates table metadata for the first migrated region: the source peer is
    /// the leader, the candidate peer is a follower, and `wal_options` holds the
    /// per-region WAL options.
    async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
        let region_id = ctx.persistent_ctx.region_ids[0];
        let table_info = new_test_table_info(region_id.table_id());
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
            follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
            ..Default::default()
        }];
        ctx.table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                wal_options,
            )
            .await
            .unwrap();
    }

    /// Without a registered heartbeat receiver the candidate datanode is
    /// unreachable, which is a non-retryable `PusherNotFound`.
    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PusherNotFound { .. });
        assert!(!err.is_retryable());
    }

    /// A registered receiver that has been dropped yields a non-retryable `PushMessage`.
    #[tokio::test]
    async fn test_pusher_dropped() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        drop(rx);

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PushMessage { .. });
        assert!(!err.is_retryable());
    }

    /// Once the accumulated operation time exceeds the procedure timeout, the
    /// attempt fails with a non-retryable `ExceededDeadline`.
    #[tokio::test]
    async fn test_procedure_exceeded_deadline() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    /// A reply of the wrong instruction kind (a close-region reply) is rejected
    /// with a non-retryable `UnexpectedInstructionReply`.
    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    /// A reply carrying an error message surfaces as a retryable `RetryLater`
    /// whose debug output includes that message.
    #[tokio::test]
    async fn test_upgrade_region_failed() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // A reply contains an error.
        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(
                id,
                true,
                true,
                Some("test mocked".to_string()),
            ))
        });

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("test mocked"));
    }

    /// A reply with `exists == false` is a non-retryable `Unexpected` error.
    #[tokio::test]
    async fn test_upgrade_region_not_found() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, true, false, None))
        });

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
        assert!(err.to_string().contains("doesn't exist"));
    }

    /// A not-ready reply is retryable while `require_ready` is set, and is
    /// accepted once `require_ready` is cleared.
    #[tokio::test]
    async fn test_upgrade_region_require_ready() {
        let mut state = UpgradeCandidateRegion {
            require_ready: true,
            ..Default::default()
        };

        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("still replaying the wal"));

        // Sets the `require_ready` to false.
        state.require_ready = false;

        // Register a fresh receiver for the second attempt.
        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });

        state.upgrade_region(&mut ctx).await.unwrap();
    }

    /// The replies are a mailbox timeout, then not-ready, then ready. Both
    /// failures are retryable, so the third (last default) attempt succeeds and
    /// the state moves to `UpdateMetadata::Upgrade`.
    #[tokio::test]
    async fn test_upgrade_region_with_retry_ok() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            // retry: 1
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, true, None)),
                )
                .await
                .unwrap();

            // retry: 2
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, true, true, None)),
                )
                .await
                .unwrap();
        });

        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();

        assert_matches!(update_metadata, UpdateMetadata::Upgrade);
    }

    /// The replies are a mailbox timeout, then not-ready, then "region doesn't
    /// exist" (non-retryable). The upgrade gives up and the state moves to
    /// `UpdateMetadata::Rollback`.
    #[tokio::test]
    async fn test_upgrade_region_with_retry_failed() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            // retry: 1
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, true, None)),
                )
                .await
                .unwrap();

            // retry: 2
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, false, None)),
                )
                .await
                .unwrap();
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }

    /// With the deadline already exceeded, the first attempt fails with
    /// `ExceededDeadline`. The retry loop stops immediately and the state moves
    /// to `UpdateMetadata::Rollback`.
    #[tokio::test]
    async fn test_upgrade_region_procedure_exceeded_deadline() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }
}
744}