1use std::any::Any;
16use std::collections::{HashMap, HashSet};
17
18use common_meta::ddl::create_table::executor::CreateTableExecutor;
19use common_meta::ddl::create_table::template::{
20 CreateRequestBuilder, build_template_from_raw_table_info_for_physical_table,
21};
22use common_meta::lock_key::TableLock;
23use common_meta::node_manager::NodeManagerRef;
24use common_meta::peer::PeerAllocContext;
25use common_meta::rpc::router::RegionRoute;
26use common_procedure::{Context as ProcedureContext, Status};
27use common_telemetry::{debug, info};
28use serde::{Deserialize, Deserializer, Serialize};
29use snafu::{OptionExt, ResultExt};
30use store_api::storage::{RegionId, RegionNumber, TableId};
31use table::metadata::TableInfo;
32use table::table_reference::TableReference;
33use tokio::time::Instant;
34
35use crate::error::{self, Result};
36use crate::procedure::repartition::dispatch::Dispatch;
37use crate::procedure::repartition::plan::{
38 AllocationPlanEntry, RegionDescriptor, RepartitionPlanEntry,
39 convert_allocation_plan_to_repartition_plan,
40};
41use crate::procedure::repartition::{Context, State};
42
43#[derive(Debug, Clone, Serialize)]
44pub enum AllocateRegion {
45 Build(BuildPlan),
46 Execute(ExecutePlan),
47}
48
49impl<'de> Deserialize<'de> for AllocateRegion {
50 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
51 where
52 D: Deserializer<'de>,
53 {
54 #[derive(Deserialize)]
55 enum CurrentAllocateRegion {
56 Build(BuildPlan),
57 Execute(ExecutePlan),
58 }
59
60 #[derive(Deserialize)]
61 struct LegacyAllocateRegion {
62 plan_entries: Vec<AllocationPlanEntry>,
63 }
64
65 #[derive(Deserialize)]
66 #[serde(untagged)]
67 enum AllocateRegionRepr {
68 Current(CurrentAllocateRegion),
69 Legacy(LegacyAllocateRegion),
70 }
71
72 match AllocateRegionRepr::deserialize(deserializer)? {
73 AllocateRegionRepr::Current(CurrentAllocateRegion::Build(build_plan)) => {
74 Ok(Self::Build(build_plan))
75 }
76 AllocateRegionRepr::Current(CurrentAllocateRegion::Execute(execute_plan)) => {
77 Ok(Self::Execute(execute_plan))
78 }
79 AllocateRegionRepr::Legacy(legacy) => Ok(Self::Build(BuildPlan {
80 plan_entries: legacy.plan_entries,
81 })),
82 }
83 }
84}
85
86#[derive(Debug, Clone, Serialize, Deserialize)]
87pub struct BuildPlan {
88 plan_entries: Vec<AllocationPlanEntry>,
89}
90
91impl BuildPlan {
92 async fn next(
93 &mut self,
94 ctx: &mut Context,
95 _procedure_ctx: &ProcedureContext,
96 ) -> Result<(Box<dyn State>, Status)> {
97 let timer = Instant::now();
98 let table_id = ctx.persistent_ctx.table_id;
99 let table_route_value = ctx.get_table_route_value().await?;
100 let mut next_region_number =
101 AllocateRegion::get_next_region_number(table_route_value.max_region_number().unwrap());
102
103 let repartition_plan_entries = AllocateRegion::convert_to_repartition_plans(
105 table_id,
106 &mut next_region_number,
107 &self.plan_entries,
108 table_route_value.region_routes().unwrap(),
109 )?;
110 let plan_count = repartition_plan_entries.len();
111 let to_allocate = AllocateRegion::count_regions_to_allocate(&repartition_plan_entries);
112 info!(
113 "Repartition allocate regions start, table_id: {}, groups: {}, regions_to_allocate: {}",
114 table_id, plan_count, to_allocate
115 );
116
117 if AllocateRegion::count_regions_to_allocate(&repartition_plan_entries) == 0 {
119 ctx.persistent_ctx.plans = repartition_plan_entries;
120 ctx.update_allocate_region_elapsed(timer.elapsed());
121 return Ok((Box::new(Dispatch), Status::executing(true)));
122 }
123
124 ctx.persistent_ctx.plans = repartition_plan_entries;
125 debug!(
126 "Repartition allocate regions build plan completed, table_id: {}, elapsed: {:?}",
127 table_id,
128 timer.elapsed()
129 );
130 Ok((
131 Box::new(AllocateRegion::Execute(ExecutePlan)),
132 Status::executing(true),
133 ))
134 }
135}
136
137#[derive(Debug, Clone, Serialize, Deserialize)]
138pub struct ExecutePlan;
139
140impl ExecutePlan {
141 async fn next(
142 &mut self,
143 ctx: &mut Context,
144 procedure_ctx: &ProcedureContext,
145 ) -> Result<(Box<dyn State>, Status)> {
146 let timer = Instant::now();
147 let table_id = ctx.persistent_ctx.table_id;
148 let allocate_regions = AllocateRegion::collect_allocate_regions(&ctx.persistent_ctx.plans);
149 let region_number_and_partition_exprs =
150 AllocateRegion::prepare_region_allocation_data(&allocate_regions)?;
151 let table_info_value = ctx.get_table_info_value().await?;
152 let new_allocated_region_routes = ctx
153 .region_routes_allocator
154 .allocate(
155 table_id,
156 ®ion_number_and_partition_exprs
157 .iter()
158 .map(|(n, p)| (*n, p.as_str()))
159 .collect::<Vec<_>>(),
160 &PeerAllocContext::default(),
161 )
162 .await
163 .context(error::AllocateRegionRoutesSnafu { table_id })?;
164 let wal_options = ctx
165 .wal_options_allocator
166 .allocate(
167 &allocate_regions
168 .iter()
169 .map(|r| r.region_id.region_number())
170 .collect::<Vec<_>>(),
171 table_info_value.table_info.meta.options.skip_wal,
172 )
173 .await
174 .context(error::AllocateWalOptionsSnafu { table_id })?;
175
176 let new_region_count = new_allocated_region_routes.len();
177 let new_regions_brief: Vec<_> = new_allocated_region_routes
178 .iter()
179 .map(|route| {
180 let region_id = route.region.id;
181 let peer = route.leader_peer.as_ref().map(|p| p.id).unwrap_or_default();
182 format!("region_id: {}, peer: {}", region_id, peer)
183 })
184 .collect();
185 info!(
186 "Allocated regions for repartition, table_id: {}, new_region_count: {}, new_regions: {:?}",
187 table_id, new_region_count, new_regions_brief
188 );
189
190 let _operating_guards = Context::register_operating_regions(
192 &ctx.memory_region_keeper,
193 &new_allocated_region_routes,
194 )?;
195 AllocateRegion::allocate_regions(
197 &ctx.node_manager,
198 &table_info_value.table_info,
199 &new_allocated_region_routes,
200 &wal_options,
201 )
202 .await?;
203
204 let table_lock = TableLock::Write(table_id).into();
206 let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
207 let table_route_value = ctx.get_table_route_value().await?;
210 let region_routes = table_route_value.region_routes().unwrap();
212 let new_region_routes =
213 AllocateRegion::generate_region_routes(region_routes, &new_allocated_region_routes);
214 ctx.update_table_route(&table_route_value, new_region_routes, wal_options)
215 .await?;
216 ctx.invalidate_table_cache().await?;
217
218 ctx.update_allocate_region_elapsed(timer.elapsed());
219 Ok((Box::new(Dispatch), Status::executing(true)))
220 }
221}
222
223#[async_trait::async_trait]
224#[typetag::serde]
225impl State for AllocateRegion {
226 async fn next(
227 &mut self,
228 ctx: &mut Context,
229 procedure_ctx: &ProcedureContext,
230 ) -> Result<(Box<dyn State>, Status)> {
231 match self {
232 AllocateRegion::Build(build_plan) => build_plan.next(ctx, procedure_ctx).await,
233 AllocateRegion::Execute(execute_plan) => execute_plan.next(ctx, procedure_ctx).await,
234 }
235 }
236
237 fn as_any(&self) -> &dyn Any {
238 self
239 }
240}
241
242impl AllocateRegion {
243 pub fn new(plan_entries: Vec<AllocationPlanEntry>) -> Self {
244 AllocateRegion::Build(BuildPlan { plan_entries })
245 }
246
247 fn generate_region_routes(
248 region_routes: &[RegionRoute],
249 new_allocated_region_ids: &[RegionRoute],
250 ) -> Vec<RegionRoute> {
251 let region_ids = region_routes
252 .iter()
253 .map(|r| r.region.id)
254 .collect::<HashSet<_>>();
255 let mut new_region_routes = region_routes.to_vec();
256 for new_allocated_region_id in new_allocated_region_ids {
257 if !region_ids.contains(&new_allocated_region_id.region.id) {
258 new_region_routes.push(new_allocated_region_id.clone());
259 }
260 }
261 new_region_routes
262 }
263
264 fn convert_to_repartition_plans(
272 table_id: TableId,
273 next_region_number: &mut RegionNumber,
274 plan_entries: &[AllocationPlanEntry],
275 current_region_routes: &[RegionRoute],
276 ) -> Result<Vec<RepartitionPlanEntry>> {
277 let region_routes_map = current_region_routes
278 .iter()
279 .map(|route| (route.region.id, route))
280 .collect::<HashMap<_, _>>();
281
282 plan_entries
283 .iter()
284 .map(|plan_entry| {
285 let mut plan = convert_allocation_plan_to_repartition_plan(
286 table_id,
287 next_region_number,
288 plan_entry,
289 );
290 Self::capture_plan_original_target_routes(&mut plan, ®ion_routes_map)?;
291 Ok(plan)
292 })
293 .collect()
294 }
295
296 fn capture_plan_original_target_routes(
297 plan: &mut RepartitionPlanEntry,
298 region_routes_map: &HashMap<RegionId, &RegionRoute>,
299 ) -> Result<()> {
300 let mut original_target_routes = Vec::with_capacity(plan.target_regions.len());
304 for target in &plan.target_regions {
305 if plan.allocated_region_ids.contains(&target.region_id) {
306 continue;
308 }
309 let route = region_routes_map.get(&target.region_id).context(
310 error::RepartitionTargetRegionMissingSnafu {
311 group_id: plan.group_id,
312 region_id: target.region_id,
313 },
314 )?;
315 {
316 original_target_routes.push((*route).clone());
317 }
318 }
319
320 plan.original_target_routes = original_target_routes;
321 Ok(())
322 }
323
324 fn collect_allocate_regions(
326 repartition_plan_entries: &[RepartitionPlanEntry],
327 ) -> Vec<&RegionDescriptor> {
328 repartition_plan_entries
329 .iter()
330 .flat_map(|p| p.allocate_regions())
331 .collect()
332 }
333
334 fn prepare_region_allocation_data(
336 allocate_regions: &[&RegionDescriptor],
337 ) -> Result<Vec<(RegionNumber, String)>> {
338 allocate_regions
339 .iter()
340 .map(|r| {
341 Ok((
342 r.region_id.region_number(),
343 r.partition_expr
344 .as_json_str()
345 .context(error::SerializePartitionExprSnafu)?,
346 ))
347 })
348 .collect()
349 }
350
351 fn count_regions_to_allocate(repartition_plan_entries: &[RepartitionPlanEntry]) -> usize {
353 repartition_plan_entries
354 .iter()
355 .map(|p| p.allocated_region_ids.len())
356 .sum()
357 }
358
359 fn get_next_region_number(max_region_number: RegionNumber) -> RegionNumber {
361 max_region_number + 1
362 }
363
364 async fn allocate_regions(
365 node_manager: &NodeManagerRef,
366 raw_table_info: &TableInfo,
367 region_routes: &[RegionRoute],
368 wal_options: &HashMap<RegionNumber, String>,
369 ) -> Result<()> {
370 let table_ref = TableReference::full(
371 &raw_table_info.catalog_name,
372 &raw_table_info.schema_name,
373 &raw_table_info.name,
374 );
375 let table_id = raw_table_info.ident.table_id;
376 let request = build_template_from_raw_table_info_for_physical_table(raw_table_info)
379 .context(error::BuildCreateRequestSnafu { table_id })?;
380 common_telemetry::debug!(
381 "Allocating regions request, table_id: {}, request: {:?}",
382 table_id,
383 request
384 );
385 let builder = CreateRequestBuilder::new(request, None);
386 let region_count = region_routes.len();
387 let wal_region_count = wal_options.len();
388 info!(
389 "Allocating regions on datanodes, table_id: {}, region_count: {}, wal_regions: {}",
390 table_id, region_count, wal_region_count
391 );
392 let executor = CreateTableExecutor::new(table_ref.into(), false, builder);
393 executor
394 .on_create_regions(node_manager, table_id, region_routes, wal_options)
395 .await
396 .context(error::AllocateRegionsSnafu { table_id })?;
397
398 Ok(())
399 }
400}
401
402#[cfg(test)]
403mod tests {
404 use std::collections::HashMap;
405 use std::sync::Arc;
406
407 use common_meta::key::TableMetadataManagerRef;
408 use common_meta::key::datanode_table::DatanodeTableKey;
409 use common_meta::peer::Peer;
410 use common_meta::rpc::router::{Region, RegionRoute};
411 use common_meta::test_util::MockDatanodeManager;
412 use common_procedure::{ContextProvider, ProcedureId, ProcedureState};
413 use common_procedure_test::MockContextProvider;
414 use store_api::storage::RegionId;
415 use tokio::sync::watch;
416 use uuid::Uuid;
417
418 use super::*;
419 use crate::procedure::repartition::State;
420 use crate::procedure::repartition::test_util::{
421 TestingEnv, current_parent_region_routes, new_parent_context, range_expr,
422 test_region_wal_options,
423 };
424
425 fn create_region_descriptor(
426 table_id: TableId,
427 region_number: u32,
428 col: &str,
429 start: i64,
430 end: i64,
431 ) -> RegionDescriptor {
432 RegionDescriptor {
433 region_id: RegionId::new(table_id, region_number),
434 partition_expr: range_expr(col, start, end),
435 }
436 }
437
438 fn create_allocation_plan_entry(
439 table_id: TableId,
440 source_region_numbers: &[u32],
441 target_ranges: &[(i64, i64)],
442 ) -> AllocationPlanEntry {
443 let source_regions = source_region_numbers
444 .iter()
445 .enumerate()
446 .map(|(i, &n)| {
447 let start = i as i64 * 100;
448 let end = (i + 1) as i64 * 100;
449 create_region_descriptor(table_id, n, "x", start, end)
450 })
451 .collect();
452
453 let target_partition_exprs = target_ranges
454 .iter()
455 .map(|&(start, end)| range_expr("x", start, end))
456 .collect();
457
458 AllocationPlanEntry {
459 group_id: Uuid::new_v4(),
460 source_regions,
461 target_partition_exprs,
462 transition_map: vec![],
463 }
464 }
465
466 fn create_current_region_routes(table_id: TableId, region_numbers: &[u32]) -> Vec<RegionRoute> {
467 region_numbers
468 .iter()
469 .map(|region_number| RegionRoute {
470 region: Region {
471 id: RegionId::new(table_id, *region_number),
472 ..Default::default()
473 },
474 leader_peer: Some(Peer::empty(1)),
475 ..Default::default()
476 })
477 .collect()
478 }
479
480 struct ConcurrentTableRouteUpdateProvider {
481 inner: MockContextProvider,
482 table_metadata_manager: TableMetadataManagerRef,
483 table_id: TableId,
484 concurrent_region_route: RegionRoute,
485 region_wal_options: HashMap<RegionNumber, String>,
486 }
487
488 #[async_trait::async_trait]
489 impl ContextProvider for ConcurrentTableRouteUpdateProvider {
490 async fn procedure_state(
491 &self,
492 procedure_id: ProcedureId,
493 ) -> common_procedure::Result<Option<ProcedureState>> {
494 self.inner.procedure_state(procedure_id).await
495 }
496
497 async fn procedure_state_receiver(
498 &self,
499 procedure_id: ProcedureId,
500 ) -> common_procedure::Result<Option<watch::Receiver<ProcedureState>>> {
501 self.inner.procedure_state_receiver(procedure_id).await
502 }
503
504 async fn try_put_poison(
505 &self,
506 key: &common_procedure::PoisonKey,
507 procedure_id: ProcedureId,
508 ) -> common_procedure::Result<()> {
509 self.inner.try_put_poison(key, procedure_id).await
510 }
511
512 async fn acquire_lock(
513 &self,
514 key: &common_procedure::StringKey,
515 ) -> common_procedure::local::DynamicKeyLockGuard {
516 let current_table_route_value = self
517 .table_metadata_manager
518 .table_route_manager()
519 .table_route_storage()
520 .get_with_raw_bytes(self.table_id)
521 .await
522 .unwrap()
523 .unwrap();
524 let mut region_routes = current_table_route_value.region_routes().unwrap().clone();
525
526 if !region_routes
527 .iter()
528 .any(|route| route.region.id == self.concurrent_region_route.region.id)
529 {
530 region_routes.push(self.concurrent_region_route.clone());
531 let datanode_id = current_table_route_value.region_routes().unwrap()[0]
532 .leader_peer
533 .as_ref()
534 .unwrap()
535 .id;
536 let datanode_table_value = self
537 .table_metadata_manager
538 .datanode_table_manager()
539 .get(&DatanodeTableKey::new(datanode_id, self.table_id))
540 .await
541 .unwrap()
542 .unwrap();
543 let region_options = &datanode_table_value.region_info.region_options;
544
545 self.table_metadata_manager
546 .update_table_route(
547 self.table_id,
548 datanode_table_value.region_info.clone(),
549 ¤t_table_route_value,
550 region_routes,
551 region_options,
552 &self.region_wal_options,
553 )
554 .await
555 .unwrap();
556 }
557
558 self.inner.acquire_lock(key).await
559 }
560 }
561
562 #[test]
563 fn test_convert_to_repartition_plans_no_allocation() {
564 let table_id = 1024;
565 let mut next_region_number = 10;
566
567 let plan_entries = vec![create_allocation_plan_entry(
569 table_id,
570 &[1, 2],
571 &[(0, 50), (50, 200)],
572 )];
573
574 let result = AllocateRegion::convert_to_repartition_plans(
575 table_id,
576 &mut next_region_number,
577 &plan_entries,
578 &create_current_region_routes(table_id, &[1, 2]),
579 )
580 .unwrap();
581
582 assert_eq!(result.len(), 1);
583 assert_eq!(result[0].target_regions.len(), 2);
584 assert!(result[0].allocated_region_ids.is_empty());
585 assert_eq!(next_region_number, 10);
587 }
588
589 #[test]
590 fn test_convert_to_repartition_plans_with_allocation() {
591 let table_id = 1024;
592 let mut next_region_number = 10;
593
594 let plan_entries = vec![create_allocation_plan_entry(
596 table_id,
597 &[1, 2],
598 &[(0, 50), (50, 100), (100, 150), (150, 200)],
599 )];
600
601 let result = AllocateRegion::convert_to_repartition_plans(
602 table_id,
603 &mut next_region_number,
604 &plan_entries,
605 &create_current_region_routes(table_id, &[1, 2]),
606 )
607 .unwrap();
608
609 assert_eq!(result.len(), 1);
610 assert_eq!(result[0].target_regions.len(), 4);
611 assert_eq!(result[0].allocated_region_ids.len(), 2);
612 assert_eq!(
613 result[0].allocated_region_ids[0],
614 RegionId::new(table_id, 10)
615 );
616 assert_eq!(
617 result[0].allocated_region_ids[1],
618 RegionId::new(table_id, 11)
619 );
620 assert_eq!(next_region_number, 12);
622 }
623
624 #[test]
625 fn test_convert_to_repartition_plans_multiple_entries() {
626 let table_id = 1024;
627 let mut next_region_number = 10;
628
629 let plan_entries = vec![
631 create_allocation_plan_entry(table_id, &[1], &[(0, 50), (50, 100)]), create_allocation_plan_entry(table_id, &[2, 3], &[(100, 150), (150, 200)]), create_allocation_plan_entry(table_id, &[4], &[(200, 250), (250, 300), (300, 400)]), ];
635
636 let result = AllocateRegion::convert_to_repartition_plans(
637 table_id,
638 &mut next_region_number,
639 &plan_entries,
640 &create_current_region_routes(table_id, &[1, 2, 3, 4]),
641 )
642 .unwrap();
643
644 assert_eq!(result.len(), 3);
645 assert_eq!(result[0].allocated_region_ids.len(), 1);
646 assert_eq!(result[1].allocated_region_ids.len(), 0);
647 assert_eq!(result[2].allocated_region_ids.len(), 2);
648 assert_eq!(next_region_number, 13);
650 }
651
652 #[test]
653 fn test_count_regions_to_allocate() {
654 let table_id = 1024;
655 let mut next_region_number = 10;
656
657 let plan_entries = vec![
658 create_allocation_plan_entry(table_id, &[1], &[(0, 50), (50, 100)]), create_allocation_plan_entry(table_id, &[2, 3], &[(100, 200)]), create_allocation_plan_entry(table_id, &[4], &[(200, 250), (250, 300)]), ];
662
663 let repartition_plans = AllocateRegion::convert_to_repartition_plans(
664 table_id,
665 &mut next_region_number,
666 &plan_entries,
667 &create_current_region_routes(table_id, &[1, 2, 3, 4]),
668 )
669 .unwrap();
670
671 let count = AllocateRegion::count_regions_to_allocate(&repartition_plans);
672 assert_eq!(count, 2);
673 }
674
675 #[test]
676 fn test_collect_allocate_regions() {
677 let table_id = 1024;
678 let mut next_region_number = 10;
679
680 let plan_entries = vec![
681 create_allocation_plan_entry(table_id, &[1], &[(0, 50), (50, 100)]), create_allocation_plan_entry(table_id, &[2], &[(100, 150), (150, 200)]), ];
684
685 let repartition_plans = AllocateRegion::convert_to_repartition_plans(
686 table_id,
687 &mut next_region_number,
688 &plan_entries,
689 &create_current_region_routes(table_id, &[1, 2]),
690 )
691 .unwrap();
692
693 let allocate_regions = AllocateRegion::collect_allocate_regions(&repartition_plans);
694 assert_eq!(allocate_regions.len(), 2);
695 assert_eq!(allocate_regions[0].region_id, RegionId::new(table_id, 10));
696 assert_eq!(allocate_regions[1].region_id, RegionId::new(table_id, 11));
697 }
698
699 #[test]
700 fn test_prepare_region_allocation_data() {
701 let table_id = 1024;
702 let regions = [
703 create_region_descriptor(table_id, 10, "x", 0, 50),
704 create_region_descriptor(table_id, 11, "x", 50, 100),
705 ];
706 let region_refs: Vec<&RegionDescriptor> = regions.iter().collect();
707
708 let result = AllocateRegion::prepare_region_allocation_data(®ion_refs).unwrap();
709
710 assert_eq!(result.len(), 2);
711 assert_eq!(result[0].0, 10);
712 assert_eq!(result[1].0, 11);
713 assert!(!result[0].1.is_empty());
715 assert!(!result[1].1.is_empty());
716 }
717
718 #[tokio::test]
719 async fn test_execute_plan_uses_latest_table_route_after_lock() {
720 let env = TestingEnv::new();
721 let table_id = 1024;
722 let original_region_routes = create_current_region_routes(table_id, &[1]);
723 env.create_physical_table_metadata_for_repartition(
724 table_id,
725 original_region_routes,
726 test_region_wal_options(&[1]),
727 )
728 .await;
729
730 let node_manager = Arc::new(MockDatanodeManager::new(()));
731 let mut ctx = new_parent_context(&env, node_manager, table_id);
732 ctx.persistent_ctx.plans = vec![RepartitionPlanEntry {
733 group_id: Uuid::new_v4(),
734 source_regions: vec![],
735 target_regions: vec![create_region_descriptor(table_id, 3, "x", 0, 100)],
736 allocated_region_ids: vec![RegionId::new(table_id, 3)],
737 pending_deallocate_region_ids: vec![],
738 transition_map: vec![],
739 original_target_routes: vec![],
740 }];
741 let concurrent_region_route = create_current_region_routes(table_id, &[2])
742 .into_iter()
743 .next()
744 .unwrap();
745 let procedure_ctx = ProcedureContext {
746 procedure_id: ProcedureId::random(),
747 provider: Arc::new(ConcurrentTableRouteUpdateProvider {
748 inner: MockContextProvider::default(),
749 table_metadata_manager: env.table_metadata_manager.clone(),
750 table_id,
751 concurrent_region_route,
752 region_wal_options: test_region_wal_options(&[1, 2]),
753 }),
754 };
755 let mut state = ExecutePlan;
756
757 state.next(&mut ctx, &procedure_ctx).await.unwrap();
758
759 let region_ids = current_parent_region_routes(&ctx)
760 .await
761 .into_iter()
762 .map(|route| route.region.id)
763 .collect::<Vec<_>>();
764 assert_eq!(
765 region_ids,
766 vec![
767 RegionId::new(table_id, 1),
768 RegionId::new(table_id, 2),
769 RegionId::new(table_id, 3),
770 ]
771 );
772 }
773
774 #[test]
775 fn test_allocate_region_state_backward_compatibility() {
776 let serialized = r#"{"repartition_state":"AllocateRegion","plan_entries":[]}"#;
778
779 let state: Box<dyn State> = serde_json::from_str(serialized).unwrap();
781
782 let allocate_region = state
784 .as_any()
785 .downcast_ref::<AllocateRegion>()
786 .expect("expected AllocateRegion state");
787 match allocate_region {
788 AllocateRegion::Build(build_plan) => assert!(build_plan.plan_entries.is_empty()),
789 AllocateRegion::Execute(_) => panic!("expected build plan"),
790 }
791 }
792
793 #[test]
794 fn test_allocate_region_state_round_trip() {
795 let state: Box<dyn State> = Box::new(AllocateRegion::new(vec![]));
797
798 let serialized = serde_json::to_string(&state).unwrap();
800 let deserialized: Box<dyn State> = serde_json::from_str(&serialized).unwrap();
801
802 assert_eq!(
804 serialized,
805 r#"{"repartition_state":"AllocateRegion","Build":{"plan_entries":[]}}"#
806 );
807 let allocate_region = deserialized
808 .as_any()
809 .downcast_ref::<AllocateRegion>()
810 .expect("expected AllocateRegion state");
811 match allocate_region {
812 AllocateRegion::Build(build_plan) => assert!(build_plan.plan_entries.is_empty()),
813 AllocateRegion::Execute(_) => panic!("expected build plan"),
814 }
815 }
816
817 #[test]
818 fn test_allocate_region_execute_state_round_trip() {
819 let state: Box<dyn State> = Box::new(AllocateRegion::Execute(ExecutePlan));
821
822 let serialized = serde_json::to_string(&state).unwrap();
824 let deserialized: Box<dyn State> = serde_json::from_str(&serialized).unwrap();
825
826 assert_eq!(
828 serialized,
829 r#"{"repartition_state":"AllocateRegion","Execute":null}"#
830 );
831 let allocate_region = deserialized
832 .as_any()
833 .downcast_ref::<AllocateRegion>()
834 .expect("expected AllocateRegion state");
835 match allocate_region {
836 AllocateRegion::Execute(_) => {}
837 AllocateRegion::Build(_) => panic!("expected execute plan"),
838 }
839 }
840}