Skip to main content

meta_srv/procedure/region_migration/
upgrade_candidate_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::HashSet;
17use std::time::Duration;
18
19use api::v1::meta::MailboxMessage;
20use common_meta::instruction::{
21    Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply, UpgradeRegionsReply,
22};
23use common_meta::lock_key::RemoteWalLock;
24use common_meta::wal_provider::extract_topic_from_wal_options;
25use common_procedure::{Context as ProcedureContext, Status};
26use common_telemetry::tracing_context::TracingContext;
27use common_telemetry::{error, info};
28use common_wal::options::WalOptions;
29use serde::{Deserialize, Serialize};
30use snafu::{OptionExt, ResultExt, ensure};
31use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
32use tokio::time::{Instant, sleep};
33
34use crate::error::{self, Result};
35use crate::handler::HeartbeatMailbox;
36use crate::procedure::region_migration::update_metadata::UpdateMetadata;
37use crate::procedure::region_migration::{Context, State};
38use crate::procedure::utils::instruction_error_result;
39use crate::service::mailbox::Channel;
40
/// Region-migration state that asks the candidate datanode to upgrade the
/// migrating regions, retrying a bounded number of times before giving up.
#[derive(Debug, Serialize, Deserialize)]
pub struct UpgradeCandidateRegion {
    /// The optimistic retry times: a retryable failure is retried only while
    /// the number of failed attempts is below this value.
    pub(crate) optimistic_retry: usize,
    /// The retry initial interval: how long to sleep before the next attempt.
    pub(crate) retry_initial_interval: Duration,
    /// If it's true it requires the candidate region MUST replay the WAL to the latest entry id.
    /// Otherwise, it will rollback to the old leader region.
    pub(crate) require_ready: bool,
}
51
/// Default settings: three attempts, 500ms between attempts, and the
/// candidate must report itself ready.
impl Default for UpgradeCandidateRegion {
    fn default() -> Self {
        Self {
            optimistic_retry: 3,
            retry_initial_interval: Duration::from_millis(500),
            require_ready: true,
        }
    }
}
61
62#[async_trait::async_trait]
63#[typetag::serde]
64impl State for UpgradeCandidateRegion {
65    async fn next(
66        &mut self,
67        ctx: &mut Context,
68        procedure_ctx: &ProcedureContext,
69    ) -> Result<(Box<dyn State>, Status)> {
70        let now = Instant::now();
71
72        let topics = self.get_kafka_topics(ctx).await?;
73        if self
74            .upgrade_region_with_retry(ctx, procedure_ctx, topics)
75            .await
76        {
77            ctx.update_upgrade_candidate_region_elapsed(now);
78            Ok((Box::new(UpdateMetadata::Upgrade), Status::executing(false)))
79        } else {
80            ctx.update_upgrade_candidate_region_elapsed(now);
81            Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)))
82        }
83    }
84
85    fn as_any(&self) -> &dyn Any {
86        self
87    }
88}
89
90impl UpgradeCandidateRegion {
91    async fn get_kafka_topics(&self, ctx: &mut Context) -> Result<HashSet<String>> {
92        let table_regions = ctx.persistent_ctx.table_regions();
93        let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
94        let mut topics = HashSet::new();
95        for (table_id, regions) in table_regions {
96            let Some(datanode_table_value) = datanode_table_values.get(&table_id) else {
97                continue;
98            };
99
100            let region_wal_options = &datanode_table_value.region_info.region_wal_options;
101
102            for region_id in regions {
103                let Some(WalOptions::Kafka(kafka_wal_options)) =
104                    region_wal_options.get(&region_id.region_number())
105                else {
106                    continue;
107                };
108                if !topics.contains(&kafka_wal_options.topic) {
109                    topics.insert(kafka_wal_options.topic.clone());
110                }
111            }
112        }
113
114        Ok(topics)
115    }
116
117    /// Builds upgrade region instruction.
118    async fn build_upgrade_region_instruction(
119        &self,
120        ctx: &mut Context,
121        replay_timeout: Duration,
122    ) -> Result<Instruction> {
123        let region_ids = ctx.persistent_ctx.region_ids.clone();
124        let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
125        let mut region_topic = Vec::with_capacity(region_ids.len());
126        for region_id in region_ids.iter() {
127            let table_id = region_id.table_id();
128            if let Some(datanode_table_value) = datanode_table_values.get(&table_id)
129                && let Some(topic) = extract_topic_from_wal_options(
130                    *region_id,
131                    &datanode_table_value.region_info.region_wal_options,
132                )
133            {
134                let is_metric_engine =
135                    datanode_table_value.region_info.engine == METRIC_ENGINE_NAME;
136                region_topic.push((*region_id, topic, is_metric_engine));
137            }
138        }
139
140        let replay_checkpoints = ctx
141            .get_replay_checkpoints_with_topic_pruned_entry_ids(&region_topic)
142            .await?;
143        // Build upgrade regions instruction.
144        let mut upgrade_regions = Vec::with_capacity(region_ids.len());
145        for region_id in region_ids {
146            let last_entry_id = ctx
147                .volatile_ctx
148                .leader_region_last_entry_ids
149                .get(&region_id)
150                .copied();
151            let metadata_last_entry_id = ctx
152                .volatile_ctx
153                .leader_region_metadata_last_entry_ids
154                .get(&region_id)
155                .copied();
156            let checkpoint = replay_checkpoints.get(&region_id).copied();
157            upgrade_regions.push(UpgradeRegion {
158                region_id,
159                last_entry_id,
160                metadata_last_entry_id,
161                replay_timeout,
162                location_id: Some(ctx.persistent_ctx.from_peer.id),
163                replay_entry_id: checkpoint.map(|c| c.entry_id),
164                metadata_replay_entry_id: checkpoint.and_then(|c| c.metadata_entry_id),
165            });
166        }
167
168        Ok(Instruction::UpgradeRegions(upgrade_regions))
169    }
170
171    fn handle_upgrade_region_reply(
172        &self,
173        ctx: &mut Context,
174        UpgradeRegionReply {
175            region_id,
176            ready,
177            exists,
178            error,
179        }: &UpgradeRegionReply,
180        now: &Instant,
181    ) -> Result<()> {
182        let candidate = &ctx.persistent_ctx.to_peer;
183        if let Some(error) = error {
184            return instruction_error_result(
185                error,
186                format!(
187                    "Failed to upgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
188                    region_id,
189                    candidate,
190                    error,
191                    now.elapsed()
192                ),
193            );
194        }
195
196        ensure!(
197            exists,
198            error::UnexpectedSnafu {
199                violated: format!(
200                    "Candidate region {} doesn't exist on datanode {:?}",
201                    region_id, candidate
202                )
203            }
204        );
205
206        if self.require_ready && !ready {
207            return error::RetryLaterSnafu {
208                reason: format!(
209                    "Candidate region {} still replaying the wal on datanode {:?}, elapsed: {:?}",
210                    region_id,
211                    candidate,
212                    now.elapsed()
213                ),
214            }
215            .fail();
216        }
217
218        Ok(())
219    }
220
221    /// Tries to upgrade a candidate region.
222    ///
223    /// Retry:
224    /// - If `require_ready` is true, but the candidate region returns `ready` is false.
225    /// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout.
226    ///
227    /// Abort:
228    /// - The candidate region doesn't exist.
229    /// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable.
230    /// - [PushMessage](error::Error::PushMessage), The receiver is dropped.
231    /// - [MailboxReceiver](error::Error::MailboxReceiver), The sender is dropped without sending (impossible).
232    /// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply) (impossible).
233    /// - [ExceededDeadline](error::Error::ExceededDeadline)
234    /// - Invalid JSON (impossible).
235    async fn upgrade_region(&self, ctx: &mut Context) -> Result<()> {
236        let operation_timeout =
237            ctx.next_operation_timeout()
238                .context(error::ExceededDeadlineSnafu {
239                    operation: "Upgrade region",
240                })?;
241        let upgrade_instruction = self
242            .build_upgrade_region_instruction(ctx, operation_timeout)
243            .await?;
244
245        let pc = &ctx.persistent_ctx;
246        let region_ids = &pc.region_ids;
247        let candidate = &pc.to_peer;
248
249        let tracing_ctx = TracingContext::from_current_span();
250        let msg = MailboxMessage::json_message(
251            &format!("Upgrade candidate regions: {:?}", region_ids),
252            &format!("Metasrv@{}", ctx.server_addr()),
253            &format!("Datanode-{}@{}", candidate.id, candidate.addr),
254            common_time::util::current_time_millis(),
255            &upgrade_instruction,
256            Some(tracing_ctx.to_w3c()),
257        )
258        .with_context(|_| error::SerializeToJsonSnafu {
259            input: upgrade_instruction.to_string(),
260        })?;
261
262        let ch = Channel::Datanode(candidate.id);
263        let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
264
265        let now = Instant::now();
266        match receiver.await {
267            Ok(msg) => {
268                let reply = HeartbeatMailbox::json_reply(&msg)?;
269                info!(
270                    "Received upgrade region reply: {:?}, regions: {:?}, elapsed: {:?}",
271                    reply,
272                    region_ids,
273                    now.elapsed()
274                );
275                let InstructionReply::UpgradeRegions(UpgradeRegionsReply { replies }) = reply
276                else {
277                    return error::UnexpectedInstructionReplySnafu {
278                        mailbox_message: msg.to_string(),
279                        reason: "Unexpected reply of the upgrade region instruction",
280                    }
281                    .fail();
282                };
283                for reply in replies {
284                    self.handle_upgrade_region_reply(ctx, &reply, &now)?;
285                }
286                Ok(())
287            }
288            Err(error::Error::MailboxTimeout { .. }) => {
289                let reason = format!(
290                    "Mailbox received timeout for upgrade candidate regions {region_ids:?} on datanode {:?}, elapsed: {:?}",
291                    candidate,
292                    now.elapsed()
293                );
294                error::RetryLaterSnafu { reason }.fail()
295            }
296            Err(err) => Err(err),
297        }
298    }
299
300    /// Upgrades a candidate region.
301    ///
302    /// Returns true if the candidate region is upgraded successfully.
303    async fn upgrade_region_with_retry(
304        &self,
305        ctx: &mut Context,
306        procedure_ctx: &ProcedureContext,
307        topics: HashSet<String>,
308    ) -> bool {
309        let mut retry = 0;
310        let mut upgraded = false;
311
312        let mut guards = Vec::with_capacity(topics.len());
313        loop {
314            let timer = Instant::now();
315            // If using Kafka WAL, acquire a read lock on the topic to prevent WAL pruning during the upgrade.
316            for topic in &topics {
317                guards.push(
318                    procedure_ctx
319                        .provider
320                        .acquire_lock(&(RemoteWalLock::Read(topic.clone()).into()))
321                        .await,
322                );
323            }
324
325            if let Err(err) = self.upgrade_region(ctx).await {
326                retry += 1;
327                ctx.update_operations_elapsed(timer);
328                if matches!(err, error::Error::ExceededDeadline { .. }) {
329                    error!("Failed to upgrade region, exceeded deadline");
330                    break;
331                } else if err.is_retryable() && retry < self.optimistic_retry {
332                    error!("Failed to upgrade region, error: {err:?}, retry later");
333                    sleep(self.retry_initial_interval).await;
334                } else {
335                    error!("Failed to upgrade region, error: {err:?}");
336                    break;
337                }
338            } else {
339                ctx.update_operations_elapsed(timer);
340                upgraded = true;
341                break;
342            }
343        }
344
345        upgraded
346    }
347}
348
349#[cfg(test)]
350mod tests {
351    use std::assert_matches;
352    use std::collections::HashMap;
353
354    use common_meta::key::table_route::TableRouteValue;
355    use common_meta::key::test_utils::new_test_table_info;
356    use common_meta::key::topic_name::TopicNameKey;
357    use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey, TopicRegionValue};
358    use common_meta::peer::Peer;
359    use common_meta::rpc::router::{Region, RegionRoute};
360    use common_meta::wal_provider::RegionWalOptions;
361    use common_wal::options::KafkaWalOptions;
362    use store_api::storage::RegionId;
363
364    use super::*;
365    use crate::error::Error;
366    use crate::procedure::region_migration::manager::RegionMigrationTriggerReason;
367    use crate::procedure::region_migration::test_util::{TestingEnv, new_procedure_context};
368    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
369    use crate::procedure::test_util::{
370        new_close_region_reply, new_upgrade_region_reply, send_mock_reply,
371    };
372
    /// Builds the persistent context shared by the tests: catalog/schema
    /// `greptime`/`public`, peers with ids 1 and 2, the single region
    /// `RegionId::new(1024, 1)`, a 1000ms timeout and a manual trigger reason.
    fn new_persistent_context() -> PersistentContext {
        PersistentContext::new(
            vec![("greptime".into(), "public".into())],
            Peer::empty(1),
            Peer::empty(2),
            vec![RegionId::new(1024, 1)],
            Duration::from_millis(1000),
            RegionMigrationTriggerReason::Manual,
        )
    }
383
384    fn kafka_wal_options(topic: &str) -> RegionWalOptions {
385        RegionWalOptions::from([(
386            1,
387            WalOptions::Kafka(KafkaWalOptions::new(topic.to_string())),
388        )])
389    }
390
    /// Creates the table metadata for the test region with the placeholder
    /// engine name `"engine"`.
    async fn prepare_table_metadata(ctx: &Context, wal_options: RegionWalOptions) {
        prepare_table_metadata_with_engine(ctx, wal_options, "engine").await;
    }
394
395    async fn prepare_table_metadata_with_engine(
396        ctx: &Context,
397        wal_options: RegionWalOptions,
398        engine: &str,
399    ) {
400        let region_id = ctx.persistent_ctx.region_ids[0];
401        let mut table_info = new_test_table_info(region_id.table_id());
402        table_info.meta.engine = engine.to_string();
403        let region_routes = vec![RegionRoute {
404            region: Region::new_test(region_id),
405            leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
406            follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
407            ..Default::default()
408        }];
409        ctx.table_metadata_manager
410            .create_table_metadata(
411                table_info,
412                TableRouteValue::physical(region_routes),
413                wal_options,
414            )
415            .await
416            .unwrap();
417    }
418
    /// The region's stored replay checkpoint has entry id 10 and no metadata
    /// entry id, while the topic-name record is updated with the value 20
    /// (the topic's pruned entry id, per the test name). The built
    /// instruction must carry `replay_entry_id == Some(20)` and no metadata
    /// replay entry id.
    #[tokio::test]
    async fn test_build_upgrade_region_instruction_merges_topic_pruned_entry_id() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let region_id = ctx.persistent_ctx.region_ids[0];
        let topic = "test_topic";
        prepare_table_metadata(&ctx, kafka_wal_options(topic)).await;
        // Replay checkpoint stored for the region: entry id 10, no metadata entry id.
        ctx.table_metadata_manager
            .topic_region_manager()
            .batch_put(&[(
                TopicRegionKey::new(region_id, topic),
                Some(TopicRegionValue::new(Some(ReplayCheckpoint::new(10, None)))),
            )])
            .await
            .unwrap();
        // Register the topic, then update its record to 20.
        ctx.table_metadata_manager
            .topic_name_manager()
            .batch_put(vec![TopicNameKey::new(topic)])
            .await
            .unwrap();
        let prev = ctx
            .table_metadata_manager
            .topic_name_manager()
            .get(topic)
            .await
            .unwrap();
        ctx.table_metadata_manager
            .topic_name_manager()
            .update(topic, 20, prev)
            .await
            .unwrap();

        let instruction = state
            .build_upgrade_region_instruction(&mut ctx, Duration::from_secs(1))
            .await
            .unwrap();
        let Instruction::UpgradeRegions(upgrade_regions) = instruction else {
            unreachable!()
        };

        assert_eq!(upgrade_regions.len(), 1);
        assert_eq!(upgrade_regions[0].replay_entry_id, Some(20));
        assert_eq!(upgrade_regions[0].metadata_replay_entry_id, None);
    }
465
    /// Same setup as the non-metric case, but the table uses the metric
    /// engine and the stored checkpoint is `(10, Some(5))`. After the
    /// topic-name record is updated with 20, both `replay_entry_id` and
    /// `metadata_replay_entry_id` of the built instruction must be `Some(20)`.
    #[tokio::test]
    async fn test_build_upgrade_region_instruction_merges_metric_metadata_pruned_entry_id() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let region_id = ctx.persistent_ctx.region_ids[0];
        let topic = "test_topic";
        prepare_table_metadata_with_engine(&ctx, kafka_wal_options(topic), METRIC_ENGINE_NAME)
            .await;
        // Replay checkpoint stored for the region: entry id 10, metadata entry id 5.
        ctx.table_metadata_manager
            .topic_region_manager()
            .batch_put(&[(
                TopicRegionKey::new(region_id, topic),
                Some(TopicRegionValue::new(Some(ReplayCheckpoint::new(
                    10,
                    Some(5),
                )))),
            )])
            .await
            .unwrap();
        // Register the topic, then update its record to 20.
        ctx.table_metadata_manager
            .topic_name_manager()
            .batch_put(vec![TopicNameKey::new(topic)])
            .await
            .unwrap();
        let prev = ctx
            .table_metadata_manager
            .topic_name_manager()
            .get(topic)
            .await
            .unwrap();
        ctx.table_metadata_manager
            .topic_name_manager()
            .update(topic, 20, prev)
            .await
            .unwrap();

        let instruction = state
            .build_upgrade_region_instruction(&mut ctx, Duration::from_secs(1))
            .await
            .unwrap();
        let Instruction::UpgradeRegions(upgrade_regions) = instruction else {
            unreachable!()
        };

        assert_eq!(upgrade_regions.len(), 1);
        assert_eq!(upgrade_regions[0].replay_entry_id, Some(20));
        assert_eq!(upgrade_regions[0].metadata_replay_entry_id, Some(20));
    }
516
517    #[tokio::test]
518    async fn test_datanode_is_unreachable() {
519        let state = UpgradeCandidateRegion::default();
520        let persistent_context = new_persistent_context();
521        let env = TestingEnv::new();
522        let mut ctx = env.context_factory().new_context(persistent_context);
523        prepare_table_metadata(&ctx, HashMap::default()).await;
524        let err = state.upgrade_region(&mut ctx).await.unwrap_err();
525
526        assert_matches!(err, Error::PusherNotFound { .. });
527        assert!(!err.is_retryable());
528    }
529
    /// A pusher is registered for the candidate datanode but its receiving
    /// end is dropped before the instruction is sent; the upgrade must fail
    /// with a non-retryable `PushMessage` error.
    #[tokio::test]
    async fn test_pusher_dropped() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // Dropping the receiver makes the push to the datanode fail.
        drop(rx);

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PushMessage { .. });
        assert!(!err.is_retryable());
    }
554
555    #[tokio::test]
556    async fn test_procedure_exceeded_deadline() {
557        let state = UpgradeCandidateRegion::default();
558        let persistent_context = new_persistent_context();
559        let env = TestingEnv::new();
560        let mut ctx = env.context_factory().new_context(persistent_context);
561        prepare_table_metadata(&ctx, HashMap::default()).await;
562        ctx.volatile_ctx.metrics.operations_elapsed =
563            ctx.persistent_ctx.timeout + Duration::from_secs(1);
564
565        let err = state.upgrade_region(&mut ctx).await.unwrap_err();
566
567        assert_matches!(err, Error::ExceededDeadline { .. });
568        assert!(!err.is_retryable());
569    }
570
    /// The datanode answers with a close-region reply instead of an
    /// upgrade-regions reply; the upgrade must fail with a non-retryable
    /// `UnexpectedInstructionReply` error.
    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // Reply with the wrong kind of instruction reply.
        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }
595
    /// The datanode's reply carries an error message; the upgrade must fail
    /// with a retryable `RetryLater` error whose debug output contains that
    /// message.
    #[tokio::test]
    async fn test_upgrade_region_failed() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // A reply contains an error.
        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(
                id,
                true,
                true,
                Some("test mocked".to_string()),
            ))
        });

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("test mocked"));
    }
630
    /// The datanode's reply says the candidate region does not exist; the
    /// upgrade must fail with a non-retryable `Unexpected` error mentioning
    /// "doesn't exist".
    #[tokio::test]
    async fn test_upgrade_region_not_found() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, true, false, None))
        });

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
        assert!(err.to_string().contains("doesn't exist"));
    }
659
    /// The same mock reply (`new_upgrade_region_reply(id, false, true, None)`)
    /// is sent twice. With `require_ready` set it yields a retryable
    /// `RetryLater` error mentioning "still replaying the wal"; with
    /// `require_ready` cleared the upgrade succeeds.
    #[tokio::test]
    async fn test_upgrade_region_require_ready() {
        let mut state = UpgradeCandidateRegion {
            require_ready: true,
            ..Default::default()
        };

        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("still replaying the wal"));

        // Sets the `require_ready` to false.
        state.require_ready = false;

        // A fresh channel is registered for the second attempt.
        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });

        state.upgrade_region(&mut ctx).await.unwrap();
    }
708
    /// Drives `next` through three attempts: a mailbox timeout, then the mock
    /// reply `new_upgrade_region_reply(id, false, true, None)`, then
    /// `new_upgrade_region_reply(id, true, true, None)`. The state must move
    /// on to `UpdateMetadata::Upgrade`.
    #[tokio::test]
    async fn test_upgrade_region_with_retry_ok() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // Background task that answers each attempt in turn.
        common_runtime::spawn_global(async move {
            // first attempt: mailbox timeout
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            // retry: 1
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, true, None)),
                )
                .await
                .unwrap();

            // retry: 2
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, true, true, None)),
                )
                .await
                .unwrap();
        });

        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();

        assert_matches!(update_metadata, UpdateMetadata::Upgrade);
    }
769
    /// Drives `next` through three attempts: a mailbox timeout, then the mock
    /// reply `new_upgrade_region_reply(id, false, true, None)`, then
    /// `new_upgrade_region_reply(id, false, false, None)`. The state must
    /// fall back to `UpdateMetadata::Rollback`.
    #[tokio::test]
    async fn test_upgrade_region_with_retry_failed() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // Background task that answers each attempt in turn.
        common_runtime::spawn_global(async move {
            // first attempt: mailbox timeout
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            // retry: 1
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, true, None)),
                )
                .await
                .unwrap();

            // retry: 2
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, false, None)),
                )
                .await
                .unwrap();
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }
828
    /// With the recorded operation time already past the procedure timeout,
    /// `next` must end in `UpdateMetadata::Rollback` even though a mock reply
    /// is available.
    #[tokio::test]
    async fn test_upgrade_region_procedure_exceeded_deadline() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        // Pretend the procedure already spent more than its whole timeout.
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }
857}