1use std::any::Any;
16use std::collections::HashSet;
17use std::time::Duration;
18
19use api::v1::meta::MailboxMessage;
20use common_meta::ddl::utils::parse_region_wal_options;
21use common_meta::instruction::{
22 Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply, UpgradeRegionsReply,
23};
24use common_meta::key::topic_region::TopicRegionKey;
25use common_meta::lock_key::RemoteWalLock;
26use common_meta::wal_provider::extract_topic_from_wal_options;
27use common_procedure::{Context as ProcedureContext, Status};
28use common_telemetry::tracing_context::TracingContext;
29use common_telemetry::{error, info};
30use common_wal::options::WalOptions;
31use serde::{Deserialize, Serialize};
32use snafu::{OptionExt, ResultExt, ensure};
33use tokio::time::{Instant, sleep};
34
35use crate::error::{self, Result};
36use crate::handler::HeartbeatMailbox;
37use crate::procedure::region_migration::update_metadata::UpdateMetadata;
38use crate::procedure::region_migration::{Context, State};
39use crate::service::mailbox::Channel;
40
41#[derive(Debug, Serialize, Deserialize)]
42pub struct UpgradeCandidateRegion {
43 pub(crate) optimistic_retry: usize,
45 pub(crate) retry_initial_interval: Duration,
47 pub(crate) require_ready: bool,
50}
51
52impl Default for UpgradeCandidateRegion {
53 fn default() -> Self {
54 Self {
55 optimistic_retry: 3,
56 retry_initial_interval: Duration::from_millis(500),
57 require_ready: true,
58 }
59 }
60}
61
62#[async_trait::async_trait]
63#[typetag::serde]
64impl State for UpgradeCandidateRegion {
65 async fn next(
66 &mut self,
67 ctx: &mut Context,
68 procedure_ctx: &ProcedureContext,
69 ) -> Result<(Box<dyn State>, Status)> {
70 let now = Instant::now();
71
72 let topics = self.get_kafka_topics(ctx).await?;
73 if self
74 .upgrade_region_with_retry(ctx, procedure_ctx, topics)
75 .await
76 {
77 ctx.update_upgrade_candidate_region_elapsed(now);
78 Ok((Box::new(UpdateMetadata::Upgrade), Status::executing(false)))
79 } else {
80 ctx.update_upgrade_candidate_region_elapsed(now);
81 Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)))
82 }
83 }
84
85 fn as_any(&self) -> &dyn Any {
86 self
87 }
88}
89
90impl UpgradeCandidateRegion {
91 async fn get_kafka_topics(&self, ctx: &mut Context) -> Result<HashSet<String>> {
92 let table_regions = ctx.persistent_ctx.table_regions();
93 let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
94 let mut topics = HashSet::new();
95 for (table_id, regions) in table_regions {
96 let Some(datanode_table_value) = datanode_table_values.get(&table_id) else {
97 continue;
98 };
99
100 let region_wal_options =
101 parse_region_wal_options(&datanode_table_value.region_info.region_wal_options)
102 .context(error::ParseWalOptionsSnafu)?;
103
104 for region_id in regions {
105 let Some(WalOptions::Kafka(kafka_wal_options)) =
106 region_wal_options.get(®ion_id.region_number())
107 else {
108 continue;
109 };
110 if !topics.contains(&kafka_wal_options.topic) {
111 topics.insert(kafka_wal_options.topic.clone());
112 }
113 }
114 }
115
116 Ok(topics)
117 }
118
119 async fn build_upgrade_region_instruction(
121 &self,
122 ctx: &mut Context,
123 replay_timeout: Duration,
124 ) -> Result<Instruction> {
125 let region_ids = ctx.persistent_ctx.region_ids.clone();
126 let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
127 let mut region_topic = Vec::with_capacity(region_ids.len());
128 for region_id in region_ids.iter() {
129 let table_id = region_id.table_id();
130 if let Some(datanode_table_value) = datanode_table_values.get(&table_id)
131 && let Some(topic) = extract_topic_from_wal_options(
132 *region_id,
133 &datanode_table_value.region_info.region_wal_options,
134 )
135 {
136 region_topic.push((*region_id, topic));
137 }
138 }
139
140 let replay_checkpoints = ctx
141 .get_replay_checkpoints(
142 region_topic
143 .iter()
144 .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
145 .collect(),
146 )
147 .await?;
148 let mut upgrade_regions = Vec::with_capacity(region_ids.len());
150 for region_id in region_ids {
151 let last_entry_id = ctx
152 .volatile_ctx
153 .leader_region_last_entry_ids
154 .get(®ion_id)
155 .copied();
156 let metadata_last_entry_id = ctx
157 .volatile_ctx
158 .leader_region_metadata_last_entry_ids
159 .get(®ion_id)
160 .copied();
161 let checkpoint = replay_checkpoints.get(®ion_id).copied();
162 upgrade_regions.push(UpgradeRegion {
163 region_id,
164 last_entry_id,
165 metadata_last_entry_id,
166 replay_timeout,
167 location_id: Some(ctx.persistent_ctx.from_peer.id),
168 replay_entry_id: checkpoint.map(|c| c.entry_id),
169 metadata_replay_entry_id: checkpoint.and_then(|c| c.metadata_entry_id),
170 });
171 }
172
173 Ok(Instruction::UpgradeRegions(upgrade_regions))
174 }
175
176 fn handle_upgrade_region_reply(
177 &self,
178 ctx: &mut Context,
179 UpgradeRegionReply {
180 region_id,
181 ready,
182 exists,
183 error,
184 }: &UpgradeRegionReply,
185 now: &Instant,
186 ) -> Result<()> {
187 let candidate = &ctx.persistent_ctx.to_peer;
188 if error.is_some() {
189 return error::RetryLaterSnafu {
190 reason: format!(
191 "Failed to upgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
192 region_id,
193 candidate,
194 error,
195 now.elapsed()
196 ),
197 }
198 .fail();
199 }
200
201 ensure!(
202 exists,
203 error::UnexpectedSnafu {
204 violated: format!(
205 "Candidate region {} doesn't exist on datanode {:?}",
206 region_id, candidate
207 )
208 }
209 );
210
211 if self.require_ready && !ready {
212 return error::RetryLaterSnafu {
213 reason: format!(
214 "Candidate region {} still replaying the wal on datanode {:?}, elapsed: {:?}",
215 region_id,
216 candidate,
217 now.elapsed()
218 ),
219 }
220 .fail();
221 }
222
223 Ok(())
224 }
225
226 async fn upgrade_region(&self, ctx: &mut Context) -> Result<()> {
241 let operation_timeout =
242 ctx.next_operation_timeout()
243 .context(error::ExceededDeadlineSnafu {
244 operation: "Upgrade region",
245 })?;
246 let upgrade_instruction = self
247 .build_upgrade_region_instruction(ctx, operation_timeout)
248 .await?;
249
250 let pc = &ctx.persistent_ctx;
251 let region_ids = &pc.region_ids;
252 let candidate = &pc.to_peer;
253
254 let tracing_ctx = TracingContext::from_current_span();
255 let msg = MailboxMessage::json_message(
256 &format!("Upgrade candidate regions: {:?}", region_ids),
257 &format!("Metasrv@{}", ctx.server_addr()),
258 &format!("Datanode-{}@{}", candidate.id, candidate.addr),
259 common_time::util::current_time_millis(),
260 &upgrade_instruction,
261 Some(tracing_ctx.to_w3c()),
262 )
263 .with_context(|_| error::SerializeToJsonSnafu {
264 input: upgrade_instruction.to_string(),
265 })?;
266
267 let ch = Channel::Datanode(candidate.id);
268 let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
269
270 let now = Instant::now();
271 match receiver.await {
272 Ok(msg) => {
273 let reply = HeartbeatMailbox::json_reply(&msg)?;
274 info!(
275 "Received upgrade region reply: {:?}, regions: {:?}, elapsed: {:?}",
276 reply,
277 region_ids,
278 now.elapsed()
279 );
280 let InstructionReply::UpgradeRegions(UpgradeRegionsReply { replies }) = reply
281 else {
282 return error::UnexpectedInstructionReplySnafu {
283 mailbox_message: msg.to_string(),
284 reason: "Unexpected reply of the upgrade region instruction",
285 }
286 .fail();
287 };
288 for reply in replies {
289 self.handle_upgrade_region_reply(ctx, &reply, &now)?;
290 }
291 Ok(())
292 }
293 Err(error::Error::MailboxTimeout { .. }) => {
294 let reason = format!(
295 "Mailbox received timeout for upgrade candidate regions {region_ids:?} on datanode {:?}, elapsed: {:?}",
296 candidate,
297 now.elapsed()
298 );
299 error::RetryLaterSnafu { reason }.fail()
300 }
301 Err(err) => Err(err),
302 }
303 }
304
305 async fn upgrade_region_with_retry(
309 &self,
310 ctx: &mut Context,
311 procedure_ctx: &ProcedureContext,
312 topics: HashSet<String>,
313 ) -> bool {
314 let mut retry = 0;
315 let mut upgraded = false;
316
317 let mut guards = Vec::with_capacity(topics.len());
318 loop {
319 let timer = Instant::now();
320 for topic in &topics {
322 guards.push(
323 procedure_ctx
324 .provider
325 .acquire_lock(&(RemoteWalLock::Read(topic.clone()).into()))
326 .await,
327 );
328 }
329
330 if let Err(err) = self.upgrade_region(ctx).await {
331 retry += 1;
332 ctx.update_operations_elapsed(timer);
333 if matches!(err, error::Error::ExceededDeadline { .. }) {
334 error!("Failed to upgrade region, exceeded deadline");
335 break;
336 } else if err.is_retryable() && retry < self.optimistic_retry {
337 error!("Failed to upgrade region, error: {err:?}, retry later");
338 sleep(self.retry_initial_interval).await;
339 } else {
340 error!("Failed to upgrade region, error: {err:?}");
341 break;
342 }
343 } else {
344 ctx.update_operations_elapsed(timer);
345 upgraded = true;
346 break;
347 }
348 }
349
350 upgraded
351 }
352}
353
354#[cfg(test)]
355mod tests {
356 use std::assert_matches::assert_matches;
357 use std::collections::HashMap;
358
359 use common_meta::key::table_route::TableRouteValue;
360 use common_meta::key::test_utils::new_test_table_info;
361 use common_meta::peer::Peer;
362 use common_meta::rpc::router::{Region, RegionRoute};
363 use store_api::storage::RegionId;
364
365 use super::*;
366 use crate::error::Error;
367 use crate::procedure::region_migration::manager::RegionMigrationTriggerReason;
368 use crate::procedure::region_migration::test_util::{TestingEnv, new_procedure_context};
369 use crate::procedure::region_migration::{ContextFactory, PersistentContext};
370 use crate::procedure::test_util::{
371 new_close_region_reply, new_upgrade_region_reply, send_mock_reply,
372 };
373
374 fn new_persistent_context() -> PersistentContext {
375 PersistentContext::new(
376 vec![("greptime".into(), "public".into())],
377 Peer::empty(1),
378 Peer::empty(2),
379 vec![RegionId::new(1024, 1)],
380 Duration::from_millis(1000),
381 RegionMigrationTriggerReason::Manual,
382 )
383 }
384
385 async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
386 let region_id = ctx.persistent_ctx.region_ids[0];
387 let table_info = new_test_table_info(region_id.table_id());
388 let region_routes = vec![RegionRoute {
389 region: Region::new_test(region_id),
390 leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
391 follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
392 ..Default::default()
393 }];
394 ctx.table_metadata_manager
395 .create_table_metadata(
396 table_info,
397 TableRouteValue::physical(region_routes),
398 wal_options,
399 )
400 .await
401 .unwrap();
402 }
403
404 #[tokio::test]
405 async fn test_datanode_is_unreachable() {
406 let state = UpgradeCandidateRegion::default();
407 let persistent_context = new_persistent_context();
408 let env = TestingEnv::new();
409 let mut ctx = env.context_factory().new_context(persistent_context);
410 prepare_table_metadata(&ctx, HashMap::default()).await;
411 let err = state.upgrade_region(&mut ctx).await.unwrap_err();
412
413 assert_matches!(err, Error::PusherNotFound { .. });
414 assert!(!err.is_retryable());
415 }
416
417 #[tokio::test]
418 async fn test_pusher_dropped() {
419 let state = UpgradeCandidateRegion::default();
420 let persistent_context = new_persistent_context();
421 let to_peer_id = persistent_context.to_peer.id;
422
423 let mut env = TestingEnv::new();
424 let mut ctx = env.context_factory().new_context(persistent_context);
425 prepare_table_metadata(&ctx, HashMap::default()).await;
426 let mailbox_ctx = env.mailbox_context();
427
428 let (tx, rx) = tokio::sync::mpsc::channel(1);
429
430 mailbox_ctx
431 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
432 .await;
433
434 drop(rx);
435
436 let err = state.upgrade_region(&mut ctx).await.unwrap_err();
437
438 assert_matches!(err, Error::PushMessage { .. });
439 assert!(!err.is_retryable());
440 }
441
442 #[tokio::test]
443 async fn test_procedure_exceeded_deadline() {
444 let state = UpgradeCandidateRegion::default();
445 let persistent_context = new_persistent_context();
446 let env = TestingEnv::new();
447 let mut ctx = env.context_factory().new_context(persistent_context);
448 prepare_table_metadata(&ctx, HashMap::default()).await;
449 ctx.volatile_ctx.metrics.operations_elapsed =
450 ctx.persistent_ctx.timeout + Duration::from_secs(1);
451
452 let err = state.upgrade_region(&mut ctx).await.unwrap_err();
453
454 assert_matches!(err, Error::ExceededDeadline { .. });
455 assert!(!err.is_retryable());
456 }
457
458 #[tokio::test]
459 async fn test_unexpected_instruction_reply() {
460 let state = UpgradeCandidateRegion::default();
461 let persistent_context = new_persistent_context();
462 let to_peer_id = persistent_context.to_peer.id;
463
464 let mut env = TestingEnv::new();
465 let mut ctx = env.context_factory().new_context(persistent_context);
466 prepare_table_metadata(&ctx, HashMap::default()).await;
467 let mailbox_ctx = env.mailbox_context();
468 let mailbox = mailbox_ctx.mailbox().clone();
469
470 let (tx, rx) = tokio::sync::mpsc::channel(1);
471
472 mailbox_ctx
473 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
474 .await;
475
476 send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));
477
478 let err = state.upgrade_region(&mut ctx).await.unwrap_err();
479 assert_matches!(err, Error::UnexpectedInstructionReply { .. });
480 assert!(!err.is_retryable());
481 }
482
483 #[tokio::test]
484 async fn test_upgrade_region_failed() {
485 let state = UpgradeCandidateRegion::default();
486 let persistent_context = new_persistent_context();
487 let to_peer_id = persistent_context.to_peer.id;
488
489 let mut env = TestingEnv::new();
490 let mut ctx = env.context_factory().new_context(persistent_context);
491 prepare_table_metadata(&ctx, HashMap::default()).await;
492 let mailbox_ctx = env.mailbox_context();
493 let mailbox = mailbox_ctx.mailbox().clone();
494
495 let (tx, rx) = tokio::sync::mpsc::channel(1);
496
497 mailbox_ctx
498 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
499 .await;
500
501 send_mock_reply(mailbox, rx, |id| {
503 Ok(new_upgrade_region_reply(
504 id,
505 true,
506 true,
507 Some("test mocked".to_string()),
508 ))
509 });
510
511 let err = state.upgrade_region(&mut ctx).await.unwrap_err();
512
513 assert_matches!(err, Error::RetryLater { .. });
514 assert!(err.is_retryable());
515 assert!(format!("{err:?}").contains("test mocked"));
516 }
517
518 #[tokio::test]
519 async fn test_upgrade_region_not_found() {
520 let state = UpgradeCandidateRegion::default();
521 let persistent_context = new_persistent_context();
522 let to_peer_id = persistent_context.to_peer.id;
523
524 let mut env = TestingEnv::new();
525 let mut ctx = env.context_factory().new_context(persistent_context);
526 prepare_table_metadata(&ctx, HashMap::default()).await;
527 let mailbox_ctx = env.mailbox_context();
528 let mailbox = mailbox_ctx.mailbox().clone();
529
530 let (tx, rx) = tokio::sync::mpsc::channel(1);
531
532 mailbox_ctx
533 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
534 .await;
535
536 send_mock_reply(mailbox, rx, |id| {
537 Ok(new_upgrade_region_reply(id, true, false, None))
538 });
539
540 let err = state.upgrade_region(&mut ctx).await.unwrap_err();
541
542 assert_matches!(err, Error::Unexpected { .. });
543 assert!(!err.is_retryable());
544 assert!(err.to_string().contains("doesn't exist"));
545 }
546
547 #[tokio::test]
548 async fn test_upgrade_region_require_ready() {
549 let mut state = UpgradeCandidateRegion {
550 require_ready: true,
551 ..Default::default()
552 };
553
554 let persistent_context = new_persistent_context();
555 let to_peer_id = persistent_context.to_peer.id;
556
557 let mut env = TestingEnv::new();
558 let mut ctx = env.context_factory().new_context(persistent_context);
559 prepare_table_metadata(&ctx, HashMap::default()).await;
560 let mailbox_ctx = env.mailbox_context();
561 let mailbox = mailbox_ctx.mailbox().clone();
562
563 let (tx, rx) = tokio::sync::mpsc::channel(1);
564
565 mailbox_ctx
566 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
567 .await;
568
569 send_mock_reply(mailbox, rx, |id| {
570 Ok(new_upgrade_region_reply(id, false, true, None))
571 });
572
573 let err = state.upgrade_region(&mut ctx).await.unwrap_err();
574
575 assert_matches!(err, Error::RetryLater { .. });
576 assert!(err.is_retryable());
577 assert!(format!("{err:?}").contains("still replaying the wal"));
578
579 state.require_ready = false;
581
582 let mailbox = mailbox_ctx.mailbox().clone();
583 let (tx, rx) = tokio::sync::mpsc::channel(1);
584
585 mailbox_ctx
586 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
587 .await;
588
589 send_mock_reply(mailbox, rx, |id| {
590 Ok(new_upgrade_region_reply(id, false, true, None))
591 });
592
593 state.upgrade_region(&mut ctx).await.unwrap();
594 }
595
596 #[tokio::test]
597 async fn test_upgrade_region_with_retry_ok() {
598 let mut state = Box::<UpgradeCandidateRegion>::default();
599 state.retry_initial_interval = Duration::from_millis(100);
600 let persistent_context = new_persistent_context();
601 let to_peer_id = persistent_context.to_peer.id;
602
603 let mut env = TestingEnv::new();
604 let mut ctx = env.context_factory().new_context(persistent_context);
605 prepare_table_metadata(&ctx, HashMap::default()).await;
606 let mailbox_ctx = env.mailbox_context();
607 let mailbox = mailbox_ctx.mailbox().clone();
608
609 let (tx, mut rx) = tokio::sync::mpsc::channel(1);
610
611 mailbox_ctx
612 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
613 .await;
614
615 common_runtime::spawn_global(async move {
616 let resp = rx.recv().await.unwrap().unwrap();
617 let reply_id = resp.mailbox_message.unwrap().id;
618 mailbox
619 .on_recv(
620 reply_id,
621 Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
622 )
623 .await
624 .unwrap();
625
626 let resp = rx.recv().await.unwrap().unwrap();
628 let reply_id = resp.mailbox_message.unwrap().id;
629 mailbox
630 .on_recv(
631 reply_id,
632 Ok(new_upgrade_region_reply(reply_id, false, true, None)),
633 )
634 .await
635 .unwrap();
636
637 let resp = rx.recv().await.unwrap().unwrap();
639 let reply_id = resp.mailbox_message.unwrap().id;
640 mailbox
641 .on_recv(
642 reply_id,
643 Ok(new_upgrade_region_reply(reply_id, true, true, None)),
644 )
645 .await
646 .unwrap();
647 });
648
649 let procedure_ctx = new_procedure_context();
650 let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
651
652 let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
653
654 assert_matches!(update_metadata, UpdateMetadata::Upgrade);
655 }
656
657 #[tokio::test]
658 async fn test_upgrade_region_with_retry_failed() {
659 let mut state = Box::<UpgradeCandidateRegion>::default();
660 state.retry_initial_interval = Duration::from_millis(100);
661 let persistent_context = new_persistent_context();
662 let to_peer_id = persistent_context.to_peer.id;
663
664 let mut env = TestingEnv::new();
665 let mut ctx = env.context_factory().new_context(persistent_context);
666 prepare_table_metadata(&ctx, HashMap::default()).await;
667 let mailbox_ctx = env.mailbox_context();
668 let mailbox = mailbox_ctx.mailbox().clone();
669
670 let (tx, mut rx) = tokio::sync::mpsc::channel(1);
671
672 mailbox_ctx
673 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
674 .await;
675
676 common_runtime::spawn_global(async move {
677 let resp = rx.recv().await.unwrap().unwrap();
678 let reply_id = resp.mailbox_message.unwrap().id;
679 mailbox
680 .on_recv(
681 reply_id,
682 Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
683 )
684 .await
685 .unwrap();
686
687 let resp = rx.recv().await.unwrap().unwrap();
689 let reply_id = resp.mailbox_message.unwrap().id;
690 mailbox
691 .on_recv(
692 reply_id,
693 Ok(new_upgrade_region_reply(reply_id, false, true, None)),
694 )
695 .await
696 .unwrap();
697
698 let resp = rx.recv().await.unwrap().unwrap();
700 let reply_id = resp.mailbox_message.unwrap().id;
701 mailbox
702 .on_recv(
703 reply_id,
704 Ok(new_upgrade_region_reply(reply_id, false, false, None)),
705 )
706 .await
707 .unwrap();
708 });
709 let procedure_ctx = new_procedure_context();
710 let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
711
712 let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
713 assert_matches!(update_metadata, UpdateMetadata::Rollback);
714 }
715
716 #[tokio::test]
717 async fn test_upgrade_region_procedure_exceeded_deadline() {
718 let mut state = Box::<UpgradeCandidateRegion>::default();
719 state.retry_initial_interval = Duration::from_millis(100);
720 let persistent_context = new_persistent_context();
721 let to_peer_id = persistent_context.to_peer.id;
722
723 let mut env = TestingEnv::new();
724 let mut ctx = env.context_factory().new_context(persistent_context);
725 prepare_table_metadata(&ctx, HashMap::default()).await;
726 let mailbox_ctx = env.mailbox_context();
727 let mailbox = mailbox_ctx.mailbox().clone();
728 ctx.volatile_ctx.metrics.operations_elapsed =
729 ctx.persistent_ctx.timeout + Duration::from_secs(1);
730
731 let (tx, rx) = tokio::sync::mpsc::channel(1);
732 mailbox_ctx
733 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
734 .await;
735
736 send_mock_reply(mailbox, rx, |id| {
737 Ok(new_upgrade_region_reply(id, false, true, None))
738 });
739 let procedure_ctx = new_procedure_context();
740 let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
741 let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
742 assert_matches!(update_metadata, UpdateMetadata::Rollback);
743 }
744}