meta_srv/procedure/region_migration/upgrade_candidate_region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::any::Any;
16use std::time::Duration;
17
18use api::v1::meta::MailboxMessage;
19use common_meta::ddl::utils::parse_region_wal_options;
20use common_meta::instruction::{Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply};
21use common_meta::lock_key::RemoteWalLock;
22use common_meta::wal_options_allocator::extract_topic_from_wal_options;
23use common_procedure::{Context as ProcedureContext, Status};
24use common_telemetry::{error, warn};
25use common_wal::options::WalOptions;
26use serde::{Deserialize, Serialize};
27use snafu::{ensure, OptionExt, ResultExt};
28use tokio::time::{sleep, Instant};
29
30use crate::error::{self, Result};
31use crate::handler::HeartbeatMailbox;
32use crate::procedure::region_migration::update_metadata::UpdateMetadata;
33use crate::procedure::region_migration::{Context, State};
34use crate::service::mailbox::Channel;
35
/// Region-migration state that asks the candidate datanode (`to_peer`) to upgrade the
/// migrating region.
///
/// On success the procedure moves to `UpdateMetadata::Upgrade`; otherwise it moves to
/// `UpdateMetadata::Rollback`.
#[derive(Debug, Serialize, Deserialize)]
pub struct UpgradeCandidateRegion {
    /// The optimistic retry times: the maximum number of upgrade attempts, counting the
    /// first one. At least one attempt is always made.
    pub(crate) optimistic_retry: usize,
    /// The retry interval: a fixed delay slept between two consecutive attempts. Despite
    /// the name, it is not increased between retries.
    pub(crate) retry_initial_interval: Duration,
    /// If true, the candidate region MUST report that it has replayed the WAL to the latest
    /// entry id (`ready`). A not-ready reply is retried, and once the attempts are exhausted
    /// the migration rolls back to the old leader region. If false, the `ready` flag of the
    /// reply is ignored.
    pub(crate) require_ready: bool,
}
46
47impl Default for UpgradeCandidateRegion {
48    fn default() -> Self {
49        Self {
50            optimistic_retry: 3,
51            retry_initial_interval: Duration::from_millis(500),
52            require_ready: true,
53        }
54    }
55}
56
57#[async_trait::async_trait]
58#[typetag::serde]
59impl State for UpgradeCandidateRegion {
60    async fn next(
61        &mut self,
62        ctx: &mut Context,
63        procedure_ctx: &ProcedureContext,
64    ) -> Result<(Box<dyn State>, Status)> {
65        let now = Instant::now();
66
67        let region_wal_option = self.get_region_wal_option(ctx).await?;
68        let region_id = ctx.persistent_ctx.region_id;
69        if region_wal_option.is_none() {
70            warn!(
71                "Region {} wal options not found, during upgrade candidate region",
72                region_id
73            );
74        }
75
76        if self
77            .upgrade_region_with_retry(ctx, procedure_ctx, region_wal_option.as_ref())
78            .await
79        {
80            ctx.update_upgrade_candidate_region_elapsed(now);
81            Ok((Box::new(UpdateMetadata::Upgrade), Status::executing(false)))
82        } else {
83            ctx.update_upgrade_candidate_region_elapsed(now);
84            Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)))
85        }
86    }
87
88    fn as_any(&self) -> &dyn Any {
89        self
90    }
91}
92
93impl UpgradeCandidateRegion {
94    async fn get_region_wal_option(&self, ctx: &mut Context) -> Result<Option<WalOptions>> {
95        let region_id = ctx.persistent_ctx.region_id;
96        match ctx.get_from_peer_datanode_table_value().await {
97            Ok(datanode_table_value) => {
98                let region_wal_options =
99                    parse_region_wal_options(&datanode_table_value.region_info.region_wal_options)
100                        .context(error::ParseWalOptionsSnafu)?;
101                Ok(region_wal_options.get(&region_id.region_number()).cloned())
102            }
103            Err(error::Error::DatanodeTableNotFound { datanode_id, .. }) => {
104                warn!(
105                    "Datanode table not found, during upgrade candidate region, the target region might already been migrated, region_id: {}, datanode_id: {}",
106                    region_id, datanode_id
107                );
108                Ok(None)
109            }
110            Err(e) => Err(e),
111        }
112    }
113
114    /// Builds upgrade region instruction.
115    async fn build_upgrade_region_instruction(
116        &self,
117        ctx: &mut Context,
118        replay_timeout: Duration,
119    ) -> Result<Instruction> {
120        let pc = &ctx.persistent_ctx;
121        let region_id = pc.region_id;
122        let last_entry_id = ctx.volatile_ctx.leader_region_last_entry_id;
123        let metadata_last_entry_id = ctx.volatile_ctx.leader_region_metadata_last_entry_id;
124        // Try our best to retrieve replay checkpoint.
125        let datanode_table_value = ctx.get_from_peer_datanode_table_value().await.ok();
126        let checkpoint = if let Some(topic) = datanode_table_value.as_ref().and_then(|v| {
127            extract_topic_from_wal_options(region_id, &v.region_info.region_wal_options)
128        }) {
129            ctx.fetch_replay_checkpoint(&topic).await.ok().flatten()
130        } else {
131            None
132        };
133
134        let upgrade_instruction = Instruction::UpgradeRegion(
135            UpgradeRegion {
136                region_id,
137                last_entry_id,
138                metadata_last_entry_id,
139                replay_timeout: Some(replay_timeout),
140                location_id: Some(ctx.persistent_ctx.from_peer.id),
141                replay_entry_id: None,
142                metadata_replay_entry_id: None,
143            }
144            .with_replay_entry_id(checkpoint.map(|c| c.entry_id))
145            .with_metadata_replay_entry_id(checkpoint.and_then(|c| c.metadata_entry_id)),
146        );
147
148        Ok(upgrade_instruction)
149    }
150
151    /// Tries to upgrade a candidate region.
152    ///
153    /// Retry:
154    /// - If `require_ready` is true, but the candidate region returns `ready` is false.
155    /// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout.
156    ///
157    /// Abort:
158    /// - The candidate region doesn't exist.
159    /// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable.
160    /// - [PushMessage](error::Error::PushMessage), The receiver is dropped.
161    /// - [MailboxReceiver](error::Error::MailboxReceiver), The sender is dropped without sending (impossible).
162    /// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply) (impossible).
163    /// - [ExceededDeadline](error::Error::ExceededDeadline)
164    /// - Invalid JSON (impossible).
165    async fn upgrade_region(&self, ctx: &mut Context) -> Result<()> {
166        let operation_timeout =
167            ctx.next_operation_timeout()
168                .context(error::ExceededDeadlineSnafu {
169                    operation: "Upgrade region",
170                })?;
171        let upgrade_instruction = self
172            .build_upgrade_region_instruction(ctx, operation_timeout)
173            .await?;
174
175        let pc = &ctx.persistent_ctx;
176        let region_id = pc.region_id;
177        let candidate = &pc.to_peer;
178
179        let msg = MailboxMessage::json_message(
180            &format!("Upgrade candidate region: {}", region_id),
181            &format!("Metasrv@{}", ctx.server_addr()),
182            &format!("Datanode-{}@{}", candidate.id, candidate.addr),
183            common_time::util::current_time_millis(),
184            &upgrade_instruction,
185        )
186        .with_context(|_| error::SerializeToJsonSnafu {
187            input: upgrade_instruction.to_string(),
188        })?;
189
190        let ch = Channel::Datanode(candidate.id);
191        let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
192
193        match receiver.await {
194            Ok(msg) => {
195                let reply = HeartbeatMailbox::json_reply(&msg)?;
196                let InstructionReply::UpgradeRegion(UpgradeRegionReply {
197                    ready,
198                    exists,
199                    error,
200                }) = reply
201                else {
202                    return error::UnexpectedInstructionReplySnafu {
203                        mailbox_message: msg.to_string(),
204                        reason: "Unexpected reply of the upgrade region instruction",
205                    }
206                    .fail();
207                };
208
209                // Notes: The order of handling is important.
210                if error.is_some() {
211                    return error::RetryLaterSnafu {
212                        reason: format!(
213                            "Failed to upgrade the region {} on datanode {:?}, error: {:?}",
214                            region_id, candidate, error
215                        ),
216                    }
217                    .fail();
218                }
219
220                ensure!(
221                    exists,
222                    error::UnexpectedSnafu {
223                        violated: format!(
224                            "Candidate region {} doesn't exist on datanode {:?}",
225                            region_id, candidate
226                        )
227                    }
228                );
229
230                if self.require_ready && !ready {
231                    return error::RetryLaterSnafu {
232                        reason: format!(
233                            "Candidate region {} still replaying the wal on datanode {:?}",
234                            region_id, candidate
235                        ),
236                    }
237                    .fail();
238                }
239
240                Ok(())
241            }
242            Err(error::Error::MailboxTimeout { .. }) => {
243                let reason = format!(
244                        "Mailbox received timeout for upgrade candidate region {region_id} on datanode {:?}", 
245                        candidate,
246                    );
247                error::RetryLaterSnafu { reason }.fail()
248            }
249            Err(err) => Err(err),
250        }
251    }
252
253    /// Upgrades a candidate region.
254    ///
255    /// Returns true if the candidate region is upgraded successfully.
256    async fn upgrade_region_with_retry(
257        &self,
258        ctx: &mut Context,
259        procedure_ctx: &ProcedureContext,
260        wal_options: Option<&WalOptions>,
261    ) -> bool {
262        let mut retry = 0;
263        let mut upgraded = false;
264
265        loop {
266            let timer = Instant::now();
267            // If using Kafka WAL, acquire a read lock on the topic to prevent WAL pruning during the upgrade.
268            let _guard = if let Some(WalOptions::Kafka(kafka_wal_options)) = wal_options {
269                Some(
270                    procedure_ctx
271                        .provider
272                        .acquire_lock(
273                            &(RemoteWalLock::Read(kafka_wal_options.topic.clone()).into()),
274                        )
275                        .await,
276                )
277            } else {
278                None
279            };
280            if let Err(err) = self.upgrade_region(ctx).await {
281                retry += 1;
282                ctx.update_operations_elapsed(timer);
283                if matches!(err, error::Error::ExceededDeadline { .. }) {
284                    error!("Failed to upgrade region, exceeded deadline");
285                    break;
286                } else if err.is_retryable() && retry < self.optimistic_retry {
287                    error!("Failed to upgrade region, error: {err:?}, retry later");
288                    sleep(self.retry_initial_interval).await;
289                } else {
290                    error!("Failed to upgrade region, error: {err:?}");
291                    break;
292                }
293            } else {
294                ctx.update_operations_elapsed(timer);
295                upgraded = true;
296                break;
297            }
298        }
299
300        upgraded
301    }
302}
303
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::manager::RegionMigrationTriggerReason;
    use crate::procedure::region_migration::test_util::{new_procedure_context, TestingEnv};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_upgrade_region_reply, send_mock_reply,
    };

    /// Builds a persistent context that migrates region `RegionId::new(1024, 1)` from peer 1
    /// to peer 2, with a 1s timeout.
    fn new_persistent_context() -> PersistentContext {
        PersistentContext {
            catalog: "greptime".into(),
            schema: "public".into(),
            from_peer: Peer::empty(1),
            to_peer: Peer::empty(2),
            region_id: RegionId::new(1024, 1),
            timeout: Duration::from_millis(1000),
            trigger_reason: RegionMigrationTriggerReason::Manual,
        }
    }

    /// Creates table metadata with a single region route: leader `from_peer`, follower
    /// `to_peer`. `wal_options` is passed through to `create_table_metadata`.
    async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
        let table_info =
            new_test_table_info(ctx.persistent_ctx.region_id.table_id(), vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(ctx.persistent_ctx.region_id),
            leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
            follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
            ..Default::default()
        }];
        ctx.table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                wal_options,
            )
            .await
            .unwrap();
    }

    /// No heartbeat receiver is registered for the candidate datanode.
    /// Sending fails with a non-retryable `PusherNotFound`.
    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PusherNotFound { .. });
        assert!(!err.is_retryable());
    }

    /// The receiving half of the candidate's channel is dropped.
    /// Pushing the message fails with a non-retryable `PushMessage`.
    #[tokio::test]
    async fn test_pusher_dropped() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        drop(rx);

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PushMessage { .. });
        assert!(!err.is_retryable());
    }

    /// The operations elapsed time already exceeds the procedure timeout.
    /// `upgrade_region` fails with a non-retryable `ExceededDeadline`.
    #[tokio::test]
    async fn test_procedure_exceeded_deadline() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    /// A close-region reply to the upgrade instruction is rejected as a non-retryable
    /// `UnexpectedInstructionReply`.
    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    /// A reply that carries an error becomes a retryable `RetryLater` whose message
    /// contains the datanode's error text.
    #[tokio::test]
    async fn test_upgrade_region_failed() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // A reply contains an error.
        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(
                id,
                true,
                true,
                Some("test mocked".to_string()),
            ))
        });

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("test mocked"));
    }

    /// A reply reporting that the candidate region doesn't exist yields a non-retryable
    /// `Unexpected` error.
    #[tokio::test]
    async fn test_upgrade_region_not_found() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, true, false, None))
        });

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
        assert!(err.to_string().contains("doesn't exist"));
    }

    /// With `require_ready`, a not-ready reply is a retryable `RetryLater`.
    /// After clearing `require_ready`, the same reply is accepted.
    #[tokio::test]
    async fn test_upgrade_region_require_ready() {
        let mut state = UpgradeCandidateRegion {
            require_ready: true,
            ..Default::default()
        };

        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("still replaying the wal"));

        // Sets the `require_ready` to false.
        state.require_ready = false;

        // A fresh receiver is registered for the second instruction.
        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });

        state.upgrade_region(&mut ctx).await.unwrap();
    }

    /// Replies, in order: mailbox timeout, not ready, ready.
    /// The third attempt succeeds, so `next` moves to `UpdateMetadata::Upgrade`.
    #[tokio::test]
    async fn test_upgrade_region_with_retry_ok() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // Mock datanode: answers each upgrade instruction it receives, in order.
        common_runtime::spawn_global(async move {
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            // retry: 1
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, true, None)),
                )
                .await
                .unwrap();

            // retry: 2
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, true, true, None)),
                )
                .await
                .unwrap();
        });

        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();

        assert_matches!(update_metadata, UpdateMetadata::Upgrade);
    }

    /// Replies, in order: mailbox timeout, not ready, region doesn't exist.
    /// The last reply is non-retryable, so `next` moves to `UpdateMetadata::Rollback`.
    #[tokio::test]
    async fn test_upgrade_region_with_retry_failed() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // Mock datanode: answers each upgrade instruction it receives, in order.
        common_runtime::spawn_global(async move {
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            // retry: 1
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, true, None)),
                )
                .await
                .unwrap();

            // retry: 2
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, false, None)),
                )
                .await
                .unwrap();
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }

    /// With the deadline already exceeded, the upgrade fails with `ExceededDeadline`
    /// and `next` moves to `UpdateMetadata::Rollback`.
    #[tokio::test]
    async fn test_upgrade_region_procedure_exceeded_deadline() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }
}