1use std::any::Any;
16use std::collections::{HashMap, HashSet};
17
18use common_meta::ddl::create_table::executor::CreateTableExecutor;
19use common_meta::ddl::create_table::template::{
20 CreateRequestBuilder, build_template_from_raw_table_info_for_physical_table,
21};
22use common_meta::lock_key::TableLock;
23use common_meta::node_manager::NodeManagerRef;
24use common_meta::peer::PeerAllocContext;
25use common_meta::rpc::router::RegionRoute;
26use common_meta::wal_provider::{
27 RegionWalOptions, acquire_remote_wal_read_locks, refresh_initial_pruned_entry_ids,
28};
29use common_procedure::{Context as ProcedureContext, Status};
30use common_telemetry::{debug, info};
31use serde::{Deserialize, Deserializer, Serialize};
32use snafu::{OptionExt, ResultExt};
33use store_api::region_request::RegionRequirements;
34use store_api::storage::{RegionId, RegionNumber, TableId};
35use table::metadata::TableInfo;
36use table::table_reference::TableReference;
37use tokio::time::Instant;
38
39use crate::error::{self, Result};
40use crate::procedure::repartition::dispatch::Dispatch;
41use crate::procedure::repartition::plan::{
42 AllocationPlanEntry, RepartitionPlanEntry, TargetRegionDescriptor,
43 convert_allocation_plan_to_repartition_plan,
44};
45use crate::procedure::repartition::{Context, State};
46
/// Repartition state responsible for allocating the regions needed by the
/// repartition plans.
///
/// Each variant wraps one phase of the state. Only `Serialize` is derived here;
/// `Deserialize` is implemented by hand for this type.
#[derive(Debug, Clone, Serialize)]
pub enum AllocateRegion {
    /// Phase that turns allocation plan entries into repartition plans.
    Build(BuildPlan),
    /// Phase that allocates the regions recorded in the persisted plans.
    Execute(ExecutePlan),
}
52
53impl<'de> Deserialize<'de> for AllocateRegion {
54 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
55 where
56 D: Deserializer<'de>,
57 {
58 #[derive(Deserialize)]
59 enum CurrentAllocateRegion {
60 Build(BuildPlan),
61 Execute(ExecutePlan),
62 }
63
64 #[derive(Deserialize)]
65 struct LegacyAllocateRegion {
66 plan_entries: Vec<AllocationPlanEntry>,
67 }
68
69 #[derive(Deserialize)]
70 #[serde(untagged)]
71 enum AllocateRegionRepr {
72 Current(CurrentAllocateRegion),
73 Legacy(LegacyAllocateRegion),
74 }
75
76 match AllocateRegionRepr::deserialize(deserializer)? {
77 AllocateRegionRepr::Current(CurrentAllocateRegion::Build(build_plan)) => {
78 Ok(Self::Build(build_plan))
79 }
80 AllocateRegionRepr::Current(CurrentAllocateRegion::Execute(execute_plan)) => {
81 Ok(Self::Execute(execute_plan))
82 }
83 AllocateRegionRepr::Legacy(legacy) => Ok(Self::Build(BuildPlan {
84 plan_entries: legacy.plan_entries,
85 })),
86 }
87 }
88}
89
/// Build phase of [`AllocateRegion`]: owns the allocation plan entries that are
/// converted into repartition plans.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BuildPlan {
    /// Allocation plan entries awaiting conversion.
    plan_entries: Vec<AllocationPlanEntry>,
}
94
95impl BuildPlan {
96 async fn next(
97 &mut self,
98 ctx: &mut Context,
99 _procedure_ctx: &ProcedureContext,
100 ) -> Result<(Box<dyn State>, Status)> {
101 let timer = Instant::now();
102 let table_id = ctx.persistent_ctx.table_id;
103 let table_route_value = ctx.get_table_route_value().await?;
104 let mut next_region_number =
105 AllocateRegion::get_next_region_number(table_route_value.max_region_number().unwrap());
106
107 let repartition_plan_entries = AllocateRegion::convert_to_repartition_plans(
109 table_id,
110 &mut next_region_number,
111 &self.plan_entries,
112 table_route_value.region_routes().unwrap(),
113 )?;
114 let plan_count = repartition_plan_entries.len();
115 let to_allocate = AllocateRegion::count_regions_to_allocate(&repartition_plan_entries);
116 info!(
117 "Repartition allocate regions start, table_id: {}, groups: {}, regions_to_allocate: {}",
118 table_id, plan_count, to_allocate
119 );
120
121 if AllocateRegion::count_regions_to_allocate(&repartition_plan_entries) == 0 {
123 ctx.persistent_ctx.plans = repartition_plan_entries;
124 ctx.update_allocate_region_elapsed(timer.elapsed());
125 return Ok((Box::new(Dispatch), Status::executing(true)));
126 }
127
128 ctx.persistent_ctx.plans = repartition_plan_entries;
129 debug!(
130 "Repartition allocate regions build plan completed, table_id: {}, elapsed: {:?}",
131 table_id,
132 timer.elapsed()
133 );
134 Ok((
135 Box::new(AllocateRegion::Execute(ExecutePlan)),
136 Status::executing(true),
137 ))
138 }
139}
140
/// Execute phase of [`AllocateRegion`]. It holds no data of its own; its `next`
/// reads the plans from the persistent context.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutePlan;
143
144impl ExecutePlan {
145 async fn next(
146 &mut self,
147 ctx: &mut Context,
148 procedure_ctx: &ProcedureContext,
149 ) -> Result<(Box<dyn State>, Status)> {
150 let timer = Instant::now();
151 let table_id = ctx.persistent_ctx.table_id;
152 let allocate_regions = AllocateRegion::collect_allocate_regions(&ctx.persistent_ctx.plans);
153 let region_number_and_partition_exprs =
154 AllocateRegion::prepare_region_allocation_data(&allocate_regions)?;
155 let table_info_value = ctx.get_table_info_value().await?;
156 let new_allocated_region_routes = ctx
157 .region_routes_allocator
158 .allocate(
159 table_id,
160 ®ion_number_and_partition_exprs
161 .iter()
162 .map(|(n, p)| (*n, p.as_str()))
163 .collect::<Vec<_>>(),
164 &PeerAllocContext::default(),
165 )
166 .await
167 .context(error::AllocateRegionRoutesSnafu { table_id })?;
168 let mut wal_options = ctx
169 .wal_options_allocator
170 .allocate(
171 &allocate_regions
172 .iter()
173 .map(|r| r.region_id.region_number())
174 .collect::<Vec<_>>(),
175 table_info_value.table_info.meta.options.skip_wal,
176 )
177 .await
178 .context(error::AllocateWalOptionsSnafu { table_id })?;
179 let _remote_wal_lock_guards =
180 acquire_remote_wal_read_locks(procedure_ctx, &wal_options).await;
181 refresh_initial_pruned_entry_ids(&ctx.table_metadata_manager, &mut wal_options)
182 .await
183 .context(error::AllocateWalOptionsSnafu { table_id })?;
184
185 let new_region_count = new_allocated_region_routes.len();
186 let new_regions_brief: Vec<_> = new_allocated_region_routes
187 .iter()
188 .map(|route| {
189 let region_id = route.region.id;
190 let peer = route.leader_peer.as_ref().map(|p| p.id).unwrap_or_default();
191 format!("region_id: {}, peer: {}", region_id, peer)
192 })
193 .collect();
194 info!(
195 "Allocated regions for repartition, table_id: {}, new_region_count: {}, new_regions: {:?}",
196 table_id, new_region_count, new_regions_brief
197 );
198
199 let _operating_guards = Context::register_operating_regions(
201 &ctx.memory_region_keeper,
202 &new_allocated_region_routes,
203 )?;
204 AllocateRegion::allocate_regions(
206 &ctx.node_manager,
207 &table_info_value.table_info,
208 &new_allocated_region_routes,
209 &wal_options,
210 )
211 .await?;
212
213 let table_lock = TableLock::Write(table_id).into();
215 let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
216 let table_route_value = ctx.get_table_route_value().await?;
219 let region_routes = table_route_value.region_routes().unwrap();
221 let new_region_routes =
222 AllocateRegion::generate_region_routes(region_routes, &new_allocated_region_routes);
223 ctx.update_table_route(&table_route_value, new_region_routes, wal_options)
224 .await?;
225 ctx.invalidate_table_cache().await?;
226
227 ctx.update_allocate_region_elapsed(timer.elapsed());
228 Ok((Box::new(Dispatch), Status::executing(true)))
229 }
230}
231
232#[async_trait::async_trait]
233#[typetag::serde]
234impl State for AllocateRegion {
235 async fn next(
236 &mut self,
237 ctx: &mut Context,
238 procedure_ctx: &ProcedureContext,
239 ) -> Result<(Box<dyn State>, Status)> {
240 match self {
241 AllocateRegion::Build(build_plan) => build_plan.next(ctx, procedure_ctx).await,
242 AllocateRegion::Execute(execute_plan) => execute_plan.next(ctx, procedure_ctx).await,
243 }
244 }
245
246 fn as_any(&self) -> &dyn Any {
247 self
248 }
249}
250
251impl AllocateRegion {
252 pub fn new(plan_entries: Vec<AllocationPlanEntry>) -> Self {
253 AllocateRegion::Build(BuildPlan { plan_entries })
254 }
255
256 fn generate_region_routes(
257 region_routes: &[RegionRoute],
258 new_allocated_region_ids: &[RegionRoute],
259 ) -> Vec<RegionRoute> {
260 let region_ids = region_routes
261 .iter()
262 .map(|r| r.region.id)
263 .collect::<HashSet<_>>();
264 let mut new_region_routes = region_routes.to_vec();
265 for new_allocated_region_id in new_allocated_region_ids {
266 if !region_ids.contains(&new_allocated_region_id.region.id) {
267 new_region_routes.push(new_allocated_region_id.clone());
268 }
269 }
270 new_region_routes
271 }
272
273 fn convert_to_repartition_plans(
281 table_id: TableId,
282 next_region_number: &mut RegionNumber,
283 plan_entries: &[AllocationPlanEntry],
284 current_region_routes: &[RegionRoute],
285 ) -> Result<Vec<RepartitionPlanEntry>> {
286 let region_routes_map = current_region_routes
287 .iter()
288 .map(|route| (route.region.id, route))
289 .collect::<HashMap<_, _>>();
290
291 plan_entries
292 .iter()
293 .map(|plan_entry| {
294 let mut plan = convert_allocation_plan_to_repartition_plan(
295 table_id,
296 next_region_number,
297 plan_entry,
298 );
299 Self::capture_plan_original_target_routes(&mut plan, ®ion_routes_map)?;
300 Ok(plan)
301 })
302 .collect()
303 }
304
305 fn capture_plan_original_target_routes(
306 plan: &mut RepartitionPlanEntry,
307 region_routes_map: &HashMap<RegionId, &RegionRoute>,
308 ) -> Result<()> {
309 let mut original_target_routes = Vec::with_capacity(plan.target_regions.len());
313 for target in &plan.target_regions {
314 if plan.allocated_region_ids.contains(&target.region_id) {
315 continue;
317 }
318 let route = region_routes_map.get(&target.region_id).context(
319 error::RepartitionTargetRegionMissingSnafu {
320 group_id: plan.group_id,
321 region_id: target.region_id,
322 },
323 )?;
324 {
325 original_target_routes.push((*route).clone());
326 }
327 }
328
329 plan.original_target_routes = original_target_routes;
330 Ok(())
331 }
332
333 fn collect_allocate_regions(
335 repartition_plan_entries: &[RepartitionPlanEntry],
336 ) -> Vec<&TargetRegionDescriptor> {
337 repartition_plan_entries
338 .iter()
339 .flat_map(|p| p.allocate_regions())
340 .collect()
341 }
342
343 fn prepare_region_allocation_data(
345 allocate_regions: &[&TargetRegionDescriptor],
346 ) -> Result<Vec<(RegionNumber, String)>> {
347 allocate_regions
348 .iter()
349 .map(|r| {
350 Ok((
351 r.region_id.region_number(),
352 r.partition_expr
353 .as_json_str()
354 .context(error::SerializePartitionExprSnafu)?,
355 ))
356 })
357 .collect()
358 }
359
360 fn count_regions_to_allocate(repartition_plan_entries: &[RepartitionPlanEntry]) -> usize {
362 repartition_plan_entries
363 .iter()
364 .map(|p| p.allocated_region_ids.len())
365 .sum()
366 }
367
368 fn get_next_region_number(max_region_number: RegionNumber) -> RegionNumber {
370 max_region_number + 1
371 }
372
373 async fn allocate_regions(
374 node_manager: &NodeManagerRef,
375 raw_table_info: &TableInfo,
376 region_routes: &[RegionRoute],
377 wal_options: &RegionWalOptions,
378 ) -> Result<()> {
379 let table_ref = TableReference::full(
380 &raw_table_info.catalog_name,
381 &raw_table_info.schema_name,
382 &raw_table_info.name,
383 );
384 let table_id = raw_table_info.ident.table_id;
385 let request = build_template_from_raw_table_info_for_physical_table(raw_table_info)
388 .context(error::BuildCreateRequestSnafu { table_id })?;
389 common_telemetry::debug!(
390 "Allocating regions request, table_id: {}, request: {:?}",
391 table_id,
392 request
393 );
394 let builder = CreateRequestBuilder::new(request, None)
395 .with_requirements(RegionRequirements::object_storage());
396 let region_count = region_routes.len();
397 let wal_region_count = wal_options.len();
398 info!(
399 "Allocating regions on datanodes, table_id: {}, region_count: {}, wal_regions: {}",
400 table_id, region_count, wal_region_count
401 );
402 let executor = CreateTableExecutor::new(table_ref.into(), false, builder);
403 executor
404 .on_create_regions(node_manager, table_id, region_routes, wal_options)
405 .await
406 .context(error::AllocateRegionsSnafu { table_id })?;
407
408 Ok(())
409 }
410}
411
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::region::region_request::Body;
    use common_meta::ddl::test_util::datanode_handler::DatanodeWatcher;
    use common_meta::key::TableMetadataManagerRef;
    use common_meta::key::datanode_table::DatanodeTableKey;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use common_meta::test_util::MockDatanodeManager;
    use common_procedure::{ContextProvider, ProcedureId, ProcedureState};
    use common_procedure_test::MockContextProvider;
    use store_api::storage::RegionId;
    use tokio::sync::{mpsc, watch};
    use uuid::Uuid;

    use super::*;
    use crate::procedure::repartition::State;
    use crate::procedure::repartition::plan::SourceRegionDescriptor;
    use crate::procedure::repartition::test_util::{
        TestingEnv, current_parent_region_routes, new_parent_context, range_expr,
        test_region_wal_options,
    };

    /// Builds a partitioned source region covering `[start, end)` on column `col`.
    fn create_region_descriptor(
        table_id: TableId,
        region_number: u32,
        col: &str,
        start: i64,
        end: i64,
    ) -> SourceRegionDescriptor {
        SourceRegionDescriptor::partitioned(
            RegionId::new(table_id, region_number),
            range_expr(col, start, end),
        )
    }

    /// Builds a target region descriptor with a range expression on column `col`.
    fn create_target_region_descriptor(
        table_id: TableId,
        region_number: u32,
        col: &str,
        start: i64,
        end: i64,
    ) -> TargetRegionDescriptor {
        TargetRegionDescriptor {
            region_id: RegionId::new(table_id, region_number),
            partition_expr: range_expr(col, start, end),
        }
    }

    /// Builds an allocation plan entry whose i-th source region covers
    /// `[i * 100, (i + 1) * 100)` on column `x` and whose targets are `target_ranges`.
    fn create_allocation_plan_entry(
        table_id: TableId,
        source_region_numbers: &[u32],
        target_ranges: &[(i64, i64)],
    ) -> AllocationPlanEntry {
        let source_regions = source_region_numbers
            .iter()
            .enumerate()
            .map(|(i, &n)| {
                let start = i as i64 * 100;
                let end = (i + 1) as i64 * 100;
                create_region_descriptor(table_id, n, "x", start, end)
            })
            .collect();

        let target_partition_exprs = target_ranges
            .iter()
            .map(|&(start, end)| range_expr("x", start, end))
            .collect();

        AllocationPlanEntry {
            group_id: Uuid::new_v4(),
            source_regions,
            target_partition_exprs,
            transition_map: vec![],
        }
    }

    /// Builds one route per region number, each led by the empty peer with id 1.
    fn create_current_region_routes(table_id: TableId, region_numbers: &[u32]) -> Vec<RegionRoute> {
        region_numbers
            .iter()
            .map(|region_number| RegionRoute {
                region: Region {
                    id: RegionId::new(table_id, *region_number),
                    ..Default::default()
                },
                leader_peer: Some(Peer::empty(1)),
                ..Default::default()
            })
            .collect()
    }

    /// Downcasts a boxed state to [`AllocateRegion`], panicking on any other type.
    fn expect_allocate_region(state: &dyn State) -> &AllocateRegion {
        state
            .as_any()
            .downcast_ref::<AllocateRegion>()
            .expect("expected AllocateRegion state")
    }

    /// Context provider that simulates a concurrent table route update: each
    /// `acquire_lock` call first adds `concurrent_region_route` to the stored
    /// table route (if it is missing) and then delegates to the inner provider.
    struct ConcurrentTableRouteUpdateProvider {
        inner: MockContextProvider,
        table_metadata_manager: TableMetadataManagerRef,
        table_id: TableId,
        concurrent_region_route: RegionRoute,
        region_wal_options: RegionWalOptions,
    }

    #[async_trait::async_trait]
    impl ContextProvider for ConcurrentTableRouteUpdateProvider {
        async fn procedure_state(
            &self,
            procedure_id: ProcedureId,
        ) -> common_procedure::Result<Option<ProcedureState>> {
            self.inner.procedure_state(procedure_id).await
        }

        async fn procedure_state_receiver(
            &self,
            procedure_id: ProcedureId,
        ) -> common_procedure::Result<Option<watch::Receiver<ProcedureState>>> {
            self.inner.procedure_state_receiver(procedure_id).await
        }

        async fn try_put_poison(
            &self,
            key: &common_procedure::PoisonKey,
            procedure_id: ProcedureId,
        ) -> common_procedure::Result<()> {
            self.inner.try_put_poison(key, procedure_id).await
        }

        async fn acquire_lock(
            &self,
            key: &common_procedure::StringKey,
        ) -> common_procedure::local::DynamicKeyLockGuard {
            let current_table_route_value = self
                .table_metadata_manager
                .table_route_manager()
                .table_route_storage()
                .get_with_raw_bytes(self.table_id)
                .await
                .unwrap()
                .unwrap();
            let mut region_routes = current_table_route_value.region_routes().unwrap().clone();

            // Inject the concurrent route only once.
            if !region_routes
                .iter()
                .any(|route| route.region.id == self.concurrent_region_route.region.id)
            {
                region_routes.push(self.concurrent_region_route.clone());
                let datanode_id = current_table_route_value.region_routes().unwrap()[0]
                    .leader_peer
                    .as_ref()
                    .unwrap()
                    .id;
                let datanode_table_value = self
                    .table_metadata_manager
                    .datanode_table_manager()
                    .get(&DatanodeTableKey::new(datanode_id, self.table_id))
                    .await
                    .unwrap()
                    .unwrap();
                let region_options = &datanode_table_value.region_info.region_options;

                self.table_metadata_manager
                    .update_table_route(
                        self.table_id,
                        datanode_table_value.region_info.clone(),
                        &current_table_route_value,
                        region_routes,
                        region_options,
                        &self.region_wal_options,
                    )
                    .await
                    .unwrap();
            }

            self.inner.acquire_lock(key).await
        }
    }

    #[test]
    fn test_convert_to_repartition_plans_no_allocation() {
        let table_id = 1024;
        let mut next_region_number = 10;

        // Two sources, two targets: nothing to allocate.
        let plan_entries = vec![create_allocation_plan_entry(
            table_id,
            &[1, 2],
            &[(0, 50), (50, 200)],
        )];

        let result = AllocateRegion::convert_to_repartition_plans(
            table_id,
            &mut next_region_number,
            &plan_entries,
            &create_current_region_routes(table_id, &[1, 2]),
        )
        .unwrap();

        assert_eq!(result.len(), 1);
        assert_eq!(result[0].target_regions.len(), 2);
        assert!(result[0].allocated_region_ids.is_empty());
        assert_eq!(next_region_number, 10);
    }

    #[test]
    fn test_convert_to_repartition_plans_with_allocation() {
        let table_id = 1024;
        let mut next_region_number = 10;

        // Two sources, four targets: two regions are allocated.
        let plan_entries = vec![create_allocation_plan_entry(
            table_id,
            &[1, 2],
            &[(0, 50), (50, 100), (100, 150), (150, 200)],
        )];

        let result = AllocateRegion::convert_to_repartition_plans(
            table_id,
            &mut next_region_number,
            &plan_entries,
            &create_current_region_routes(table_id, &[1, 2]),
        )
        .unwrap();

        assert_eq!(result.len(), 1);
        assert_eq!(result[0].target_regions.len(), 4);
        assert_eq!(result[0].allocated_region_ids.len(), 2);
        assert_eq!(
            result[0].allocated_region_ids[0],
            RegionId::new(table_id, 10)
        );
        assert_eq!(
            result[0].allocated_region_ids[1],
            RegionId::new(table_id, 11)
        );
        assert_eq!(next_region_number, 12);
    }

    #[test]
    fn test_convert_to_repartition_plans_multiple_entries() {
        let table_id = 1024;
        let mut next_region_number = 10;

        // Expected allocations per entry: 1, 0 and 2 (see the assertions below).
        let plan_entries = vec![
            create_allocation_plan_entry(table_id, &[1], &[(0, 50), (50, 100)]),
            create_allocation_plan_entry(table_id, &[2, 3], &[(100, 150), (150, 200)]),
            create_allocation_plan_entry(table_id, &[4], &[(200, 250), (250, 300), (300, 400)]),
        ];

        let result = AllocateRegion::convert_to_repartition_plans(
            table_id,
            &mut next_region_number,
            &plan_entries,
            &create_current_region_routes(table_id, &[1, 2, 3, 4]),
        )
        .unwrap();

        assert_eq!(result.len(), 3);
        assert_eq!(result[0].allocated_region_ids.len(), 1);
        assert_eq!(result[1].allocated_region_ids.len(), 0);
        assert_eq!(result[2].allocated_region_ids.len(), 2);
        assert_eq!(next_region_number, 13);
    }

    #[test]
    fn test_count_regions_to_allocate() {
        let table_id = 1024;
        let mut next_region_number = 10;

        let plan_entries = vec![
            create_allocation_plan_entry(table_id, &[1], &[(0, 50), (50, 100)]),
            create_allocation_plan_entry(table_id, &[2, 3], &[(100, 200)]),
            create_allocation_plan_entry(table_id, &[4], &[(200, 250), (250, 300)]),
        ];

        let repartition_plans = AllocateRegion::convert_to_repartition_plans(
            table_id,
            &mut next_region_number,
            &plan_entries,
            &create_current_region_routes(table_id, &[1, 2, 3, 4]),
        )
        .unwrap();

        let count = AllocateRegion::count_regions_to_allocate(&repartition_plans);
        assert_eq!(count, 2);
    }

    #[test]
    fn test_collect_allocate_regions() {
        let table_id = 1024;
        let mut next_region_number = 10;

        let plan_entries = vec![
            create_allocation_plan_entry(table_id, &[1], &[(0, 50), (50, 100)]),
            create_allocation_plan_entry(table_id, &[2], &[(100, 150), (150, 200)]),
        ];

        let repartition_plans = AllocateRegion::convert_to_repartition_plans(
            table_id,
            &mut next_region_number,
            &plan_entries,
            &create_current_region_routes(table_id, &[1, 2]),
        )
        .unwrap();

        let allocate_regions = AllocateRegion::collect_allocate_regions(&repartition_plans);
        assert_eq!(allocate_regions.len(), 2);
        assert_eq!(allocate_regions[0].region_id, RegionId::new(table_id, 10));
        assert_eq!(allocate_regions[1].region_id, RegionId::new(table_id, 11));
    }

    #[test]
    fn test_prepare_region_allocation_data() {
        let table_id = 1024;
        let regions = [
            create_target_region_descriptor(table_id, 10, "x", 0, 50),
            create_target_region_descriptor(table_id, 11, "x", 50, 100),
        ];
        let region_refs: Vec<&TargetRegionDescriptor> = regions.iter().collect();

        let result = AllocateRegion::prepare_region_allocation_data(&region_refs).unwrap();

        assert_eq!(result.len(), 2);
        assert_eq!(result[0].0, 10);
        assert_eq!(result[1].0, 11);
        assert!(!result[0].1.is_empty());
        assert!(!result[1].1.is_empty());
    }

    #[tokio::test]
    async fn test_execute_plan_uses_latest_table_route_after_lock() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let original_region_routes = create_current_region_routes(table_id, &[1]);
        env.create_physical_table_metadata_for_repartition(
            table_id,
            original_region_routes,
            test_region_wal_options(&[1]),
        )
        .await;

        let (sender, mut receiver) = mpsc::channel(1);
        let node_manager = Arc::new(MockDatanodeManager::new(DatanodeWatcher::new(sender)));
        let mut ctx = new_parent_context(&env, node_manager, table_id);
        ctx.persistent_ctx.plans = vec![RepartitionPlanEntry {
            group_id: Uuid::new_v4(),
            source_regions: vec![],
            target_regions: vec![create_target_region_descriptor(table_id, 3, "x", 0, 100)],
            allocated_region_ids: vec![RegionId::new(table_id, 3)],
            pending_deallocate_region_ids: vec![],
            transition_map: vec![],
            original_target_routes: vec![],
        }];
        // Region 2 is added to the table route by the provider when the lock is taken.
        let concurrent_region_route = create_current_region_routes(table_id, &[2])
            .into_iter()
            .next()
            .unwrap();
        let procedure_ctx = ProcedureContext {
            procedure_id: ProcedureId::random(),
            provider: Arc::new(ConcurrentTableRouteUpdateProvider {
                inner: MockContextProvider::default(),
                table_metadata_manager: env.table_metadata_manager.clone(),
                table_id,
                concurrent_region_route,
                region_wal_options: test_region_wal_options(&[1, 2]),
            }),
        };
        let mut state = ExecutePlan;

        state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let (_, request) = receiver.recv().await.unwrap();
        let Some(Body::Create(create)) = request.body else {
            unreachable!()
        };
        assert!(create.requirements.unwrap().object_storage);

        // The final route must contain the original, the concurrent and the new region.
        let region_ids = current_parent_region_routes(&ctx)
            .await
            .into_iter()
            .map(|route| route.region.id)
            .collect::<Vec<_>>();
        assert_eq!(
            region_ids,
            vec![
                RegionId::new(table_id, 1),
                RegionId::new(table_id, 2),
                RegionId::new(table_id, 3),
            ]
        );
    }

    #[test]
    fn test_allocate_region_state_backward_compatibility() {
        // Legacy layout: `plan_entries` sits directly next to the type tag.
        let serialized = r#"{"repartition_state":"AllocateRegion","plan_entries":[]}"#;

        let state: Box<dyn State> = serde_json::from_str(serialized).unwrap();

        match expect_allocate_region(&*state) {
            AllocateRegion::Build(build_plan) => assert!(build_plan.plan_entries.is_empty()),
            AllocateRegion::Execute(_) => panic!("expected build plan"),
        }
    }

    #[test]
    fn test_allocate_region_state_round_trip() {
        let state: Box<dyn State> = Box::new(AllocateRegion::new(vec![]));

        let serialized = serde_json::to_string(&state).unwrap();
        let deserialized: Box<dyn State> = serde_json::from_str(&serialized).unwrap();

        assert_eq!(
            serialized,
            r#"{"repartition_state":"AllocateRegion","Build":{"plan_entries":[]}}"#
        );
        match expect_allocate_region(&*deserialized) {
            AllocateRegion::Build(build_plan) => assert!(build_plan.plan_entries.is_empty()),
            AllocateRegion::Execute(_) => panic!("expected build plan"),
        }
    }

    #[test]
    fn test_allocate_region_execute_state_round_trip() {
        let state: Box<dyn State> = Box::new(AllocateRegion::Execute(ExecutePlan));

        let serialized = serde_json::to_string(&state).unwrap();
        let deserialized: Box<dyn State> = serde_json::from_str(&serialized).unwrap();

        assert_eq!(
            serialized,
            r#"{"repartition_state":"AllocateRegion","Execute":null}"#
        );
        match expect_allocate_region(&*deserialized) {
            AllocateRegion::Execute(_) => {}
            AllocateRegion::Build(_) => panic!("expected execute plan"),
        }
    }
}