meta_srv/procedure/region_migration/
upgrade_candidate_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashSet;
17use std::time::Duration;
18
19use api::v1::meta::MailboxMessage;
20use common_meta::ddl::utils::parse_region_wal_options;
21use common_meta::instruction::{
22    Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply, UpgradeRegionsReply,
23};
24use common_meta::key::topic_region::TopicRegionKey;
25use common_meta::lock_key::RemoteWalLock;
26use common_meta::wal_options_allocator::extract_topic_from_wal_options;
27use common_procedure::{Context as ProcedureContext, Status};
28use common_telemetry::{error, info};
29use common_wal::options::WalOptions;
30use serde::{Deserialize, Serialize};
31use snafu::{OptionExt, ResultExt, ensure};
32use tokio::time::{Instant, sleep};
33
34use crate::error::{self, Result};
35use crate::handler::HeartbeatMailbox;
36use crate::procedure::region_migration::update_metadata::UpdateMetadata;
37use crate::procedure::region_migration::{Context, State};
38use crate::service::mailbox::Channel;
39
/// Region-migration state that asks the candidate datanode to upgrade the migrating regions.
///
/// Its `State::next` implementation moves the procedure to `UpdateMetadata::Upgrade` when the
/// upgrade succeeds and to `UpdateMetadata::Rollback` when it does not.
#[derive(Debug, Serialize, Deserialize)]
pub struct UpgradeCandidateRegion {
    /// Upper bound on the number of upgrade attempts made while the failures are retryable:
    /// the retry loop gives up once its failure counter reaches this value.
    pub(crate) optimistic_retry: usize,
    /// Pause between two consecutive attempts. Despite the name, the retry loop sleeps for
    /// this same duration before every retry (no back-off is applied).
    pub(crate) retry_initial_interval: Duration,
    /// If it's true, the candidate region MUST have replayed the WAL to the latest entry id:
    /// a reply whose `ready` flag is false is treated as a retryable failure, and the
    /// procedure rolls back to the old leader region once the attempts are exhausted.
    /// If it's false, a reply that is not `ready` is still accepted.
    pub(crate) require_ready: bool,
}
50
51impl Default for UpgradeCandidateRegion {
52    fn default() -> Self {
53        Self {
54            optimistic_retry: 3,
55            retry_initial_interval: Duration::from_millis(500),
56            require_ready: true,
57        }
58    }
59}
60
61#[async_trait::async_trait]
62#[typetag::serde]
63impl State for UpgradeCandidateRegion {
64    async fn next(
65        &mut self,
66        ctx: &mut Context,
67        procedure_ctx: &ProcedureContext,
68    ) -> Result<(Box<dyn State>, Status)> {
69        let now = Instant::now();
70
71        let topics = self.get_kafka_topics(ctx).await?;
72        if self
73            .upgrade_region_with_retry(ctx, procedure_ctx, topics)
74            .await
75        {
76            ctx.update_upgrade_candidate_region_elapsed(now);
77            Ok((Box::new(UpdateMetadata::Upgrade), Status::executing(false)))
78        } else {
79            ctx.update_upgrade_candidate_region_elapsed(now);
80            Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)))
81        }
82    }
83
84    fn as_any(&self) -> &dyn Any {
85        self
86    }
87}
88
89impl UpgradeCandidateRegion {
90    async fn get_kafka_topics(&self, ctx: &mut Context) -> Result<HashSet<String>> {
91        let table_regions = ctx.persistent_ctx.table_regions();
92        let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
93        let mut topics = HashSet::new();
94        for (table_id, regions) in table_regions {
95            let Some(datanode_table_value) = datanode_table_values.get(&table_id) else {
96                continue;
97            };
98
99            let region_wal_options =
100                parse_region_wal_options(&datanode_table_value.region_info.region_wal_options)
101                    .context(error::ParseWalOptionsSnafu)?;
102
103            for region_id in regions {
104                let Some(WalOptions::Kafka(kafka_wal_options)) =
105                    region_wal_options.get(&region_id.region_number())
106                else {
107                    continue;
108                };
109                if !topics.contains(&kafka_wal_options.topic) {
110                    topics.insert(kafka_wal_options.topic.clone());
111                }
112            }
113        }
114
115        Ok(topics)
116    }
117
118    /// Builds upgrade region instruction.
119    async fn build_upgrade_region_instruction(
120        &self,
121        ctx: &mut Context,
122        replay_timeout: Duration,
123    ) -> Result<Instruction> {
124        let region_ids = ctx.persistent_ctx.region_ids.clone();
125        let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
126        let mut region_topic = Vec::with_capacity(region_ids.len());
127        for region_id in region_ids.iter() {
128            let table_id = region_id.table_id();
129            if let Some(datanode_table_value) = datanode_table_values.get(&table_id)
130                && let Some(topic) = extract_topic_from_wal_options(
131                    *region_id,
132                    &datanode_table_value.region_info.region_wal_options,
133                )
134            {
135                region_topic.push((*region_id, topic));
136            }
137        }
138
139        let replay_checkpoints = ctx
140            .get_replay_checkpoints(
141                region_topic
142                    .iter()
143                    .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
144                    .collect(),
145            )
146            .await?;
147        // Build upgrade regions instruction.
148        let mut upgrade_regions = Vec::with_capacity(region_ids.len());
149        for region_id in region_ids {
150            let last_entry_id = ctx
151                .volatile_ctx
152                .leader_region_last_entry_ids
153                .get(&region_id)
154                .copied();
155            let metadata_last_entry_id = ctx
156                .volatile_ctx
157                .leader_region_metadata_last_entry_ids
158                .get(&region_id)
159                .copied();
160            let checkpoint = replay_checkpoints.get(&region_id).copied();
161            upgrade_regions.push(UpgradeRegion {
162                region_id,
163                last_entry_id,
164                metadata_last_entry_id,
165                replay_timeout,
166                location_id: Some(ctx.persistent_ctx.from_peer.id),
167                replay_entry_id: checkpoint.map(|c| c.entry_id),
168                metadata_replay_entry_id: checkpoint.and_then(|c| c.metadata_entry_id),
169            });
170        }
171
172        Ok(Instruction::UpgradeRegions(upgrade_regions))
173    }
174
175    fn handle_upgrade_region_reply(
176        &self,
177        ctx: &mut Context,
178        UpgradeRegionReply {
179            region_id,
180            ready,
181            exists,
182            error,
183        }: &UpgradeRegionReply,
184        now: &Instant,
185    ) -> Result<()> {
186        let candidate = &ctx.persistent_ctx.to_peer;
187        if error.is_some() {
188            return error::RetryLaterSnafu {
189                reason: format!(
190                    "Failed to upgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
191                    region_id,
192                    candidate,
193                    error,
194                    now.elapsed()
195                ),
196            }
197            .fail();
198        }
199
200        ensure!(
201            exists,
202            error::UnexpectedSnafu {
203                violated: format!(
204                    "Candidate region {} doesn't exist on datanode {:?}",
205                    region_id, candidate
206                )
207            }
208        );
209
210        if self.require_ready && !ready {
211            return error::RetryLaterSnafu {
212                reason: format!(
213                    "Candidate region {} still replaying the wal on datanode {:?}, elapsed: {:?}",
214                    region_id,
215                    candidate,
216                    now.elapsed()
217                ),
218            }
219            .fail();
220        }
221
222        Ok(())
223    }
224
225    /// Tries to upgrade a candidate region.
226    ///
227    /// Retry:
228    /// - If `require_ready` is true, but the candidate region returns `ready` is false.
229    /// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout.
230    ///
231    /// Abort:
232    /// - The candidate region doesn't exist.
233    /// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable.
234    /// - [PushMessage](error::Error::PushMessage), The receiver is dropped.
235    /// - [MailboxReceiver](error::Error::MailboxReceiver), The sender is dropped without sending (impossible).
236    /// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply) (impossible).
237    /// - [ExceededDeadline](error::Error::ExceededDeadline)
238    /// - Invalid JSON (impossible).
239    async fn upgrade_region(&self, ctx: &mut Context) -> Result<()> {
240        let operation_timeout =
241            ctx.next_operation_timeout()
242                .context(error::ExceededDeadlineSnafu {
243                    operation: "Upgrade region",
244                })?;
245        let upgrade_instruction = self
246            .build_upgrade_region_instruction(ctx, operation_timeout)
247            .await?;
248
249        let pc = &ctx.persistent_ctx;
250        let region_ids = &pc.region_ids;
251        let candidate = &pc.to_peer;
252
253        let msg = MailboxMessage::json_message(
254            &format!("Upgrade candidate regions: {:?}", region_ids),
255            &format!("Metasrv@{}", ctx.server_addr()),
256            &format!("Datanode-{}@{}", candidate.id, candidate.addr),
257            common_time::util::current_time_millis(),
258            &upgrade_instruction,
259        )
260        .with_context(|_| error::SerializeToJsonSnafu {
261            input: upgrade_instruction.to_string(),
262        })?;
263
264        let ch = Channel::Datanode(candidate.id);
265        let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
266
267        let now = Instant::now();
268        match receiver.await {
269            Ok(msg) => {
270                let reply = HeartbeatMailbox::json_reply(&msg)?;
271                info!(
272                    "Received upgrade region reply: {:?}, regions: {:?}, elapsed: {:?}",
273                    reply,
274                    region_ids,
275                    now.elapsed()
276                );
277                let InstructionReply::UpgradeRegions(UpgradeRegionsReply { replies }) = reply
278                else {
279                    return error::UnexpectedInstructionReplySnafu {
280                        mailbox_message: msg.to_string(),
281                        reason: "Unexpected reply of the upgrade region instruction",
282                    }
283                    .fail();
284                };
285                for reply in replies {
286                    self.handle_upgrade_region_reply(ctx, &reply, &now)?;
287                }
288                Ok(())
289            }
290            Err(error::Error::MailboxTimeout { .. }) => {
291                let reason = format!(
292                    "Mailbox received timeout for upgrade candidate regions {region_ids:?} on datanode {:?}, elapsed: {:?}",
293                    candidate,
294                    now.elapsed()
295                );
296                error::RetryLaterSnafu { reason }.fail()
297            }
298            Err(err) => Err(err),
299        }
300    }
301
302    /// Upgrades a candidate region.
303    ///
304    /// Returns true if the candidate region is upgraded successfully.
305    async fn upgrade_region_with_retry(
306        &self,
307        ctx: &mut Context,
308        procedure_ctx: &ProcedureContext,
309        topics: HashSet<String>,
310    ) -> bool {
311        let mut retry = 0;
312        let mut upgraded = false;
313
314        let mut guards = Vec::with_capacity(topics.len());
315        loop {
316            let timer = Instant::now();
317            // If using Kafka WAL, acquire a read lock on the topic to prevent WAL pruning during the upgrade.
318            for topic in &topics {
319                guards.push(
320                    procedure_ctx
321                        .provider
322                        .acquire_lock(&(RemoteWalLock::Read(topic.clone()).into()))
323                        .await,
324                );
325            }
326
327            if let Err(err) = self.upgrade_region(ctx).await {
328                retry += 1;
329                ctx.update_operations_elapsed(timer);
330                if matches!(err, error::Error::ExceededDeadline { .. }) {
331                    error!("Failed to upgrade region, exceeded deadline");
332                    break;
333                } else if err.is_retryable() && retry < self.optimistic_retry {
334                    error!("Failed to upgrade region, error: {err:?}, retry later");
335                    sleep(self.retry_initial_interval).await;
336                } else {
337                    error!("Failed to upgrade region, error: {err:?}");
338                    break;
339                }
340            } else {
341                ctx.update_operations_elapsed(timer);
342                upgraded = true;
343                break;
344            }
345        }
346
347        upgraded
348    }
349}
350
351#[cfg(test)]
352mod tests {
353    use std::assert_matches::assert_matches;
354    use std::collections::HashMap;
355
356    use common_meta::key::table_route::TableRouteValue;
357    use common_meta::key::test_utils::new_test_table_info;
358    use common_meta::peer::Peer;
359    use common_meta::rpc::router::{Region, RegionRoute};
360    use store_api::storage::RegionId;
361
362    use super::*;
363    use crate::error::Error;
364    use crate::procedure::region_migration::manager::RegionMigrationTriggerReason;
365    use crate::procedure::region_migration::test_util::{TestingEnv, new_procedure_context};
366    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
367    use crate::procedure::test_util::{
368        new_close_region_reply, new_upgrade_region_reply, send_mock_reply,
369    };
370
371    fn new_persistent_context() -> PersistentContext {
372        PersistentContext {
373            catalog: "greptime".into(),
374            schema: "public".into(),
375            from_peer: Peer::empty(1),
376            to_peer: Peer::empty(2),
377            region_ids: vec![RegionId::new(1024, 1)],
378            timeout: Duration::from_millis(1000),
379            trigger_reason: RegionMigrationTriggerReason::Manual,
380        }
381    }
382
383    async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
384        let region_id = ctx.persistent_ctx.region_ids[0];
385        let table_info = new_test_table_info(region_id.table_id(), vec![1]).into();
386        let region_routes = vec![RegionRoute {
387            region: Region::new_test(region_id),
388            leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
389            follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
390            ..Default::default()
391        }];
392        ctx.table_metadata_manager
393            .create_table_metadata(
394                table_info,
395                TableRouteValue::physical(region_routes),
396                wal_options,
397            )
398            .await
399            .unwrap();
400    }
401
402    #[tokio::test]
403    async fn test_datanode_is_unreachable() {
404        let state = UpgradeCandidateRegion::default();
405        let persistent_context = new_persistent_context();
406        let env = TestingEnv::new();
407        let mut ctx = env.context_factory().new_context(persistent_context);
408        prepare_table_metadata(&ctx, HashMap::default()).await;
409        let err = state.upgrade_region(&mut ctx).await.unwrap_err();
410
411        assert_matches!(err, Error::PusherNotFound { .. });
412        assert!(!err.is_retryable());
413    }
414
415    #[tokio::test]
416    async fn test_pusher_dropped() {
417        let state = UpgradeCandidateRegion::default();
418        let persistent_context = new_persistent_context();
419        let to_peer_id = persistent_context.to_peer.id;
420
421        let mut env = TestingEnv::new();
422        let mut ctx = env.context_factory().new_context(persistent_context);
423        prepare_table_metadata(&ctx, HashMap::default()).await;
424        let mailbox_ctx = env.mailbox_context();
425
426        let (tx, rx) = tokio::sync::mpsc::channel(1);
427
428        mailbox_ctx
429            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
430            .await;
431
432        drop(rx);
433
434        let err = state.upgrade_region(&mut ctx).await.unwrap_err();
435
436        assert_matches!(err, Error::PushMessage { .. });
437        assert!(!err.is_retryable());
438    }
439
440    #[tokio::test]
441    async fn test_procedure_exceeded_deadline() {
442        let state = UpgradeCandidateRegion::default();
443        let persistent_context = new_persistent_context();
444        let env = TestingEnv::new();
445        let mut ctx = env.context_factory().new_context(persistent_context);
446        prepare_table_metadata(&ctx, HashMap::default()).await;
447        ctx.volatile_ctx.metrics.operations_elapsed =
448            ctx.persistent_ctx.timeout + Duration::from_secs(1);
449
450        let err = state.upgrade_region(&mut ctx).await.unwrap_err();
451
452        assert_matches!(err, Error::ExceededDeadline { .. });
453        assert!(!err.is_retryable());
454    }
455
456    #[tokio::test]
457    async fn test_unexpected_instruction_reply() {
458        let state = UpgradeCandidateRegion::default();
459        let persistent_context = new_persistent_context();
460        let to_peer_id = persistent_context.to_peer.id;
461
462        let mut env = TestingEnv::new();
463        let mut ctx = env.context_factory().new_context(persistent_context);
464        prepare_table_metadata(&ctx, HashMap::default()).await;
465        let mailbox_ctx = env.mailbox_context();
466        let mailbox = mailbox_ctx.mailbox().clone();
467
468        let (tx, rx) = tokio::sync::mpsc::channel(1);
469
470        mailbox_ctx
471            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
472            .await;
473
474        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));
475
476        let err = state.upgrade_region(&mut ctx).await.unwrap_err();
477        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
478        assert!(!err.is_retryable());
479    }
480
481    #[tokio::test]
482    async fn test_upgrade_region_failed() {
483        let state = UpgradeCandidateRegion::default();
484        let persistent_context = new_persistent_context();
485        let to_peer_id = persistent_context.to_peer.id;
486
487        let mut env = TestingEnv::new();
488        let mut ctx = env.context_factory().new_context(persistent_context);
489        prepare_table_metadata(&ctx, HashMap::default()).await;
490        let mailbox_ctx = env.mailbox_context();
491        let mailbox = mailbox_ctx.mailbox().clone();
492
493        let (tx, rx) = tokio::sync::mpsc::channel(1);
494
495        mailbox_ctx
496            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
497            .await;
498
499        // A reply contains an error.
500        send_mock_reply(mailbox, rx, |id| {
501            Ok(new_upgrade_region_reply(
502                id,
503                true,
504                true,
505                Some("test mocked".to_string()),
506            ))
507        });
508
509        let err = state.upgrade_region(&mut ctx).await.unwrap_err();
510
511        assert_matches!(err, Error::RetryLater { .. });
512        assert!(err.is_retryable());
513        assert!(format!("{err:?}").contains("test mocked"));
514    }
515
516    #[tokio::test]
517    async fn test_upgrade_region_not_found() {
518        let state = UpgradeCandidateRegion::default();
519        let persistent_context = new_persistent_context();
520        let to_peer_id = persistent_context.to_peer.id;
521
522        let mut env = TestingEnv::new();
523        let mut ctx = env.context_factory().new_context(persistent_context);
524        prepare_table_metadata(&ctx, HashMap::default()).await;
525        let mailbox_ctx = env.mailbox_context();
526        let mailbox = mailbox_ctx.mailbox().clone();
527
528        let (tx, rx) = tokio::sync::mpsc::channel(1);
529
530        mailbox_ctx
531            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
532            .await;
533
534        send_mock_reply(mailbox, rx, |id| {
535            Ok(new_upgrade_region_reply(id, true, false, None))
536        });
537
538        let err = state.upgrade_region(&mut ctx).await.unwrap_err();
539
540        assert_matches!(err, Error::Unexpected { .. });
541        assert!(!err.is_retryable());
542        assert!(err.to_string().contains("doesn't exist"));
543    }
544
    /// Checks both settings of `require_ready` against a reply that exists but is not ready:
    /// with `require_ready == true` the attempt fails with a retryable `RetryLater`, with
    /// `require_ready == false` the same reply is accepted.
    #[tokio::test]
    async fn test_upgrade_region_require_ready() {
        let mut state = UpgradeCandidateRegion {
            require_ready: true,
            ..Default::default()
        };

        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // First phase: the mocked reply is not ready (second argument) but exists (third).
        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("still replaying the wal"));

        // Sets the `require_ready` to false.
        state.require_ready = false;

        // Second phase: a fresh channel is registered for the same datanode and the same
        // not-ready reply is sent again.
        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });

        // This time the attempt must succeed.
        state.upgrade_region(&mut ctx).await.unwrap();
    }
593
    /// Drives `State::next` through two retryable failures followed by a success and
    /// expects the next state to be `UpdateMetadata::Upgrade`.
    ///
    /// The default `optimistic_retry` of 3 allows exactly the three attempts scripted below.
    #[tokio::test]
    async fn test_upgrade_region_with_retry_ok() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        // Shorten the pause between attempts to keep the test fast.
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // Mocked datanode: answers the three expected instructions in order.
        common_runtime::spawn_global(async move {
            // First attempt: the mailbox reports a timeout for the request.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            // retry: 1
            // The region exists but is not ready yet.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, true, None)),
                )
                .await
                .unwrap();

            // retry: 2
            // The region exists and is ready.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, true, true, None)),
                )
                .await
                .unwrap();
        });

        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();

        assert_matches!(update_metadata, UpdateMetadata::Upgrade);
    }
654
    /// Drives `State::next` through two retryable failures followed by a reply saying the
    /// region does not exist, and expects the next state to be `UpdateMetadata::Rollback`.
    #[tokio::test]
    async fn test_upgrade_region_with_retry_failed() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        // Shorten the pause between attempts to keep the test fast.
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // Mocked datanode: answers the three expected instructions in order.
        common_runtime::spawn_global(async move {
            // First attempt: the mailbox reports a timeout for the request.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            // retry: 1
            // The region exists but is not ready yet.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, true, None)),
                )
                .await
                .unwrap();

            // retry: 2
            // The region is reported as missing.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, false, None)),
                )
                .await
                .unwrap();
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }
713
714    #[tokio::test]
715    async fn test_upgrade_region_procedure_exceeded_deadline() {
716        let mut state = Box::<UpgradeCandidateRegion>::default();
717        state.retry_initial_interval = Duration::from_millis(100);
718        let persistent_context = new_persistent_context();
719        let to_peer_id = persistent_context.to_peer.id;
720
721        let mut env = TestingEnv::new();
722        let mut ctx = env.context_factory().new_context(persistent_context);
723        prepare_table_metadata(&ctx, HashMap::default()).await;
724        let mailbox_ctx = env.mailbox_context();
725        let mailbox = mailbox_ctx.mailbox().clone();
726        ctx.volatile_ctx.metrics.operations_elapsed =
727            ctx.persistent_ctx.timeout + Duration::from_secs(1);
728
729        let (tx, rx) = tokio::sync::mpsc::channel(1);
730        mailbox_ctx
731            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
732            .await;
733
734        send_mock_reply(mailbox, rx, |id| {
735            Ok(new_upgrade_region_reply(id, false, true, None))
736        });
737        let procedure_ctx = new_procedure_context();
738        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
739        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
740        assert_matches!(update_metadata, UpdateMetadata::Rollback);
741    }
742}