1use std::any::Any;
16use std::collections::{HashMap, HashSet};
17
18use common_meta::ddl::create_table::executor::CreateTableExecutor;
19use common_meta::ddl::create_table::template::{
20 CreateRequestBuilder, build_template_from_raw_table_info_for_physical_table,
21};
22use common_meta::lock_key::TableLock;
23use common_meta::node_manager::NodeManagerRef;
24use common_meta::rpc::router::RegionRoute;
25use common_procedure::{Context as ProcedureContext, Status};
26use common_telemetry::{debug, info};
27use serde::{Deserialize, Deserializer, Serialize};
28use snafu::{OptionExt, ResultExt};
29use store_api::storage::{RegionId, RegionNumber, TableId};
30use table::metadata::TableInfo;
31use table::table_reference::TableReference;
32use tokio::time::Instant;
33
34use crate::error::{self, Result};
35use crate::procedure::repartition::dispatch::Dispatch;
36use crate::procedure::repartition::plan::{
37 AllocationPlanEntry, RegionDescriptor, RepartitionPlanEntry,
38 convert_allocation_plan_to_repartition_plan,
39};
40use crate::procedure::repartition::{Context, State};
41
/// Repartition procedure state that allocates the regions a repartition needs.
///
/// The state has two steps: [`AllocateRegion::Build`] converts the allocation plan entries
/// into repartition plans, and [`AllocateRegion::Execute`] allocates the regions that the
/// stored plans ask for.
#[derive(Debug, Clone, Serialize)]
pub enum AllocateRegion {
    /// Builds the repartition plans from the allocation plan entries.
    Build(BuildPlan),
    /// Allocates the regions requested by the stored repartition plans.
    Execute(ExecutePlan),
}
47
48impl<'de> Deserialize<'de> for AllocateRegion {
49 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
50 where
51 D: Deserializer<'de>,
52 {
53 #[derive(Deserialize)]
54 enum CurrentAllocateRegion {
55 Build(BuildPlan),
56 Execute(ExecutePlan),
57 }
58
59 #[derive(Deserialize)]
60 struct LegacyAllocateRegion {
61 plan_entries: Vec<AllocationPlanEntry>,
62 }
63
64 #[derive(Deserialize)]
65 #[serde(untagged)]
66 enum AllocateRegionRepr {
67 Current(CurrentAllocateRegion),
68 Legacy(LegacyAllocateRegion),
69 }
70
71 match AllocateRegionRepr::deserialize(deserializer)? {
72 AllocateRegionRepr::Current(CurrentAllocateRegion::Build(build_plan)) => {
73 Ok(Self::Build(build_plan))
74 }
75 AllocateRegionRepr::Current(CurrentAllocateRegion::Execute(execute_plan)) => {
76 Ok(Self::Execute(execute_plan))
77 }
78 AllocateRegionRepr::Legacy(legacy) => Ok(Self::Build(BuildPlan {
79 plan_entries: legacy.plan_entries,
80 })),
81 }
82 }
83}
84
/// First step of [`AllocateRegion`]: holds the allocation plan entries that still have to be
/// converted into repartition plans.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BuildPlan {
    /// Allocation plan entries to convert.
    plan_entries: Vec<AllocationPlanEntry>,
}
89
90impl BuildPlan {
91 async fn next(
92 &mut self,
93 ctx: &mut Context,
94 _procedure_ctx: &ProcedureContext,
95 ) -> Result<(Box<dyn State>, Status)> {
96 let timer = Instant::now();
97 let table_id = ctx.persistent_ctx.table_id;
98 let table_route_value = ctx.get_table_route_value().await?;
99 let mut next_region_number =
100 AllocateRegion::get_next_region_number(table_route_value.max_region_number().unwrap());
101
102 let repartition_plan_entries = AllocateRegion::convert_to_repartition_plans(
104 table_id,
105 &mut next_region_number,
106 &self.plan_entries,
107 table_route_value.region_routes().unwrap(),
108 )?;
109 let plan_count = repartition_plan_entries.len();
110 let to_allocate = AllocateRegion::count_regions_to_allocate(&repartition_plan_entries);
111 info!(
112 "Repartition allocate regions start, table_id: {}, groups: {}, regions_to_allocate: {}",
113 table_id, plan_count, to_allocate
114 );
115
116 if AllocateRegion::count_regions_to_allocate(&repartition_plan_entries) == 0 {
118 ctx.persistent_ctx.plans = repartition_plan_entries;
119 ctx.update_allocate_region_elapsed(timer.elapsed());
120 return Ok((Box::new(Dispatch), Status::executing(true)));
121 }
122
123 ctx.persistent_ctx.plans = repartition_plan_entries;
124 debug!(
125 "Repartition allocate regions build plan completed, table_id: {}, elapsed: {:?}",
126 table_id,
127 timer.elapsed()
128 );
129 Ok((
130 Box::new(AllocateRegion::Execute(ExecutePlan)),
131 Status::executing(true),
132 ))
133 }
134}
135
/// Second step of [`AllocateRegion`]: allocates the regions requested by the repartition
/// plans stored in the procedure context. It carries no data of its own.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExecutePlan;
138
139impl ExecutePlan {
140 async fn next(
141 &mut self,
142 ctx: &mut Context,
143 procedure_ctx: &ProcedureContext,
144 ) -> Result<(Box<dyn State>, Status)> {
145 let timer = Instant::now();
146 let table_id = ctx.persistent_ctx.table_id;
147 let allocate_regions = AllocateRegion::collect_allocate_regions(&ctx.persistent_ctx.plans);
148 let region_number_and_partition_exprs =
149 AllocateRegion::prepare_region_allocation_data(&allocate_regions)?;
150 let table_info_value = ctx.get_table_info_value().await?;
151 let table_route_value = ctx.get_table_route_value().await?;
152 let region_routes = table_route_value.region_routes().unwrap();
154 let new_allocated_region_routes = ctx
155 .region_routes_allocator
156 .allocate(
157 table_id,
158 ®ion_number_and_partition_exprs
159 .iter()
160 .map(|(n, p)| (*n, p.as_str()))
161 .collect::<Vec<_>>(),
162 )
163 .await
164 .context(error::AllocateRegionRoutesSnafu { table_id })?;
165 let wal_options = ctx
166 .wal_options_allocator
167 .allocate(
168 &allocate_regions
169 .iter()
170 .map(|r| r.region_id.region_number())
171 .collect::<Vec<_>>(),
172 table_info_value.table_info.meta.options.skip_wal,
173 )
174 .await
175 .context(error::AllocateWalOptionsSnafu { table_id })?;
176
177 let new_region_count = new_allocated_region_routes.len();
178 let new_regions_brief: Vec<_> = new_allocated_region_routes
179 .iter()
180 .map(|route| {
181 let region_id = route.region.id;
182 let peer = route.leader_peer.as_ref().map(|p| p.id).unwrap_or_default();
183 format!("region_id: {}, peer: {}", region_id, peer)
184 })
185 .collect();
186 info!(
187 "Allocated regions for repartition, table_id: {}, new_region_count: {}, new_regions: {:?}",
188 table_id, new_region_count, new_regions_brief
189 );
190
191 let _operating_guards = Context::register_operating_regions(
193 &ctx.memory_region_keeper,
194 &new_allocated_region_routes,
195 )?;
196 AllocateRegion::allocate_regions(
198 &ctx.node_manager,
199 &table_info_value.table_info,
200 &new_allocated_region_routes,
201 &wal_options,
202 )
203 .await?;
204
205 let table_lock = TableLock::Write(table_id).into();
207 let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
208 let new_region_routes =
209 AllocateRegion::generate_region_routes(region_routes, &new_allocated_region_routes);
210 ctx.update_table_route(&table_route_value, new_region_routes, wal_options)
211 .await?;
212 ctx.invalidate_table_cache().await?;
213
214 ctx.update_allocate_region_elapsed(timer.elapsed());
215 Ok((Box::new(Dispatch), Status::executing(true)))
216 }
217}
218
219#[async_trait::async_trait]
220#[typetag::serde]
221impl State for AllocateRegion {
222 async fn next(
223 &mut self,
224 ctx: &mut Context,
225 procedure_ctx: &ProcedureContext,
226 ) -> Result<(Box<dyn State>, Status)> {
227 match self {
228 AllocateRegion::Build(build_plan) => build_plan.next(ctx, procedure_ctx).await,
229 AllocateRegion::Execute(execute_plan) => execute_plan.next(ctx, procedure_ctx).await,
230 }
231 }
232
233 fn as_any(&self) -> &dyn Any {
234 self
235 }
236}
237
238impl AllocateRegion {
239 pub fn new(plan_entries: Vec<AllocationPlanEntry>) -> Self {
240 AllocateRegion::Build(BuildPlan { plan_entries })
241 }
242
243 fn generate_region_routes(
244 region_routes: &[RegionRoute],
245 new_allocated_region_ids: &[RegionRoute],
246 ) -> Vec<RegionRoute> {
247 let region_ids = region_routes
248 .iter()
249 .map(|r| r.region.id)
250 .collect::<HashSet<_>>();
251 let mut new_region_routes = region_routes.to_vec();
252 for new_allocated_region_id in new_allocated_region_ids {
253 if !region_ids.contains(&new_allocated_region_id.region.id) {
254 new_region_routes.push(new_allocated_region_id.clone());
255 }
256 }
257 new_region_routes
258 }
259
260 fn convert_to_repartition_plans(
268 table_id: TableId,
269 next_region_number: &mut RegionNumber,
270 plan_entries: &[AllocationPlanEntry],
271 current_region_routes: &[RegionRoute],
272 ) -> Result<Vec<RepartitionPlanEntry>> {
273 let region_routes_map = current_region_routes
274 .iter()
275 .map(|route| (route.region.id, route))
276 .collect::<HashMap<_, _>>();
277
278 plan_entries
279 .iter()
280 .map(|plan_entry| {
281 let mut plan = convert_allocation_plan_to_repartition_plan(
282 table_id,
283 next_region_number,
284 plan_entry,
285 );
286 Self::capture_plan_original_target_routes(&mut plan, ®ion_routes_map)?;
287 Ok(plan)
288 })
289 .collect()
290 }
291
292 fn capture_plan_original_target_routes(
293 plan: &mut RepartitionPlanEntry,
294 region_routes_map: &HashMap<RegionId, &RegionRoute>,
295 ) -> Result<()> {
296 let mut original_target_routes = Vec::with_capacity(plan.target_regions.len());
300 for target in &plan.target_regions {
301 if plan.allocated_region_ids.contains(&target.region_id) {
302 continue;
304 }
305 let route = region_routes_map.get(&target.region_id).context(
306 error::RepartitionTargetRegionMissingSnafu {
307 group_id: plan.group_id,
308 region_id: target.region_id,
309 },
310 )?;
311 {
312 original_target_routes.push((*route).clone());
313 }
314 }
315
316 plan.original_target_routes = original_target_routes;
317 Ok(())
318 }
319
320 fn collect_allocate_regions(
322 repartition_plan_entries: &[RepartitionPlanEntry],
323 ) -> Vec<&RegionDescriptor> {
324 repartition_plan_entries
325 .iter()
326 .flat_map(|p| p.allocate_regions())
327 .collect()
328 }
329
330 fn prepare_region_allocation_data(
332 allocate_regions: &[&RegionDescriptor],
333 ) -> Result<Vec<(RegionNumber, String)>> {
334 allocate_regions
335 .iter()
336 .map(|r| {
337 Ok((
338 r.region_id.region_number(),
339 r.partition_expr
340 .as_json_str()
341 .context(error::SerializePartitionExprSnafu)?,
342 ))
343 })
344 .collect()
345 }
346
347 fn count_regions_to_allocate(repartition_plan_entries: &[RepartitionPlanEntry]) -> usize {
349 repartition_plan_entries
350 .iter()
351 .map(|p| p.allocated_region_ids.len())
352 .sum()
353 }
354
355 fn get_next_region_number(max_region_number: RegionNumber) -> RegionNumber {
357 max_region_number + 1
358 }
359
360 async fn allocate_regions(
361 node_manager: &NodeManagerRef,
362 raw_table_info: &TableInfo,
363 region_routes: &[RegionRoute],
364 wal_options: &HashMap<RegionNumber, String>,
365 ) -> Result<()> {
366 let table_ref = TableReference::full(
367 &raw_table_info.catalog_name,
368 &raw_table_info.schema_name,
369 &raw_table_info.name,
370 );
371 let table_id = raw_table_info.ident.table_id;
372 let request = build_template_from_raw_table_info_for_physical_table(raw_table_info)
375 .context(error::BuildCreateRequestSnafu { table_id })?;
376 common_telemetry::debug!(
377 "Allocating regions request, table_id: {}, request: {:?}",
378 table_id,
379 request
380 );
381 let builder = CreateRequestBuilder::new(request, None);
382 let region_count = region_routes.len();
383 let wal_region_count = wal_options.len();
384 info!(
385 "Allocating regions on datanodes, table_id: {}, region_count: {}, wal_regions: {}",
386 table_id, region_count, wal_region_count
387 );
388 let executor = CreateTableExecutor::new(table_ref.into(), false, builder);
389 executor
390 .on_create_regions(node_manager, table_id, region_routes, wal_options)
391 .await
392 .context(error::AllocateRegionsSnafu { table_id })?;
393
394 Ok(())
395 }
396}
397
#[cfg(test)]
mod tests {
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;
    use uuid::Uuid;

    use super::*;
    use crate::procedure::repartition::State;
    use crate::procedure::repartition::test_util::range_expr;

    /// Builds a region descriptor for `region_number` covering the range `[start, end)` on
    /// column `col` (range semantics are those of `range_expr`).
    fn create_region_descriptor(
        table_id: TableId,
        region_number: u32,
        col: &str,
        start: i64,
        end: i64,
    ) -> RegionDescriptor {
        RegionDescriptor {
            region_id: RegionId::new(table_id, region_number),
            partition_expr: range_expr(col, start, end),
        }
    }

    /// Builds an allocation plan entry whose i-th source region covers
    /// `i * 100 .. (i + 1) * 100` on column `x` and whose targets cover `target_ranges`.
    fn create_allocation_plan_entry(
        table_id: TableId,
        source_region_numbers: &[u32],
        target_ranges: &[(i64, i64)],
    ) -> AllocationPlanEntry {
        let source_regions = source_region_numbers
            .iter()
            .enumerate()
            .map(|(i, &n)| {
                let start = i as i64 * 100;
                let end = (i + 1) as i64 * 100;
                create_region_descriptor(table_id, n, "x", start, end)
            })
            .collect();

        let target_partition_exprs = target_ranges
            .iter()
            .map(|&(start, end)| range_expr("x", start, end))
            .collect();

        AllocationPlanEntry {
            group_id: Uuid::new_v4(),
            source_regions,
            target_partition_exprs,
            transition_map: vec![],
        }
    }

    /// Builds one default region route per region number, each with an empty leader peer 1.
    fn create_current_region_routes(table_id: TableId, region_numbers: &[u32]) -> Vec<RegionRoute> {
        region_numbers
            .iter()
            .map(|region_number| RegionRoute {
                region: Region {
                    id: RegionId::new(table_id, *region_number),
                    ..Default::default()
                },
                leader_peer: Some(Peer::empty(1)),
                ..Default::default()
            })
            .collect()
    }

    #[test]
    fn test_convert_to_repartition_plans_no_allocation() {
        let table_id = 1024;
        let mut next_region_number = 10;

        // Two sources, two targets: nothing has to be allocated.
        let plan_entries = vec![create_allocation_plan_entry(
            table_id,
            &[1, 2],
            &[(0, 50), (50, 200)],
        )];

        let result = AllocateRegion::convert_to_repartition_plans(
            table_id,
            &mut next_region_number,
            &plan_entries,
            &create_current_region_routes(table_id, &[1, 2]),
        )
        .unwrap();

        assert_eq!(result.len(), 1);
        assert_eq!(result[0].target_regions.len(), 2);
        assert!(result[0].allocated_region_ids.is_empty());
        assert_eq!(next_region_number, 10);
    }

    #[test]
    fn test_convert_to_repartition_plans_with_allocation() {
        let table_id = 1024;
        let mut next_region_number = 10;

        // Two sources, four targets: two regions have to be allocated.
        let plan_entries = vec![create_allocation_plan_entry(
            table_id,
            &[1, 2],
            &[(0, 50), (50, 100), (100, 150), (150, 200)],
        )];

        let result = AllocateRegion::convert_to_repartition_plans(
            table_id,
            &mut next_region_number,
            &plan_entries,
            &create_current_region_routes(table_id, &[1, 2]),
        )
        .unwrap();

        assert_eq!(result.len(), 1);
        assert_eq!(result[0].target_regions.len(), 4);
        assert_eq!(result[0].allocated_region_ids.len(), 2);
        assert_eq!(
            result[0].allocated_region_ids[0],
            RegionId::new(table_id, 10)
        );
        assert_eq!(
            result[0].allocated_region_ids[1],
            RegionId::new(table_id, 11)
        );
        assert_eq!(next_region_number, 12);
    }

    #[test]
    fn test_convert_to_repartition_plans_multiple_entries() {
        let table_id = 1024;
        let mut next_region_number = 10;

        let plan_entries = vec![
            // 1 source -> 2 targets: allocates 1 region.
            create_allocation_plan_entry(table_id, &[1], &[(0, 50), (50, 100)]),
            // 2 sources -> 2 targets: allocates nothing.
            create_allocation_plan_entry(table_id, &[2, 3], &[(100, 150), (150, 200)]),
            // 1 source -> 3 targets: allocates 2 regions.
            create_allocation_plan_entry(table_id, &[4], &[(200, 250), (250, 300), (300, 400)]),
        ];

        let result = AllocateRegion::convert_to_repartition_plans(
            table_id,
            &mut next_region_number,
            &plan_entries,
            &create_current_region_routes(table_id, &[1, 2, 3, 4]),
        )
        .unwrap();

        assert_eq!(result.len(), 3);
        assert_eq!(result[0].allocated_region_ids.len(), 1);
        assert_eq!(result[1].allocated_region_ids.len(), 0);
        assert_eq!(result[2].allocated_region_ids.len(), 2);
        assert_eq!(next_region_number, 13);
    }

    #[test]
    fn test_count_regions_to_allocate() {
        let table_id = 1024;
        let mut next_region_number = 10;

        let plan_entries = vec![
            // 1 source -> 2 targets: allocates 1 region.
            create_allocation_plan_entry(table_id, &[1], &[(0, 50), (50, 100)]),
            // 2 sources -> 1 target: allocates nothing.
            create_allocation_plan_entry(table_id, &[2, 3], &[(100, 200)]),
            // 1 source -> 2 targets: allocates 1 region.
            create_allocation_plan_entry(table_id, &[4], &[(200, 250), (250, 300)]),
        ];

        let repartition_plans = AllocateRegion::convert_to_repartition_plans(
            table_id,
            &mut next_region_number,
            &plan_entries,
            &create_current_region_routes(table_id, &[1, 2, 3, 4]),
        )
        .unwrap();

        let count = AllocateRegion::count_regions_to_allocate(&repartition_plans);
        assert_eq!(count, 2);
    }

    #[test]
    fn test_collect_allocate_regions() {
        let table_id = 1024;
        let mut next_region_number = 10;

        // Each entry splits one source into two targets, so each allocates one region.
        let plan_entries = vec![
            create_allocation_plan_entry(table_id, &[1], &[(0, 50), (50, 100)]),
            create_allocation_plan_entry(table_id, &[2], &[(100, 150), (150, 200)]),
        ];

        let repartition_plans = AllocateRegion::convert_to_repartition_plans(
            table_id,
            &mut next_region_number,
            &plan_entries,
            &create_current_region_routes(table_id, &[1, 2]),
        )
        .unwrap();

        let allocate_regions = AllocateRegion::collect_allocate_regions(&repartition_plans);
        assert_eq!(allocate_regions.len(), 2);
        assert_eq!(allocate_regions[0].region_id, RegionId::new(table_id, 10));
        assert_eq!(allocate_regions[1].region_id, RegionId::new(table_id, 11));
    }

    #[test]
    fn test_prepare_region_allocation_data() {
        let table_id = 1024;
        let regions = [
            create_region_descriptor(table_id, 10, "x", 0, 50),
            create_region_descriptor(table_id, 11, "x", 50, 100),
        ];
        let region_refs: Vec<&RegionDescriptor> = regions.iter().collect();

        let result = AllocateRegion::prepare_region_allocation_data(&region_refs).unwrap();

        assert_eq!(result.len(), 2);
        assert_eq!(result[0].0, 10);
        assert_eq!(result[1].0, 11);
        // Only checks that a serialized expression was produced, not its exact content.
        assert!(!result[0].1.is_empty());
        assert!(!result[1].1.is_empty());
    }

    #[test]
    fn test_allocate_region_state_backward_compatibility() {
        // Legacy layout: the plan entries sit directly next to the type tag.
        let serialized = r#"{"repartition_state":"AllocateRegion","plan_entries":[]}"#;

        let state: Box<dyn State> = serde_json::from_str(serialized).unwrap();

        let allocate_region = state
            .as_any()
            .downcast_ref::<AllocateRegion>()
            .expect("expected AllocateRegion state");
        match allocate_region {
            AllocateRegion::Build(build_plan) => assert!(build_plan.plan_entries.is_empty()),
            AllocateRegion::Execute(_) => panic!("expected build plan"),
        }
    }

    #[test]
    fn test_allocate_region_state_round_trip() {
        let state: Box<dyn State> = Box::new(AllocateRegion::new(vec![]));

        let serialized = serde_json::to_string(&state).unwrap();
        let deserialized: Box<dyn State> = serde_json::from_str(&serialized).unwrap();

        assert_eq!(
            serialized,
            r#"{"repartition_state":"AllocateRegion","Build":{"plan_entries":[]}}"#
        );
        let allocate_region = deserialized
            .as_any()
            .downcast_ref::<AllocateRegion>()
            .expect("expected AllocateRegion state");
        match allocate_region {
            AllocateRegion::Build(build_plan) => assert!(build_plan.plan_entries.is_empty()),
            AllocateRegion::Execute(_) => panic!("expected build plan"),
        }
    }

    #[test]
    fn test_allocate_region_execute_state_round_trip() {
        let state: Box<dyn State> = Box::new(AllocateRegion::Execute(ExecutePlan));

        let serialized = serde_json::to_string(&state).unwrap();
        let deserialized: Box<dyn State> = serde_json::from_str(&serialized).unwrap();

        assert_eq!(
            serialized,
            r#"{"repartition_state":"AllocateRegion","Execute":null}"#
        );
        let allocate_region = deserialized
            .as_any()
            .downcast_ref::<AllocateRegion>()
            .expect("expected AllocateRegion state");
        match allocate_region {
            AllocateRegion::Execute(_) => {}
            AllocateRegion::Build(_) => panic!("expected execute plan"),
        }
    }
}