meta_srv/procedure/region_migration/upgrade_candidate_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashSet;
17use std::time::Duration;
18
19use api::v1::meta::MailboxMessage;
20use common_meta::ddl::utils::parse_region_wal_options;
21use common_meta::instruction::{
22    Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply, UpgradeRegionsReply,
23};
24use common_meta::key::topic_region::TopicRegionKey;
25use common_meta::lock_key::RemoteWalLock;
26use common_meta::wal_options_allocator::extract_topic_from_wal_options;
27use common_procedure::{Context as ProcedureContext, Status};
28use common_telemetry::{error, info};
29use common_wal::options::WalOptions;
30use serde::{Deserialize, Serialize};
31use snafu::{OptionExt, ResultExt, ensure};
32use tokio::time::{Instant, sleep};
33
34use crate::error::{self, Result};
35use crate::handler::HeartbeatMailbox;
36use crate::procedure::region_migration::update_metadata::UpdateMetadata;
37use crate::procedure::region_migration::{Context, State};
38use crate::service::mailbox::Channel;
39
40#[derive(Debug, Serialize, Deserialize)]
41pub struct UpgradeCandidateRegion {
42    // The optimistic retry times.
43    pub(crate) optimistic_retry: usize,
44    // The retry initial interval.
45    pub(crate) retry_initial_interval: Duration,
46    // If it's true it requires the candidate region MUST replay the WAL to the latest entry id.
47    // Otherwise, it will rollback to the old leader region.
48    pub(crate) require_ready: bool,
49}
50
51impl Default for UpgradeCandidateRegion {
52    fn default() -> Self {
53        Self {
54            optimistic_retry: 3,
55            retry_initial_interval: Duration::from_millis(500),
56            require_ready: true,
57        }
58    }
59}
60
61#[async_trait::async_trait]
62#[typetag::serde]
63impl State for UpgradeCandidateRegion {
64    async fn next(
65        &mut self,
66        ctx: &mut Context,
67        procedure_ctx: &ProcedureContext,
68    ) -> Result<(Box<dyn State>, Status)> {
69        let now = Instant::now();
70
71        let topics = self.get_kafka_topics(ctx).await?;
72        if self
73            .upgrade_region_with_retry(ctx, procedure_ctx, topics)
74            .await
75        {
76            ctx.update_upgrade_candidate_region_elapsed(now);
77            Ok((Box::new(UpdateMetadata::Upgrade), Status::executing(false)))
78        } else {
79            ctx.update_upgrade_candidate_region_elapsed(now);
80            Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)))
81        }
82    }
83
84    fn as_any(&self) -> &dyn Any {
85        self
86    }
87}
88
89impl UpgradeCandidateRegion {
90    async fn get_kafka_topics(&self, ctx: &mut Context) -> Result<HashSet<String>> {
91        let table_regions = ctx.persistent_ctx.table_regions();
92        let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
93        let mut topics = HashSet::new();
94        for (table_id, regions) in table_regions {
95            let Some(datanode_table_value) = datanode_table_values.get(&table_id) else {
96                continue;
97            };
98
99            let region_wal_options =
100                parse_region_wal_options(&datanode_table_value.region_info.region_wal_options)
101                    .context(error::ParseWalOptionsSnafu)?;
102
103            for region_id in regions {
104                let Some(WalOptions::Kafka(kafka_wal_options)) =
105                    region_wal_options.get(&region_id.region_number())
106                else {
107                    continue;
108                };
109                if !topics.contains(&kafka_wal_options.topic) {
110                    topics.insert(kafka_wal_options.topic.clone());
111                }
112            }
113        }
114
115        Ok(topics)
116    }
117
118    /// Builds upgrade region instruction.
119    async fn build_upgrade_region_instruction(
120        &self,
121        ctx: &mut Context,
122        replay_timeout: Duration,
123    ) -> Result<Instruction> {
124        let region_ids = ctx.persistent_ctx.region_ids.clone();
125        let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
126        let mut region_topic = Vec::with_capacity(region_ids.len());
127        for region_id in region_ids.iter() {
128            let table_id = region_id.table_id();
129            if let Some(datanode_table_value) = datanode_table_values.get(&table_id)
130                && let Some(topic) = extract_topic_from_wal_options(
131                    *region_id,
132                    &datanode_table_value.region_info.region_wal_options,
133                )
134            {
135                region_topic.push((*region_id, topic));
136            }
137        }
138
139        let replay_checkpoints = ctx
140            .get_replay_checkpoints(
141                region_topic
142                    .iter()
143                    .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
144                    .collect(),
145            )
146            .await?;
147        // Build upgrade regions instruction.
148        let mut upgrade_regions = Vec::with_capacity(region_ids.len());
149        for region_id in region_ids {
150            let last_entry_id = ctx
151                .volatile_ctx
152                .leader_region_last_entry_ids
153                .get(&region_id)
154                .copied();
155            let metadata_last_entry_id = ctx
156                .volatile_ctx
157                .leader_region_metadata_last_entry_ids
158                .get(&region_id)
159                .copied();
160            let checkpoint = replay_checkpoints.get(&region_id).copied();
161            upgrade_regions.push(UpgradeRegion {
162                region_id,
163                last_entry_id,
164                metadata_last_entry_id,
165                replay_timeout,
166                location_id: Some(ctx.persistent_ctx.from_peer.id),
167                replay_entry_id: checkpoint.map(|c| c.entry_id),
168                metadata_replay_entry_id: checkpoint.and_then(|c| c.metadata_entry_id),
169            });
170        }
171
172        Ok(Instruction::UpgradeRegions(upgrade_regions))
173    }
174
175    fn handle_upgrade_region_reply(
176        &self,
177        ctx: &mut Context,
178        UpgradeRegionReply {
179            region_id,
180            ready,
181            exists,
182            error,
183        }: &UpgradeRegionReply,
184        now: &Instant,
185    ) -> Result<()> {
186        let candidate = &ctx.persistent_ctx.to_peer;
187        if error.is_some() {
188            return error::RetryLaterSnafu {
189                reason: format!(
190                    "Failed to upgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
191                    region_id,
192                    candidate,
193                    error,
194                    now.elapsed()
195                ),
196            }
197            .fail();
198        }
199
200        ensure!(
201            exists,
202            error::UnexpectedSnafu {
203                violated: format!(
204                    "Candidate region {} doesn't exist on datanode {:?}",
205                    region_id, candidate
206                )
207            }
208        );
209
210        if self.require_ready && !ready {
211            return error::RetryLaterSnafu {
212                reason: format!(
213                    "Candidate region {} still replaying the wal on datanode {:?}, elapsed: {:?}",
214                    region_id,
215                    candidate,
216                    now.elapsed()
217                ),
218            }
219            .fail();
220        }
221
222        Ok(())
223    }
224
225    /// Tries to upgrade a candidate region.
226    ///
227    /// Retry:
228    /// - If `require_ready` is true, but the candidate region returns `ready` is false.
229    /// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout.
230    ///
231    /// Abort:
232    /// - The candidate region doesn't exist.
233    /// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable.
234    /// - [PushMessage](error::Error::PushMessage), The receiver is dropped.
235    /// - [MailboxReceiver](error::Error::MailboxReceiver), The sender is dropped without sending (impossible).
236    /// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply) (impossible).
237    /// - [ExceededDeadline](error::Error::ExceededDeadline)
238    /// - Invalid JSON (impossible).
239    async fn upgrade_region(&self, ctx: &mut Context) -> Result<()> {
240        let operation_timeout =
241            ctx.next_operation_timeout()
242                .context(error::ExceededDeadlineSnafu {
243                    operation: "Upgrade region",
244                })?;
245        let upgrade_instruction = self
246            .build_upgrade_region_instruction(ctx, operation_timeout)
247            .await?;
248
249        let pc = &ctx.persistent_ctx;
250        let region_ids = &pc.region_ids;
251        let candidate = &pc.to_peer;
252
253        let msg = MailboxMessage::json_message(
254            &format!("Upgrade candidate regions: {:?}", region_ids),
255            &format!("Metasrv@{}", ctx.server_addr()),
256            &format!("Datanode-{}@{}", candidate.id, candidate.addr),
257            common_time::util::current_time_millis(),
258            &upgrade_instruction,
259        )
260        .with_context(|_| error::SerializeToJsonSnafu {
261            input: upgrade_instruction.to_string(),
262        })?;
263
264        let ch = Channel::Datanode(candidate.id);
265        let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
266
267        let now = Instant::now();
268        match receiver.await {
269            Ok(msg) => {
270                let reply = HeartbeatMailbox::json_reply(&msg)?;
271                info!(
272                    "Received upgrade region reply: {:?}, regions: {:?}, elapsed: {:?}",
273                    reply,
274                    region_ids,
275                    now.elapsed()
276                );
277                let InstructionReply::UpgradeRegions(UpgradeRegionsReply { replies }) = reply
278                else {
279                    return error::UnexpectedInstructionReplySnafu {
280                        mailbox_message: msg.to_string(),
281                        reason: "Unexpected reply of the upgrade region instruction",
282                    }
283                    .fail();
284                };
285                for reply in replies {
286                    self.handle_upgrade_region_reply(ctx, &reply, &now)?;
287                }
288                Ok(())
289            }
290            Err(error::Error::MailboxTimeout { .. }) => {
291                let reason = format!(
292                    "Mailbox received timeout for upgrade candidate regions {region_ids:?} on datanode {:?}, elapsed: {:?}",
293                    candidate,
294                    now.elapsed()
295                );
296                error::RetryLaterSnafu { reason }.fail()
297            }
298            Err(err) => Err(err),
299        }
300    }
301
302    /// Upgrades a candidate region.
303    ///
304    /// Returns true if the candidate region is upgraded successfully.
305    async fn upgrade_region_with_retry(
306        &self,
307        ctx: &mut Context,
308        procedure_ctx: &ProcedureContext,
309        topics: HashSet<String>,
310    ) -> bool {
311        let mut retry = 0;
312        let mut upgraded = false;
313
314        let mut guards = Vec::with_capacity(topics.len());
315        loop {
316            let timer = Instant::now();
317            // If using Kafka WAL, acquire a read lock on the topic to prevent WAL pruning during the upgrade.
318            for topic in &topics {
319                guards.push(
320                    procedure_ctx
321                        .provider
322                        .acquire_lock(&(RemoteWalLock::Read(topic.clone()).into()))
323                        .await,
324                );
325            }
326
327            if let Err(err) = self.upgrade_region(ctx).await {
328                retry += 1;
329                ctx.update_operations_elapsed(timer);
330                if matches!(err, error::Error::ExceededDeadline { .. }) {
331                    error!("Failed to upgrade region, exceeded deadline");
332                    break;
333                } else if err.is_retryable() && retry < self.optimistic_retry {
334                    error!("Failed to upgrade region, error: {err:?}, retry later");
335                    sleep(self.retry_initial_interval).await;
336                } else {
337                    error!("Failed to upgrade region, error: {err:?}");
338                    break;
339                }
340            } else {
341                ctx.update_operations_elapsed(timer);
342                upgraded = true;
343                break;
344            }
345        }
346
347        upgraded
348    }
349}
350
#[cfg(test)]
mod tests {
    //! Unit tests for [`UpgradeCandidateRegion`], driven by a testing environment with a
    //! mocked heartbeat mailbox standing in for the candidate datanode.

    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::manager::RegionMigrationTriggerReason;
    use crate::procedure::region_migration::test_util::{TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_upgrade_region_reply, send_mock_reply,
    };

    /// Builds a persistent context for migrating region `1024:1` between peers 1 and 2,
    /// with a 1000 ms timeout and a manual trigger.
    fn new_persistent_context() -> PersistentContext {
        PersistentContext::new(
            vec![("greptime".into(), "public".into())],
            Peer::empty(1),
            Peer::empty(2),
            vec![RegionId::new(1024, 1)],
            Duration::from_millis(1000),
            RegionMigrationTriggerReason::Manual,
        )
    }

    /// Creates table metadata for the context's first region. The `from_peer` is the
    /// leader and the `to_peer` a follower, using the given region WAL options.
    async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
        let region_id = ctx.persistent_ctx.region_ids[0];
        let table_info = new_test_table_info(region_id.table_id(), vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
            follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
            ..Default::default()
        }];
        ctx.table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                wal_options,
            )
            .await
            .unwrap();
    }

    /// No heartbeat receiver is registered for the candidate, so sending fails with
    /// a non-retryable `PusherNotFound`.
    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PusherNotFound { .. });
        assert!(!err.is_retryable());
    }

    /// The receiver side of the heartbeat channel is dropped, so pushing the message
    /// fails with a non-retryable `PushMessage`.
    #[tokio::test]
    async fn test_pusher_dropped() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        drop(rx);

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PushMessage { .. });
        assert!(!err.is_retryable());
    }

    /// Operations have already consumed more than the procedure timeout, so no operation
    /// timeout is left and `ExceededDeadline` is returned.
    #[tokio::test]
    async fn test_procedure_exceeded_deadline() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    /// The mocked datanode answers with a close-region reply, which is rejected as an
    /// unexpected instruction reply.
    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    /// An upgrade reply carrying an error is turned into a retryable `RetryLater`
    /// that includes the datanode's error text.
    #[tokio::test]
    async fn test_upgrade_region_failed() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // A reply contains an error.
        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(
                id,
                true,
                true,
                Some("test mocked".to_string()),
            ))
        });

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("test mocked"));
    }

    /// A reply reporting that the candidate region doesn't exist yields a
    /// non-retryable `Unexpected` error.
    #[tokio::test]
    async fn test_upgrade_region_not_found() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, true, false, None))
        });

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
        assert!(err.to_string().contains("doesn't exist"));
    }

    /// A not-ready reply is retryable when `require_ready` is true, and accepted once
    /// `require_ready` is false.
    #[tokio::test]
    async fn test_upgrade_region_require_ready() {
        let mut state = UpgradeCandidateRegion {
            require_ready: true,
            ..Default::default()
        };

        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("still replaying the wal"));

        // Sets the `require_ready` to false.
        state.require_ready = false;

        // Register a fresh heartbeat receiver for the second instruction.
        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });

        state.upgrade_region(&mut ctx).await.unwrap();
    }

    /// Mailbox timeout, then not ready, then ready: the third attempt succeeds and the
    /// state transitions to `UpdateMetadata::Upgrade`.
    #[tokio::test]
    async fn test_upgrade_region_with_retry_ok() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // Mocked datanode: answers each pushed instruction in order.
        common_runtime::spawn_global(async move {
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            // retry: 1
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, true, None)),
                )
                .await
                .unwrap();

            // retry: 2
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, true, true, None)),
                )
                .await
                .unwrap();
        });

        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();

        assert_matches!(update_metadata, UpdateMetadata::Upgrade);
    }

    /// Mailbox timeout, then not ready, then region missing (non-retryable): the state
    /// transitions to `UpdateMetadata::Rollback`.
    #[tokio::test]
    async fn test_upgrade_region_with_retry_failed() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // Mocked datanode: answers each pushed instruction in order.
        common_runtime::spawn_global(async move {
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            // retry: 1
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, true, None)),
                )
                .await
                .unwrap();

            // retry: 2
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, false, None)),
                )
                .await
                .unwrap();
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }

    /// With the deadline already exceeded, the retry loop stops immediately and the
    /// state transitions to `UpdateMetadata::Rollback`.
    #[tokio::test]
    async fn test_upgrade_region_procedure_exceeded_deadline() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }
}