1use std::any::Any;
16use std::collections::HashSet;
17use std::time::Duration;
18
19use api::v1::meta::MailboxMessage;
20use common_meta::ddl::utils::parse_region_wal_options;
21use common_meta::instruction::{
22 Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply, UpgradeRegionsReply,
23};
24use common_meta::key::topic_region::TopicRegionKey;
25use common_meta::lock_key::RemoteWalLock;
26use common_meta::wal_options_allocator::extract_topic_from_wal_options;
27use common_procedure::{Context as ProcedureContext, Status};
28use common_telemetry::{error, info};
29use common_wal::options::WalOptions;
30use serde::{Deserialize, Serialize};
31use snafu::{OptionExt, ResultExt, ensure};
32use tokio::time::{Instant, sleep};
33
34use crate::error::{self, Result};
35use crate::handler::HeartbeatMailbox;
36use crate::procedure::region_migration::update_metadata::UpdateMetadata;
37use crate::procedure::region_migration::{Context, State};
38use crate::service::mailbox::Channel;
39
40#[derive(Debug, Serialize, Deserialize)]
41pub struct UpgradeCandidateRegion {
42 pub(crate) optimistic_retry: usize,
44 pub(crate) retry_initial_interval: Duration,
46 pub(crate) require_ready: bool,
49}
50
51impl Default for UpgradeCandidateRegion {
52 fn default() -> Self {
53 Self {
54 optimistic_retry: 3,
55 retry_initial_interval: Duration::from_millis(500),
56 require_ready: true,
57 }
58 }
59}
60
61#[async_trait::async_trait]
62#[typetag::serde]
63impl State for UpgradeCandidateRegion {
64 async fn next(
65 &mut self,
66 ctx: &mut Context,
67 procedure_ctx: &ProcedureContext,
68 ) -> Result<(Box<dyn State>, Status)> {
69 let now = Instant::now();
70
71 let topics = self.get_kafka_topics(ctx).await?;
72 if self
73 .upgrade_region_with_retry(ctx, procedure_ctx, topics)
74 .await
75 {
76 ctx.update_upgrade_candidate_region_elapsed(now);
77 Ok((Box::new(UpdateMetadata::Upgrade), Status::executing(false)))
78 } else {
79 ctx.update_upgrade_candidate_region_elapsed(now);
80 Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)))
81 }
82 }
83
84 fn as_any(&self) -> &dyn Any {
85 self
86 }
87}
88
89impl UpgradeCandidateRegion {
90 async fn get_kafka_topics(&self, ctx: &mut Context) -> Result<HashSet<String>> {
91 let table_regions = ctx.persistent_ctx.table_regions();
92 let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
93 let mut topics = HashSet::new();
94 for (table_id, regions) in table_regions {
95 let Some(datanode_table_value) = datanode_table_values.get(&table_id) else {
96 continue;
97 };
98
99 let region_wal_options =
100 parse_region_wal_options(&datanode_table_value.region_info.region_wal_options)
101 .context(error::ParseWalOptionsSnafu)?;
102
103 for region_id in regions {
104 let Some(WalOptions::Kafka(kafka_wal_options)) =
105 region_wal_options.get(®ion_id.region_number())
106 else {
107 continue;
108 };
109 if !topics.contains(&kafka_wal_options.topic) {
110 topics.insert(kafka_wal_options.topic.clone());
111 }
112 }
113 }
114
115 Ok(topics)
116 }
117
118 async fn build_upgrade_region_instruction(
120 &self,
121 ctx: &mut Context,
122 replay_timeout: Duration,
123 ) -> Result<Instruction> {
124 let region_ids = ctx.persistent_ctx.region_ids.clone();
125 let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
126 let mut region_topic = Vec::with_capacity(region_ids.len());
127 for region_id in region_ids.iter() {
128 let table_id = region_id.table_id();
129 if let Some(datanode_table_value) = datanode_table_values.get(&table_id)
130 && let Some(topic) = extract_topic_from_wal_options(
131 *region_id,
132 &datanode_table_value.region_info.region_wal_options,
133 )
134 {
135 region_topic.push((*region_id, topic));
136 }
137 }
138
139 let replay_checkpoints = ctx
140 .get_replay_checkpoints(
141 region_topic
142 .iter()
143 .map(|(region_id, topic)| TopicRegionKey::new(*region_id, topic))
144 .collect(),
145 )
146 .await?;
147 let mut upgrade_regions = Vec::with_capacity(region_ids.len());
149 for region_id in region_ids {
150 let last_entry_id = ctx
151 .volatile_ctx
152 .leader_region_last_entry_ids
153 .get(®ion_id)
154 .copied();
155 let metadata_last_entry_id = ctx
156 .volatile_ctx
157 .leader_region_metadata_last_entry_ids
158 .get(®ion_id)
159 .copied();
160 let checkpoint = replay_checkpoints.get(®ion_id).copied();
161 upgrade_regions.push(UpgradeRegion {
162 region_id,
163 last_entry_id,
164 metadata_last_entry_id,
165 replay_timeout,
166 location_id: Some(ctx.persistent_ctx.from_peer.id),
167 replay_entry_id: checkpoint.map(|c| c.entry_id),
168 metadata_replay_entry_id: checkpoint.and_then(|c| c.metadata_entry_id),
169 });
170 }
171
172 Ok(Instruction::UpgradeRegions(upgrade_regions))
173 }
174
175 fn handle_upgrade_region_reply(
176 &self,
177 ctx: &mut Context,
178 UpgradeRegionReply {
179 region_id,
180 ready,
181 exists,
182 error,
183 }: &UpgradeRegionReply,
184 now: &Instant,
185 ) -> Result<()> {
186 let candidate = &ctx.persistent_ctx.to_peer;
187 if error.is_some() {
188 return error::RetryLaterSnafu {
189 reason: format!(
190 "Failed to upgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
191 region_id,
192 candidate,
193 error,
194 now.elapsed()
195 ),
196 }
197 .fail();
198 }
199
200 ensure!(
201 exists,
202 error::UnexpectedSnafu {
203 violated: format!(
204 "Candidate region {} doesn't exist on datanode {:?}",
205 region_id, candidate
206 )
207 }
208 );
209
210 if self.require_ready && !ready {
211 return error::RetryLaterSnafu {
212 reason: format!(
213 "Candidate region {} still replaying the wal on datanode {:?}, elapsed: {:?}",
214 region_id,
215 candidate,
216 now.elapsed()
217 ),
218 }
219 .fail();
220 }
221
222 Ok(())
223 }
224
225 async fn upgrade_region(&self, ctx: &mut Context) -> Result<()> {
240 let operation_timeout =
241 ctx.next_operation_timeout()
242 .context(error::ExceededDeadlineSnafu {
243 operation: "Upgrade region",
244 })?;
245 let upgrade_instruction = self
246 .build_upgrade_region_instruction(ctx, operation_timeout)
247 .await?;
248
249 let pc = &ctx.persistent_ctx;
250 let region_ids = &pc.region_ids;
251 let candidate = &pc.to_peer;
252
253 let msg = MailboxMessage::json_message(
254 &format!("Upgrade candidate regions: {:?}", region_ids),
255 &format!("Metasrv@{}", ctx.server_addr()),
256 &format!("Datanode-{}@{}", candidate.id, candidate.addr),
257 common_time::util::current_time_millis(),
258 &upgrade_instruction,
259 )
260 .with_context(|_| error::SerializeToJsonSnafu {
261 input: upgrade_instruction.to_string(),
262 })?;
263
264 let ch = Channel::Datanode(candidate.id);
265 let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
266
267 let now = Instant::now();
268 match receiver.await {
269 Ok(msg) => {
270 let reply = HeartbeatMailbox::json_reply(&msg)?;
271 info!(
272 "Received upgrade region reply: {:?}, regions: {:?}, elapsed: {:?}",
273 reply,
274 region_ids,
275 now.elapsed()
276 );
277 let InstructionReply::UpgradeRegions(UpgradeRegionsReply { replies }) = reply
278 else {
279 return error::UnexpectedInstructionReplySnafu {
280 mailbox_message: msg.to_string(),
281 reason: "Unexpected reply of the upgrade region instruction",
282 }
283 .fail();
284 };
285 for reply in replies {
286 self.handle_upgrade_region_reply(ctx, &reply, &now)?;
287 }
288 Ok(())
289 }
290 Err(error::Error::MailboxTimeout { .. }) => {
291 let reason = format!(
292 "Mailbox received timeout for upgrade candidate regions {region_ids:?} on datanode {:?}, elapsed: {:?}",
293 candidate,
294 now.elapsed()
295 );
296 error::RetryLaterSnafu { reason }.fail()
297 }
298 Err(err) => Err(err),
299 }
300 }
301
302 async fn upgrade_region_with_retry(
306 &self,
307 ctx: &mut Context,
308 procedure_ctx: &ProcedureContext,
309 topics: HashSet<String>,
310 ) -> bool {
311 let mut retry = 0;
312 let mut upgraded = false;
313
314 let mut guards = Vec::with_capacity(topics.len());
315 loop {
316 let timer = Instant::now();
317 for topic in &topics {
319 guards.push(
320 procedure_ctx
321 .provider
322 .acquire_lock(&(RemoteWalLock::Read(topic.clone()).into()))
323 .await,
324 );
325 }
326
327 if let Err(err) = self.upgrade_region(ctx).await {
328 retry += 1;
329 ctx.update_operations_elapsed(timer);
330 if matches!(err, error::Error::ExceededDeadline { .. }) {
331 error!("Failed to upgrade region, exceeded deadline");
332 break;
333 } else if err.is_retryable() && retry < self.optimistic_retry {
334 error!("Failed to upgrade region, error: {err:?}, retry later");
335 sleep(self.retry_initial_interval).await;
336 } else {
337 error!("Failed to upgrade region, error: {err:?}");
338 break;
339 }
340 } else {
341 ctx.update_operations_elapsed(timer);
342 upgraded = true;
343 break;
344 }
345 }
346
347 upgraded
348 }
349}
350
351#[cfg(test)]
352mod tests {
353 use std::assert_matches::assert_matches;
354 use std::collections::HashMap;
355
356 use common_meta::key::table_route::TableRouteValue;
357 use common_meta::key::test_utils::new_test_table_info;
358 use common_meta::peer::Peer;
359 use common_meta::rpc::router::{Region, RegionRoute};
360 use store_api::storage::RegionId;
361
362 use super::*;
363 use crate::error::Error;
364 use crate::procedure::region_migration::manager::RegionMigrationTriggerReason;
365 use crate::procedure::region_migration::test_util::{TestingEnv, new_procedure_context};
366 use crate::procedure::region_migration::{ContextFactory, PersistentContext};
367 use crate::procedure::test_util::{
368 new_close_region_reply, new_upgrade_region_reply, send_mock_reply,
369 };
370
371 fn new_persistent_context() -> PersistentContext {
372 PersistentContext::new(
373 vec![("greptime".into(), "public".into())],
374 Peer::empty(1),
375 Peer::empty(2),
376 vec![RegionId::new(1024, 1)],
377 Duration::from_millis(1000),
378 RegionMigrationTriggerReason::Manual,
379 )
380 }
381
382 async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
383 let region_id = ctx.persistent_ctx.region_ids[0];
384 let table_info = new_test_table_info(region_id.table_id(), vec![1]).into();
385 let region_routes = vec![RegionRoute {
386 region: Region::new_test(region_id),
387 leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
388 follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
389 ..Default::default()
390 }];
391 ctx.table_metadata_manager
392 .create_table_metadata(
393 table_info,
394 TableRouteValue::physical(region_routes),
395 wal_options,
396 )
397 .await
398 .unwrap();
399 }
400
401 #[tokio::test]
402 async fn test_datanode_is_unreachable() {
403 let state = UpgradeCandidateRegion::default();
404 let persistent_context = new_persistent_context();
405 let env = TestingEnv::new();
406 let mut ctx = env.context_factory().new_context(persistent_context);
407 prepare_table_metadata(&ctx, HashMap::default()).await;
408 let err = state.upgrade_region(&mut ctx).await.unwrap_err();
409
410 assert_matches!(err, Error::PusherNotFound { .. });
411 assert!(!err.is_retryable());
412 }
413
414 #[tokio::test]
415 async fn test_pusher_dropped() {
416 let state = UpgradeCandidateRegion::default();
417 let persistent_context = new_persistent_context();
418 let to_peer_id = persistent_context.to_peer.id;
419
420 let mut env = TestingEnv::new();
421 let mut ctx = env.context_factory().new_context(persistent_context);
422 prepare_table_metadata(&ctx, HashMap::default()).await;
423 let mailbox_ctx = env.mailbox_context();
424
425 let (tx, rx) = tokio::sync::mpsc::channel(1);
426
427 mailbox_ctx
428 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
429 .await;
430
431 drop(rx);
432
433 let err = state.upgrade_region(&mut ctx).await.unwrap_err();
434
435 assert_matches!(err, Error::PushMessage { .. });
436 assert!(!err.is_retryable());
437 }
438
439 #[tokio::test]
440 async fn test_procedure_exceeded_deadline() {
441 let state = UpgradeCandidateRegion::default();
442 let persistent_context = new_persistent_context();
443 let env = TestingEnv::new();
444 let mut ctx = env.context_factory().new_context(persistent_context);
445 prepare_table_metadata(&ctx, HashMap::default()).await;
446 ctx.volatile_ctx.metrics.operations_elapsed =
447 ctx.persistent_ctx.timeout + Duration::from_secs(1);
448
449 let err = state.upgrade_region(&mut ctx).await.unwrap_err();
450
451 assert_matches!(err, Error::ExceededDeadline { .. });
452 assert!(!err.is_retryable());
453 }
454
455 #[tokio::test]
456 async fn test_unexpected_instruction_reply() {
457 let state = UpgradeCandidateRegion::default();
458 let persistent_context = new_persistent_context();
459 let to_peer_id = persistent_context.to_peer.id;
460
461 let mut env = TestingEnv::new();
462 let mut ctx = env.context_factory().new_context(persistent_context);
463 prepare_table_metadata(&ctx, HashMap::default()).await;
464 let mailbox_ctx = env.mailbox_context();
465 let mailbox = mailbox_ctx.mailbox().clone();
466
467 let (tx, rx) = tokio::sync::mpsc::channel(1);
468
469 mailbox_ctx
470 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
471 .await;
472
473 send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));
474
475 let err = state.upgrade_region(&mut ctx).await.unwrap_err();
476 assert_matches!(err, Error::UnexpectedInstructionReply { .. });
477 assert!(!err.is_retryable());
478 }
479
480 #[tokio::test]
481 async fn test_upgrade_region_failed() {
482 let state = UpgradeCandidateRegion::default();
483 let persistent_context = new_persistent_context();
484 let to_peer_id = persistent_context.to_peer.id;
485
486 let mut env = TestingEnv::new();
487 let mut ctx = env.context_factory().new_context(persistent_context);
488 prepare_table_metadata(&ctx, HashMap::default()).await;
489 let mailbox_ctx = env.mailbox_context();
490 let mailbox = mailbox_ctx.mailbox().clone();
491
492 let (tx, rx) = tokio::sync::mpsc::channel(1);
493
494 mailbox_ctx
495 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
496 .await;
497
498 send_mock_reply(mailbox, rx, |id| {
500 Ok(new_upgrade_region_reply(
501 id,
502 true,
503 true,
504 Some("test mocked".to_string()),
505 ))
506 });
507
508 let err = state.upgrade_region(&mut ctx).await.unwrap_err();
509
510 assert_matches!(err, Error::RetryLater { .. });
511 assert!(err.is_retryable());
512 assert!(format!("{err:?}").contains("test mocked"));
513 }
514
515 #[tokio::test]
516 async fn test_upgrade_region_not_found() {
517 let state = UpgradeCandidateRegion::default();
518 let persistent_context = new_persistent_context();
519 let to_peer_id = persistent_context.to_peer.id;
520
521 let mut env = TestingEnv::new();
522 let mut ctx = env.context_factory().new_context(persistent_context);
523 prepare_table_metadata(&ctx, HashMap::default()).await;
524 let mailbox_ctx = env.mailbox_context();
525 let mailbox = mailbox_ctx.mailbox().clone();
526
527 let (tx, rx) = tokio::sync::mpsc::channel(1);
528
529 mailbox_ctx
530 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
531 .await;
532
533 send_mock_reply(mailbox, rx, |id| {
534 Ok(new_upgrade_region_reply(id, true, false, None))
535 });
536
537 let err = state.upgrade_region(&mut ctx).await.unwrap_err();
538
539 assert_matches!(err, Error::Unexpected { .. });
540 assert!(!err.is_retryable());
541 assert!(err.to_string().contains("doesn't exist"));
542 }
543
544 #[tokio::test]
545 async fn test_upgrade_region_require_ready() {
546 let mut state = UpgradeCandidateRegion {
547 require_ready: true,
548 ..Default::default()
549 };
550
551 let persistent_context = new_persistent_context();
552 let to_peer_id = persistent_context.to_peer.id;
553
554 let mut env = TestingEnv::new();
555 let mut ctx = env.context_factory().new_context(persistent_context);
556 prepare_table_metadata(&ctx, HashMap::default()).await;
557 let mailbox_ctx = env.mailbox_context();
558 let mailbox = mailbox_ctx.mailbox().clone();
559
560 let (tx, rx) = tokio::sync::mpsc::channel(1);
561
562 mailbox_ctx
563 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
564 .await;
565
566 send_mock_reply(mailbox, rx, |id| {
567 Ok(new_upgrade_region_reply(id, false, true, None))
568 });
569
570 let err = state.upgrade_region(&mut ctx).await.unwrap_err();
571
572 assert_matches!(err, Error::RetryLater { .. });
573 assert!(err.is_retryable());
574 assert!(format!("{err:?}").contains("still replaying the wal"));
575
576 state.require_ready = false;
578
579 let mailbox = mailbox_ctx.mailbox().clone();
580 let (tx, rx) = tokio::sync::mpsc::channel(1);
581
582 mailbox_ctx
583 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
584 .await;
585
586 send_mock_reply(mailbox, rx, |id| {
587 Ok(new_upgrade_region_reply(id, false, true, None))
588 });
589
590 state.upgrade_region(&mut ctx).await.unwrap();
591 }
592
593 #[tokio::test]
594 async fn test_upgrade_region_with_retry_ok() {
595 let mut state = Box::<UpgradeCandidateRegion>::default();
596 state.retry_initial_interval = Duration::from_millis(100);
597 let persistent_context = new_persistent_context();
598 let to_peer_id = persistent_context.to_peer.id;
599
600 let mut env = TestingEnv::new();
601 let mut ctx = env.context_factory().new_context(persistent_context);
602 prepare_table_metadata(&ctx, HashMap::default()).await;
603 let mailbox_ctx = env.mailbox_context();
604 let mailbox = mailbox_ctx.mailbox().clone();
605
606 let (tx, mut rx) = tokio::sync::mpsc::channel(1);
607
608 mailbox_ctx
609 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
610 .await;
611
612 common_runtime::spawn_global(async move {
613 let resp = rx.recv().await.unwrap().unwrap();
614 let reply_id = resp.mailbox_message.unwrap().id;
615 mailbox
616 .on_recv(
617 reply_id,
618 Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
619 )
620 .await
621 .unwrap();
622
623 let resp = rx.recv().await.unwrap().unwrap();
625 let reply_id = resp.mailbox_message.unwrap().id;
626 mailbox
627 .on_recv(
628 reply_id,
629 Ok(new_upgrade_region_reply(reply_id, false, true, None)),
630 )
631 .await
632 .unwrap();
633
634 let resp = rx.recv().await.unwrap().unwrap();
636 let reply_id = resp.mailbox_message.unwrap().id;
637 mailbox
638 .on_recv(
639 reply_id,
640 Ok(new_upgrade_region_reply(reply_id, true, true, None)),
641 )
642 .await
643 .unwrap();
644 });
645
646 let procedure_ctx = new_procedure_context();
647 let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
648
649 let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
650
651 assert_matches!(update_metadata, UpdateMetadata::Upgrade);
652 }
653
654 #[tokio::test]
655 async fn test_upgrade_region_with_retry_failed() {
656 let mut state = Box::<UpgradeCandidateRegion>::default();
657 state.retry_initial_interval = Duration::from_millis(100);
658 let persistent_context = new_persistent_context();
659 let to_peer_id = persistent_context.to_peer.id;
660
661 let mut env = TestingEnv::new();
662 let mut ctx = env.context_factory().new_context(persistent_context);
663 prepare_table_metadata(&ctx, HashMap::default()).await;
664 let mailbox_ctx = env.mailbox_context();
665 let mailbox = mailbox_ctx.mailbox().clone();
666
667 let (tx, mut rx) = tokio::sync::mpsc::channel(1);
668
669 mailbox_ctx
670 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
671 .await;
672
673 common_runtime::spawn_global(async move {
674 let resp = rx.recv().await.unwrap().unwrap();
675 let reply_id = resp.mailbox_message.unwrap().id;
676 mailbox
677 .on_recv(
678 reply_id,
679 Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
680 )
681 .await
682 .unwrap();
683
684 let resp = rx.recv().await.unwrap().unwrap();
686 let reply_id = resp.mailbox_message.unwrap().id;
687 mailbox
688 .on_recv(
689 reply_id,
690 Ok(new_upgrade_region_reply(reply_id, false, true, None)),
691 )
692 .await
693 .unwrap();
694
695 let resp = rx.recv().await.unwrap().unwrap();
697 let reply_id = resp.mailbox_message.unwrap().id;
698 mailbox
699 .on_recv(
700 reply_id,
701 Ok(new_upgrade_region_reply(reply_id, false, false, None)),
702 )
703 .await
704 .unwrap();
705 });
706 let procedure_ctx = new_procedure_context();
707 let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
708
709 let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
710 assert_matches!(update_metadata, UpdateMetadata::Rollback);
711 }
712
713 #[tokio::test]
714 async fn test_upgrade_region_procedure_exceeded_deadline() {
715 let mut state = Box::<UpgradeCandidateRegion>::default();
716 state.retry_initial_interval = Duration::from_millis(100);
717 let persistent_context = new_persistent_context();
718 let to_peer_id = persistent_context.to_peer.id;
719
720 let mut env = TestingEnv::new();
721 let mut ctx = env.context_factory().new_context(persistent_context);
722 prepare_table_metadata(&ctx, HashMap::default()).await;
723 let mailbox_ctx = env.mailbox_context();
724 let mailbox = mailbox_ctx.mailbox().clone();
725 ctx.volatile_ctx.metrics.operations_elapsed =
726 ctx.persistent_ctx.timeout + Duration::from_secs(1);
727
728 let (tx, rx) = tokio::sync::mpsc::channel(1);
729 mailbox_ctx
730 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
731 .await;
732
733 send_mock_reply(mailbox, rx, |id| {
734 Ok(new_upgrade_region_reply(id, false, true, None))
735 });
736 let procedure_ctx = new_procedure_context();
737 let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
738 let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
739 assert_matches!(update_metadata, UpdateMetadata::Rollback);
740 }
741}