1use std::any::Any;
16use std::collections::HashSet;
17use std::time::Duration;
18
19use api::v1::meta::MailboxMessage;
20use common_meta::ddl::utils::parse_region_wal_options;
21use common_meta::instruction::{
22 Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply, UpgradeRegionsReply,
23};
24use common_meta::key::topic_region::TopicRegionKey;
25use common_meta::lock_key::RemoteWalLock;
26use common_meta::wal_options_allocator::extract_topic_from_wal_options;
27use common_procedure::{Context as ProcedureContext, Status};
28use common_telemetry::{error, info};
29use common_wal::options::WalOptions;
30use serde::{Deserialize, Serialize};
31use snafu::{OptionExt, ResultExt, ensure};
32use tokio::time::{Instant, sleep};
33
34use crate::error::{self, Result};
35use crate::handler::HeartbeatMailbox;
36use crate::procedure::region_migration::update_metadata::UpdateMetadata;
37use crate::procedure::region_migration::{Context, State};
38use crate::service::mailbox::Channel;
39
40#[derive(Debug, Serialize, Deserialize)]
41pub struct UpgradeCandidateRegion {
42 pub(crate) optimistic_retry: usize,
44 pub(crate) retry_initial_interval: Duration,
46 pub(crate) require_ready: bool,
49}
50
51impl Default for UpgradeCandidateRegion {
52 fn default() -> Self {
53 Self {
54 optimistic_retry: 3,
55 retry_initial_interval: Duration::from_millis(500),
56 require_ready: true,
57 }
58 }
59}
60
61#[async_trait::async_trait]
62#[typetag::serde]
63impl State for UpgradeCandidateRegion {
64 async fn next(
65 &mut self,
66 ctx: &mut Context,
67 procedure_ctx: &ProcedureContext,
68 ) -> Result<(Box<dyn State>, Status)> {
69 let now = Instant::now();
70
71 let topics = self.get_kafka_topics(ctx).await?;
72 if self
73 .upgrade_region_with_retry(ctx, procedure_ctx, topics)
74 .await
75 {
76 ctx.update_upgrade_candidate_region_elapsed(now);
77 Ok((Box::new(UpdateMetadata::Upgrade), Status::executing(false)))
78 } else {
79 ctx.update_upgrade_candidate_region_elapsed(now);
80 Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)))
81 }
82 }
83
84 fn as_any(&self) -> &dyn Any {
85 self
86 }
87}
88
89impl UpgradeCandidateRegion {
90 async fn get_kafka_topics(&self, ctx: &mut Context) -> Result<HashSet<String>> {
91 let table_regions = ctx.persistent_ctx.table_regions();
92 let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
93 let mut topics = HashSet::new();
94 for (table_id, regions) in table_regions {
95 let Some(datanode_table_value) = datanode_table_values.get(&table_id) else {
96 continue;
97 };
98
99 let region_wal_options =
100 parse_region_wal_options(&datanode_table_value.region_info.region_wal_options)
101 .context(error::ParseWalOptionsSnafu)?;
102
103 for region_id in regions {
104 let Some(WalOptions::Kafka(kafka_wal_options)) =
105 region_wal_options.get(®ion_id.region_number())
106 else {
107 continue;
108 };
109 if !topics.contains(&kafka_wal_options.topic) {
110 topics.insert(kafka_wal_options.topic.clone());
111 }
112 }
113 }
114
115 Ok(topics)
116 }
117
118 async fn build_upgrade_region_instruction(
120 &self,
121 ctx: &mut Context,
122 replay_timeout: Duration,
123 ) -> Result<Instruction> {
124 let region_ids = ctx.persistent_ctx.region_ids.clone();
125 let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
126 let mut region_topic = Vec::with_capacity(region_ids.len());
127 for region_id in region_ids.iter() {
128 let table_id = region_id.table_id();
129 if let Some(datanode_table_value) = datanode_table_values.get(&table_id)
130 && let Some(topic) = extract_topic_from_wal_options(
131 *region_id,
132 &datanode_table_value.region_info.region_wal_options,
133 )
134 {
135 region_topic.push((*region_id, topic));
136 }
137 }
138
139 let replay_checkpoints = ctx
140 .get_replay_checkpoints(
141 region_topic
142 .iter()
143 .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
144 .collect(),
145 )
146 .await?;
147 let mut upgrade_regions = Vec::with_capacity(region_ids.len());
149 for region_id in region_ids {
150 let last_entry_id = ctx
151 .volatile_ctx
152 .leader_region_last_entry_ids
153 .get(®ion_id)
154 .copied();
155 let metadata_last_entry_id = ctx
156 .volatile_ctx
157 .leader_region_metadata_last_entry_ids
158 .get(®ion_id)
159 .copied();
160 let checkpoint = replay_checkpoints.get(®ion_id).copied();
161 upgrade_regions.push(UpgradeRegion {
162 region_id,
163 last_entry_id,
164 metadata_last_entry_id,
165 replay_timeout,
166 location_id: Some(ctx.persistent_ctx.from_peer.id),
167 replay_entry_id: checkpoint.map(|c| c.entry_id),
168 metadata_replay_entry_id: checkpoint.and_then(|c| c.metadata_entry_id),
169 });
170 }
171
172 Ok(Instruction::UpgradeRegions(upgrade_regions))
173 }
174
175 fn handle_upgrade_region_reply(
176 &self,
177 ctx: &mut Context,
178 UpgradeRegionReply {
179 region_id,
180 ready,
181 exists,
182 error,
183 }: &UpgradeRegionReply,
184 now: &Instant,
185 ) -> Result<()> {
186 let candidate = &ctx.persistent_ctx.to_peer;
187 if error.is_some() {
188 return error::RetryLaterSnafu {
189 reason: format!(
190 "Failed to upgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
191 region_id,
192 candidate,
193 error,
194 now.elapsed()
195 ),
196 }
197 .fail();
198 }
199
200 ensure!(
201 exists,
202 error::UnexpectedSnafu {
203 violated: format!(
204 "Candidate region {} doesn't exist on datanode {:?}",
205 region_id, candidate
206 )
207 }
208 );
209
210 if self.require_ready && !ready {
211 return error::RetryLaterSnafu {
212 reason: format!(
213 "Candidate region {} still replaying the wal on datanode {:?}, elapsed: {:?}",
214 region_id,
215 candidate,
216 now.elapsed()
217 ),
218 }
219 .fail();
220 }
221
222 Ok(())
223 }
224
225 async fn upgrade_region(&self, ctx: &mut Context) -> Result<()> {
240 let operation_timeout =
241 ctx.next_operation_timeout()
242 .context(error::ExceededDeadlineSnafu {
243 operation: "Upgrade region",
244 })?;
245 let upgrade_instruction = self
246 .build_upgrade_region_instruction(ctx, operation_timeout)
247 .await?;
248
249 let pc = &ctx.persistent_ctx;
250 let region_ids = &pc.region_ids;
251 let candidate = &pc.to_peer;
252
253 let msg = MailboxMessage::json_message(
254 &format!("Upgrade candidate regions: {:?}", region_ids),
255 &format!("Metasrv@{}", ctx.server_addr()),
256 &format!("Datanode-{}@{}", candidate.id, candidate.addr),
257 common_time::util::current_time_millis(),
258 &upgrade_instruction,
259 )
260 .with_context(|_| error::SerializeToJsonSnafu {
261 input: upgrade_instruction.to_string(),
262 })?;
263
264 let ch = Channel::Datanode(candidate.id);
265 let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
266
267 let now = Instant::now();
268 match receiver.await {
269 Ok(msg) => {
270 let reply = HeartbeatMailbox::json_reply(&msg)?;
271 info!(
272 "Received upgrade region reply: {:?}, regions: {:?}, elapsed: {:?}",
273 reply,
274 region_ids,
275 now.elapsed()
276 );
277 let InstructionReply::UpgradeRegions(UpgradeRegionsReply { replies }) = reply
278 else {
279 return error::UnexpectedInstructionReplySnafu {
280 mailbox_message: msg.to_string(),
281 reason: "Unexpected reply of the upgrade region instruction",
282 }
283 .fail();
284 };
285 for reply in replies {
286 self.handle_upgrade_region_reply(ctx, &reply, &now)?;
287 }
288 Ok(())
289 }
290 Err(error::Error::MailboxTimeout { .. }) => {
291 let reason = format!(
292 "Mailbox received timeout for upgrade candidate regions {region_ids:?} on datanode {:?}, elapsed: {:?}",
293 candidate,
294 now.elapsed()
295 );
296 error::RetryLaterSnafu { reason }.fail()
297 }
298 Err(err) => Err(err),
299 }
300 }
301
302 async fn upgrade_region_with_retry(
306 &self,
307 ctx: &mut Context,
308 procedure_ctx: &ProcedureContext,
309 topics: HashSet<String>,
310 ) -> bool {
311 let mut retry = 0;
312 let mut upgraded = false;
313
314 let mut guards = Vec::with_capacity(topics.len());
315 loop {
316 let timer = Instant::now();
317 for topic in &topics {
319 guards.push(
320 procedure_ctx
321 .provider
322 .acquire_lock(&(RemoteWalLock::Read(topic.clone()).into()))
323 .await,
324 );
325 }
326
327 if let Err(err) = self.upgrade_region(ctx).await {
328 retry += 1;
329 ctx.update_operations_elapsed(timer);
330 if matches!(err, error::Error::ExceededDeadline { .. }) {
331 error!("Failed to upgrade region, exceeded deadline");
332 break;
333 } else if err.is_retryable() && retry < self.optimistic_retry {
334 error!("Failed to upgrade region, error: {err:?}, retry later");
335 sleep(self.retry_initial_interval).await;
336 } else {
337 error!("Failed to upgrade region, error: {err:?}");
338 break;
339 }
340 } else {
341 ctx.update_operations_elapsed(timer);
342 upgraded = true;
343 break;
344 }
345 }
346
347 upgraded
348 }
349}
350
351#[cfg(test)]
352mod tests {
353 use std::assert_matches::assert_matches;
354 use std::collections::HashMap;
355
356 use common_meta::key::table_route::TableRouteValue;
357 use common_meta::key::test_utils::new_test_table_info;
358 use common_meta::peer::Peer;
359 use common_meta::rpc::router::{Region, RegionRoute};
360 use store_api::storage::RegionId;
361
362 use super::*;
363 use crate::error::Error;
364 use crate::procedure::region_migration::manager::RegionMigrationTriggerReason;
365 use crate::procedure::region_migration::test_util::{TestingEnv, new_procedure_context};
366 use crate::procedure::region_migration::{ContextFactory, PersistentContext};
367 use crate::procedure::test_util::{
368 new_close_region_reply, new_upgrade_region_reply, send_mock_reply,
369 };
370
371 fn new_persistent_context() -> PersistentContext {
372 PersistentContext {
373 catalog: "greptime".into(),
374 schema: "public".into(),
375 from_peer: Peer::empty(1),
376 to_peer: Peer::empty(2),
377 region_ids: vec![RegionId::new(1024, 1)],
378 timeout: Duration::from_millis(1000),
379 trigger_reason: RegionMigrationTriggerReason::Manual,
380 }
381 }
382
383 async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
384 let region_id = ctx.persistent_ctx.region_ids[0];
385 let table_info = new_test_table_info(region_id.table_id(), vec![1]).into();
386 let region_routes = vec![RegionRoute {
387 region: Region::new_test(region_id),
388 leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
389 follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
390 ..Default::default()
391 }];
392 ctx.table_metadata_manager
393 .create_table_metadata(
394 table_info,
395 TableRouteValue::physical(region_routes),
396 wal_options,
397 )
398 .await
399 .unwrap();
400 }
401
402 #[tokio::test]
403 async fn test_datanode_is_unreachable() {
404 let state = UpgradeCandidateRegion::default();
405 let persistent_context = new_persistent_context();
406 let env = TestingEnv::new();
407 let mut ctx = env.context_factory().new_context(persistent_context);
408 prepare_table_metadata(&ctx, HashMap::default()).await;
409 let err = state.upgrade_region(&mut ctx).await.unwrap_err();
410
411 assert_matches!(err, Error::PusherNotFound { .. });
412 assert!(!err.is_retryable());
413 }
414
415 #[tokio::test]
416 async fn test_pusher_dropped() {
417 let state = UpgradeCandidateRegion::default();
418 let persistent_context = new_persistent_context();
419 let to_peer_id = persistent_context.to_peer.id;
420
421 let mut env = TestingEnv::new();
422 let mut ctx = env.context_factory().new_context(persistent_context);
423 prepare_table_metadata(&ctx, HashMap::default()).await;
424 let mailbox_ctx = env.mailbox_context();
425
426 let (tx, rx) = tokio::sync::mpsc::channel(1);
427
428 mailbox_ctx
429 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
430 .await;
431
432 drop(rx);
433
434 let err = state.upgrade_region(&mut ctx).await.unwrap_err();
435
436 assert_matches!(err, Error::PushMessage { .. });
437 assert!(!err.is_retryable());
438 }
439
440 #[tokio::test]
441 async fn test_procedure_exceeded_deadline() {
442 let state = UpgradeCandidateRegion::default();
443 let persistent_context = new_persistent_context();
444 let env = TestingEnv::new();
445 let mut ctx = env.context_factory().new_context(persistent_context);
446 prepare_table_metadata(&ctx, HashMap::default()).await;
447 ctx.volatile_ctx.metrics.operations_elapsed =
448 ctx.persistent_ctx.timeout + Duration::from_secs(1);
449
450 let err = state.upgrade_region(&mut ctx).await.unwrap_err();
451
452 assert_matches!(err, Error::ExceededDeadline { .. });
453 assert!(!err.is_retryable());
454 }
455
456 #[tokio::test]
457 async fn test_unexpected_instruction_reply() {
458 let state = UpgradeCandidateRegion::default();
459 let persistent_context = new_persistent_context();
460 let to_peer_id = persistent_context.to_peer.id;
461
462 let mut env = TestingEnv::new();
463 let mut ctx = env.context_factory().new_context(persistent_context);
464 prepare_table_metadata(&ctx, HashMap::default()).await;
465 let mailbox_ctx = env.mailbox_context();
466 let mailbox = mailbox_ctx.mailbox().clone();
467
468 let (tx, rx) = tokio::sync::mpsc::channel(1);
469
470 mailbox_ctx
471 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
472 .await;
473
474 send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));
475
476 let err = state.upgrade_region(&mut ctx).await.unwrap_err();
477 assert_matches!(err, Error::UnexpectedInstructionReply { .. });
478 assert!(!err.is_retryable());
479 }
480
481 #[tokio::test]
482 async fn test_upgrade_region_failed() {
483 let state = UpgradeCandidateRegion::default();
484 let persistent_context = new_persistent_context();
485 let to_peer_id = persistent_context.to_peer.id;
486
487 let mut env = TestingEnv::new();
488 let mut ctx = env.context_factory().new_context(persistent_context);
489 prepare_table_metadata(&ctx, HashMap::default()).await;
490 let mailbox_ctx = env.mailbox_context();
491 let mailbox = mailbox_ctx.mailbox().clone();
492
493 let (tx, rx) = tokio::sync::mpsc::channel(1);
494
495 mailbox_ctx
496 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
497 .await;
498
499 send_mock_reply(mailbox, rx, |id| {
501 Ok(new_upgrade_region_reply(
502 id,
503 true,
504 true,
505 Some("test mocked".to_string()),
506 ))
507 });
508
509 let err = state.upgrade_region(&mut ctx).await.unwrap_err();
510
511 assert_matches!(err, Error::RetryLater { .. });
512 assert!(err.is_retryable());
513 assert!(format!("{err:?}").contains("test mocked"));
514 }
515
516 #[tokio::test]
517 async fn test_upgrade_region_not_found() {
518 let state = UpgradeCandidateRegion::default();
519 let persistent_context = new_persistent_context();
520 let to_peer_id = persistent_context.to_peer.id;
521
522 let mut env = TestingEnv::new();
523 let mut ctx = env.context_factory().new_context(persistent_context);
524 prepare_table_metadata(&ctx, HashMap::default()).await;
525 let mailbox_ctx = env.mailbox_context();
526 let mailbox = mailbox_ctx.mailbox().clone();
527
528 let (tx, rx) = tokio::sync::mpsc::channel(1);
529
530 mailbox_ctx
531 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
532 .await;
533
534 send_mock_reply(mailbox, rx, |id| {
535 Ok(new_upgrade_region_reply(id, true, false, None))
536 });
537
538 let err = state.upgrade_region(&mut ctx).await.unwrap_err();
539
540 assert_matches!(err, Error::Unexpected { .. });
541 assert!(!err.is_retryable());
542 assert!(err.to_string().contains("doesn't exist"));
543 }
544
545 #[tokio::test]
546 async fn test_upgrade_region_require_ready() {
547 let mut state = UpgradeCandidateRegion {
548 require_ready: true,
549 ..Default::default()
550 };
551
552 let persistent_context = new_persistent_context();
553 let to_peer_id = persistent_context.to_peer.id;
554
555 let mut env = TestingEnv::new();
556 let mut ctx = env.context_factory().new_context(persistent_context);
557 prepare_table_metadata(&ctx, HashMap::default()).await;
558 let mailbox_ctx = env.mailbox_context();
559 let mailbox = mailbox_ctx.mailbox().clone();
560
561 let (tx, rx) = tokio::sync::mpsc::channel(1);
562
563 mailbox_ctx
564 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
565 .await;
566
567 send_mock_reply(mailbox, rx, |id| {
568 Ok(new_upgrade_region_reply(id, false, true, None))
569 });
570
571 let err = state.upgrade_region(&mut ctx).await.unwrap_err();
572
573 assert_matches!(err, Error::RetryLater { .. });
574 assert!(err.is_retryable());
575 assert!(format!("{err:?}").contains("still replaying the wal"));
576
577 state.require_ready = false;
579
580 let mailbox = mailbox_ctx.mailbox().clone();
581 let (tx, rx) = tokio::sync::mpsc::channel(1);
582
583 mailbox_ctx
584 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
585 .await;
586
587 send_mock_reply(mailbox, rx, |id| {
588 Ok(new_upgrade_region_reply(id, false, true, None))
589 });
590
591 state.upgrade_region(&mut ctx).await.unwrap();
592 }
593
594 #[tokio::test]
595 async fn test_upgrade_region_with_retry_ok() {
596 let mut state = Box::<UpgradeCandidateRegion>::default();
597 state.retry_initial_interval = Duration::from_millis(100);
598 let persistent_context = new_persistent_context();
599 let to_peer_id = persistent_context.to_peer.id;
600
601 let mut env = TestingEnv::new();
602 let mut ctx = env.context_factory().new_context(persistent_context);
603 prepare_table_metadata(&ctx, HashMap::default()).await;
604 let mailbox_ctx = env.mailbox_context();
605 let mailbox = mailbox_ctx.mailbox().clone();
606
607 let (tx, mut rx) = tokio::sync::mpsc::channel(1);
608
609 mailbox_ctx
610 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
611 .await;
612
613 common_runtime::spawn_global(async move {
614 let resp = rx.recv().await.unwrap().unwrap();
615 let reply_id = resp.mailbox_message.unwrap().id;
616 mailbox
617 .on_recv(
618 reply_id,
619 Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
620 )
621 .await
622 .unwrap();
623
624 let resp = rx.recv().await.unwrap().unwrap();
626 let reply_id = resp.mailbox_message.unwrap().id;
627 mailbox
628 .on_recv(
629 reply_id,
630 Ok(new_upgrade_region_reply(reply_id, false, true, None)),
631 )
632 .await
633 .unwrap();
634
635 let resp = rx.recv().await.unwrap().unwrap();
637 let reply_id = resp.mailbox_message.unwrap().id;
638 mailbox
639 .on_recv(
640 reply_id,
641 Ok(new_upgrade_region_reply(reply_id, true, true, None)),
642 )
643 .await
644 .unwrap();
645 });
646
647 let procedure_ctx = new_procedure_context();
648 let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
649
650 let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
651
652 assert_matches!(update_metadata, UpdateMetadata::Upgrade);
653 }
654
655 #[tokio::test]
656 async fn test_upgrade_region_with_retry_failed() {
657 let mut state = Box::<UpgradeCandidateRegion>::default();
658 state.retry_initial_interval = Duration::from_millis(100);
659 let persistent_context = new_persistent_context();
660 let to_peer_id = persistent_context.to_peer.id;
661
662 let mut env = TestingEnv::new();
663 let mut ctx = env.context_factory().new_context(persistent_context);
664 prepare_table_metadata(&ctx, HashMap::default()).await;
665 let mailbox_ctx = env.mailbox_context();
666 let mailbox = mailbox_ctx.mailbox().clone();
667
668 let (tx, mut rx) = tokio::sync::mpsc::channel(1);
669
670 mailbox_ctx
671 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
672 .await;
673
674 common_runtime::spawn_global(async move {
675 let resp = rx.recv().await.unwrap().unwrap();
676 let reply_id = resp.mailbox_message.unwrap().id;
677 mailbox
678 .on_recv(
679 reply_id,
680 Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
681 )
682 .await
683 .unwrap();
684
685 let resp = rx.recv().await.unwrap().unwrap();
687 let reply_id = resp.mailbox_message.unwrap().id;
688 mailbox
689 .on_recv(
690 reply_id,
691 Ok(new_upgrade_region_reply(reply_id, false, true, None)),
692 )
693 .await
694 .unwrap();
695
696 let resp = rx.recv().await.unwrap().unwrap();
698 let reply_id = resp.mailbox_message.unwrap().id;
699 mailbox
700 .on_recv(
701 reply_id,
702 Ok(new_upgrade_region_reply(reply_id, false, false, None)),
703 )
704 .await
705 .unwrap();
706 });
707 let procedure_ctx = new_procedure_context();
708 let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
709
710 let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
711 assert_matches!(update_metadata, UpdateMetadata::Rollback);
712 }
713
714 #[tokio::test]
715 async fn test_upgrade_region_procedure_exceeded_deadline() {
716 let mut state = Box::<UpgradeCandidateRegion>::default();
717 state.retry_initial_interval = Duration::from_millis(100);
718 let persistent_context = new_persistent_context();
719 let to_peer_id = persistent_context.to_peer.id;
720
721 let mut env = TestingEnv::new();
722 let mut ctx = env.context_factory().new_context(persistent_context);
723 prepare_table_metadata(&ctx, HashMap::default()).await;
724 let mailbox_ctx = env.mailbox_context();
725 let mailbox = mailbox_ctx.mailbox().clone();
726 ctx.volatile_ctx.metrics.operations_elapsed =
727 ctx.persistent_ctx.timeout + Duration::from_secs(1);
728
729 let (tx, rx) = tokio::sync::mpsc::channel(1);
730 mailbox_ctx
731 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
732 .await;
733
734 send_mock_reply(mailbox, rx, |id| {
735 Ok(new_upgrade_region_reply(id, false, true, None))
736 });
737 let procedure_ctx = new_procedure_context();
738 let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
739 let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
740 assert_matches!(update_metadata, UpdateMetadata::Rollback);
741 }
742}