1use std::any::Any;
16use std::collections::{HashMap, HashSet};
17
18use common_meta::ddl::create_table::executor::CreateTableExecutor;
19use common_meta::ddl::create_table::template::{
20 CreateRequestBuilder, build_template_from_raw_table_info_for_physical_table,
21};
22use common_meta::lock_key::TableLock;
23use common_meta::node_manager::NodeManagerRef;
24use common_meta::rpc::router::RegionRoute;
25use common_procedure::{Context as ProcedureContext, Status};
26use common_telemetry::{debug, info};
27use serde::{Deserialize, Deserializer, Serialize};
28use snafu::ResultExt;
29use store_api::storage::{RegionNumber, TableId};
30use table::metadata::TableInfo;
31use table::table_reference::TableReference;
32use tokio::time::Instant;
33
34use crate::error::{self, Result};
35use crate::procedure::repartition::dispatch::Dispatch;
36use crate::procedure::repartition::plan::{
37 AllocationPlanEntry, RegionDescriptor, RepartitionPlanEntry,
38 convert_allocation_plan_to_repartition_plan,
39};
40use crate::procedure::repartition::{Context, State};
41
42#[derive(Debug, Clone, Serialize)]
43pub enum AllocateRegion {
44 Build(BuildPlan),
45 Execute(ExecutePlan),
46}
47
48impl<'de> Deserialize<'de> for AllocateRegion {
49 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
50 where
51 D: Deserializer<'de>,
52 {
53 #[derive(Deserialize)]
54 enum CurrentAllocateRegion {
55 Build(BuildPlan),
56 Execute(ExecutePlan),
57 }
58
59 #[derive(Deserialize)]
60 struct LegacyAllocateRegion {
61 plan_entries: Vec<AllocationPlanEntry>,
62 }
63
64 #[derive(Deserialize)]
65 #[serde(untagged)]
66 enum AllocateRegionRepr {
67 Current(CurrentAllocateRegion),
68 Legacy(LegacyAllocateRegion),
69 }
70
71 match AllocateRegionRepr::deserialize(deserializer)? {
72 AllocateRegionRepr::Current(CurrentAllocateRegion::Build(build_plan)) => {
73 Ok(Self::Build(build_plan))
74 }
75 AllocateRegionRepr::Current(CurrentAllocateRegion::Execute(execute_plan)) => {
76 Ok(Self::Execute(execute_plan))
77 }
78 AllocateRegionRepr::Legacy(legacy) => Ok(Self::Build(BuildPlan {
79 plan_entries: legacy.plan_entries,
80 })),
81 }
82 }
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct BuildPlan {
87 plan_entries: Vec<AllocationPlanEntry>,
88}
89
90impl BuildPlan {
91 async fn next(
92 &mut self,
93 ctx: &mut Context,
94 _procedure_ctx: &ProcedureContext,
95 ) -> Result<(Box<dyn State>, Status)> {
96 let timer = Instant::now();
97 let table_id = ctx.persistent_ctx.table_id;
98 let table_route_value = ctx.get_table_route_value().await?;
99 let mut next_region_number =
100 AllocateRegion::get_next_region_number(table_route_value.max_region_number().unwrap());
101
102 let repartition_plan_entries = AllocateRegion::convert_to_repartition_plans(
104 table_id,
105 &mut next_region_number,
106 &self.plan_entries,
107 );
108 let plan_count = repartition_plan_entries.len();
109 let to_allocate = AllocateRegion::count_regions_to_allocate(&repartition_plan_entries);
110 info!(
111 "Repartition allocate regions start, table_id: {}, groups: {}, regions_to_allocate: {}",
112 table_id, plan_count, to_allocate
113 );
114
115 if AllocateRegion::count_regions_to_allocate(&repartition_plan_entries) == 0 {
117 ctx.persistent_ctx.plans = repartition_plan_entries;
118 ctx.update_allocate_region_elapsed(timer.elapsed());
119 return Ok((Box::new(Dispatch), Status::executing(true)));
120 }
121
122 ctx.persistent_ctx.plans = repartition_plan_entries;
123 debug!(
124 "Repartition allocate regions build plan completed, table_id: {}, elapsed: {:?}",
125 table_id,
126 timer.elapsed()
127 );
128 Ok((
129 Box::new(AllocateRegion::Execute(ExecutePlan)),
130 Status::executing(true),
131 ))
132 }
133}
134
135#[derive(Debug, Clone, Serialize, Deserialize)]
136pub struct ExecutePlan;
137
138impl ExecutePlan {
139 async fn next(
140 &mut self,
141 ctx: &mut Context,
142 procedure_ctx: &ProcedureContext,
143 ) -> Result<(Box<dyn State>, Status)> {
144 let timer = Instant::now();
145 let table_id = ctx.persistent_ctx.table_id;
146 let allocate_regions = AllocateRegion::collect_allocate_regions(&ctx.persistent_ctx.plans);
147 let region_number_and_partition_exprs =
148 AllocateRegion::prepare_region_allocation_data(&allocate_regions)?;
149 let table_info_value = ctx.get_table_info_value().await?;
150 let table_route_value = ctx.get_table_route_value().await?;
151 let region_routes = table_route_value.region_routes().unwrap();
153 let new_allocated_region_routes = ctx
154 .region_routes_allocator
155 .allocate(
156 table_id,
157 ®ion_number_and_partition_exprs
158 .iter()
159 .map(|(n, p)| (*n, p.as_str()))
160 .collect::<Vec<_>>(),
161 )
162 .await
163 .context(error::AllocateRegionRoutesSnafu { table_id })?;
164 let wal_options = ctx
165 .wal_options_allocator
166 .allocate(
167 &allocate_regions
168 .iter()
169 .map(|r| r.region_id.region_number())
170 .collect::<Vec<_>>(),
171 table_info_value.table_info.meta.options.skip_wal,
172 )
173 .await
174 .context(error::AllocateWalOptionsSnafu { table_id })?;
175
176 let new_region_count = new_allocated_region_routes.len();
177 let new_regions_brief: Vec<_> = new_allocated_region_routes
178 .iter()
179 .map(|route| {
180 let region_id = route.region.id;
181 let peer = route.leader_peer.as_ref().map(|p| p.id).unwrap_or_default();
182 format!("region_id: {}, peer: {}", region_id, peer)
183 })
184 .collect();
185 info!(
186 "Allocated regions for repartition, table_id: {}, new_region_count: {}, new_regions: {:?}",
187 table_id, new_region_count, new_regions_brief
188 );
189
190 let _operating_guards = Context::register_operating_regions(
192 &ctx.memory_region_keeper,
193 &new_allocated_region_routes,
194 )?;
195 AllocateRegion::allocate_regions(
197 &ctx.node_manager,
198 &table_info_value.table_info,
199 &new_allocated_region_routes,
200 &wal_options,
201 )
202 .await?;
203
204 let table_lock = TableLock::Write(table_id).into();
206 let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
207 let new_region_routes =
208 AllocateRegion::generate_region_routes(region_routes, &new_allocated_region_routes);
209 ctx.update_table_route(&table_route_value, new_region_routes, wal_options)
210 .await?;
211 ctx.invalidate_table_cache().await?;
212
213 ctx.update_allocate_region_elapsed(timer.elapsed());
214 Ok((Box::new(Dispatch), Status::executing(true)))
215 }
216}
217
218#[async_trait::async_trait]
219#[typetag::serde]
220impl State for AllocateRegion {
221 async fn next(
222 &mut self,
223 ctx: &mut Context,
224 procedure_ctx: &ProcedureContext,
225 ) -> Result<(Box<dyn State>, Status)> {
226 match self {
227 AllocateRegion::Build(build_plan) => build_plan.next(ctx, procedure_ctx).await,
228 AllocateRegion::Execute(execute_plan) => execute_plan.next(ctx, procedure_ctx).await,
229 }
230 }
231
232 fn as_any(&self) -> &dyn Any {
233 self
234 }
235}
236
237impl AllocateRegion {
238 pub fn new(plan_entries: Vec<AllocationPlanEntry>) -> Self {
239 AllocateRegion::Build(BuildPlan { plan_entries })
240 }
241
242 fn generate_region_routes(
243 region_routes: &[RegionRoute],
244 new_allocated_region_ids: &[RegionRoute],
245 ) -> Vec<RegionRoute> {
246 let region_ids = region_routes
247 .iter()
248 .map(|r| r.region.id)
249 .collect::<HashSet<_>>();
250 let mut new_region_routes = region_routes.to_vec();
251 for new_allocated_region_id in new_allocated_region_ids {
252 if !region_ids.contains(&new_allocated_region_id.region.id) {
253 new_region_routes.push(new_allocated_region_id.clone());
254 }
255 }
256 new_region_routes
257 }
258
259 fn convert_to_repartition_plans(
264 table_id: TableId,
265 next_region_number: &mut RegionNumber,
266 plan_entries: &[AllocationPlanEntry],
267 ) -> Vec<RepartitionPlanEntry> {
268 plan_entries
269 .iter()
270 .map(|plan_entry| {
271 convert_allocation_plan_to_repartition_plan(
272 table_id,
273 next_region_number,
274 plan_entry,
275 )
276 })
277 .collect()
278 }
279
280 fn collect_allocate_regions(
282 repartition_plan_entries: &[RepartitionPlanEntry],
283 ) -> Vec<&RegionDescriptor> {
284 repartition_plan_entries
285 .iter()
286 .flat_map(|p| p.allocate_regions())
287 .collect()
288 }
289
290 fn prepare_region_allocation_data(
292 allocate_regions: &[&RegionDescriptor],
293 ) -> Result<Vec<(RegionNumber, String)>> {
294 allocate_regions
295 .iter()
296 .map(|r| {
297 Ok((
298 r.region_id.region_number(),
299 r.partition_expr
300 .as_json_str()
301 .context(error::SerializePartitionExprSnafu)?,
302 ))
303 })
304 .collect()
305 }
306
307 fn count_regions_to_allocate(repartition_plan_entries: &[RepartitionPlanEntry]) -> usize {
309 repartition_plan_entries
310 .iter()
311 .map(|p| p.allocated_region_ids.len())
312 .sum()
313 }
314
315 fn get_next_region_number(max_region_number: RegionNumber) -> RegionNumber {
317 max_region_number + 1
318 }
319
320 async fn allocate_regions(
321 node_manager: &NodeManagerRef,
322 raw_table_info: &TableInfo,
323 region_routes: &[RegionRoute],
324 wal_options: &HashMap<RegionNumber, String>,
325 ) -> Result<()> {
326 let table_ref = TableReference::full(
327 &raw_table_info.catalog_name,
328 &raw_table_info.schema_name,
329 &raw_table_info.name,
330 );
331 let table_id = raw_table_info.ident.table_id;
332 let request = build_template_from_raw_table_info_for_physical_table(raw_table_info)
335 .context(error::BuildCreateRequestSnafu { table_id })?;
336 common_telemetry::debug!(
337 "Allocating regions request, table_id: {}, request: {:?}",
338 table_id,
339 request
340 );
341 let builder = CreateRequestBuilder::new(request, None);
342 let region_count = region_routes.len();
343 let wal_region_count = wal_options.len();
344 info!(
345 "Allocating regions on datanodes, table_id: {}, region_count: {}, wal_regions: {}",
346 table_id, region_count, wal_region_count
347 );
348 let executor = CreateTableExecutor::new(table_ref.into(), false, builder);
349 executor
350 .on_create_regions(node_manager, table_id, region_routes, wal_options)
351 .await
352 .context(error::AllocateRegionsSnafu { table_id })?;
353
354 Ok(())
355 }
356}
357
#[cfg(test)]
mod tests {
    use store_api::storage::RegionId;
    use uuid::Uuid;

    use super::*;
    use crate::procedure::repartition::State;
    use crate::procedure::repartition::test_util::range_expr;

    /// Builds a region descriptor for `region_number` of `table_id` whose partition expression
    /// is `range_expr(col, start, end)`.
    fn create_region_descriptor(
        table_id: TableId,
        region_number: u32,
        col: &str,
        start: i64,
        end: i64,
    ) -> RegionDescriptor {
        RegionDescriptor {
            region_id: RegionId::new(table_id, region_number),
            partition_expr: range_expr(col, start, end),
        }
    }

    /// Builds an allocation plan entry with one source region per entry of
    /// `source_region_numbers` (the i-th source covers `[i * 100, (i + 1) * 100)` on column
    /// `x`) and one target partition expression per entry of `target_ranges`.
    fn create_allocation_plan_entry(
        table_id: TableId,
        source_region_numbers: &[u32],
        target_ranges: &[(i64, i64)],
    ) -> AllocationPlanEntry {
        let source_regions = source_region_numbers
            .iter()
            .enumerate()
            .map(|(i, &n)| {
                let start = i as i64 * 100;
                let end = (i + 1) as i64 * 100;
                create_region_descriptor(table_id, n, "x", start, end)
            })
            .collect();

        let target_partition_exprs = target_ranges
            .iter()
            .map(|&(start, end)| range_expr("x", start, end))
            .collect();

        AllocationPlanEntry {
            group_id: Uuid::new_v4(),
            source_regions,
            target_partition_exprs,
            transition_map: vec![],
        }
    }

    #[test]
    fn test_convert_to_repartition_plans_no_allocation() {
        let table_id = 1024;
        let mut next_region_number = 10;

        // 2 source regions -> 2 target partition expressions.
        let plan_entries = vec![create_allocation_plan_entry(
            table_id,
            &[1, 2],
            &[(0, 50), (50, 200)],
        )];

        let result = AllocateRegion::convert_to_repartition_plans(
            table_id,
            &mut next_region_number,
            &plan_entries,
        );

        assert_eq!(result.len(), 1);
        assert_eq!(result[0].target_regions.len(), 2);
        assert!(result[0].allocated_region_ids.is_empty());
        // No region was allocated, so the next region number is untouched.
        assert_eq!(next_region_number, 10);
    }

    #[test]
    fn test_convert_to_repartition_plans_with_allocation() {
        let table_id = 1024;
        let mut next_region_number = 10;

        // 2 source regions -> 4 target partition expressions.
        let plan_entries = vec![create_allocation_plan_entry(
            table_id,
            &[1, 2],
            &[(0, 50), (50, 100), (100, 150), (150, 200)],
        )];

        let result = AllocateRegion::convert_to_repartition_plans(
            table_id,
            &mut next_region_number,
            &plan_entries,
        );

        assert_eq!(result.len(), 1);
        assert_eq!(result[0].target_regions.len(), 4);
        assert_eq!(result[0].allocated_region_ids.len(), 2);
        assert_eq!(
            result[0].allocated_region_ids[0],
            RegionId::new(table_id, 10)
        );
        assert_eq!(
            result[0].allocated_region_ids[1],
            RegionId::new(table_id, 11)
        );
        // Two regions were allocated, so the next region number advanced by two.
        assert_eq!(next_region_number, 12);
    }

    #[test]
    fn test_convert_to_repartition_plans_multiple_entries() {
        let table_id = 1024;
        let mut next_region_number = 10;

        let plan_entries = vec![
            // 1 source -> 2 targets.
            create_allocation_plan_entry(table_id, &[1], &[(0, 50), (50, 100)]),
            // 2 sources -> 2 targets.
            create_allocation_plan_entry(table_id, &[2, 3], &[(100, 150), (150, 200)]),
            // 1 source -> 3 targets.
            create_allocation_plan_entry(table_id, &[4], &[(200, 250), (250, 300), (300, 400)]),
        ];

        let result = AllocateRegion::convert_to_repartition_plans(
            table_id,
            &mut next_region_number,
            &plan_entries,
        );

        assert_eq!(result.len(), 3);
        assert_eq!(result[0].allocated_region_ids.len(), 1);
        assert_eq!(result[1].allocated_region_ids.len(), 0);
        assert_eq!(result[2].allocated_region_ids.len(), 2);
        // Three regions were allocated in total across the entries.
        assert_eq!(next_region_number, 13);
    }

    #[test]
    fn test_count_regions_to_allocate() {
        let table_id = 1024;
        let mut next_region_number = 10;

        let plan_entries = vec![
            // 1 source -> 2 targets.
            create_allocation_plan_entry(table_id, &[1], &[(0, 50), (50, 100)]),
            // 2 sources -> 1 target.
            create_allocation_plan_entry(table_id, &[2, 3], &[(100, 200)]),
            // 1 source -> 2 targets.
            create_allocation_plan_entry(table_id, &[4], &[(200, 250), (250, 300)]),
        ];

        let repartition_plans = AllocateRegion::convert_to_repartition_plans(
            table_id,
            &mut next_region_number,
            &plan_entries,
        );

        let count = AllocateRegion::count_regions_to_allocate(&repartition_plans);
        assert_eq!(count, 2);
    }

    #[test]
    fn test_collect_allocate_regions() {
        let table_id = 1024;
        let mut next_region_number = 10;

        let plan_entries = vec![
            // 1 source -> 2 targets.
            create_allocation_plan_entry(table_id, &[1], &[(0, 50), (50, 100)]),
            // 1 source -> 2 targets.
            create_allocation_plan_entry(table_id, &[2], &[(100, 150), (150, 200)]),
        ];

        let repartition_plans = AllocateRegion::convert_to_repartition_plans(
            table_id,
            &mut next_region_number,
            &plan_entries,
        );

        let allocate_regions = AllocateRegion::collect_allocate_regions(&repartition_plans);
        assert_eq!(allocate_regions.len(), 2);
        assert_eq!(allocate_regions[0].region_id, RegionId::new(table_id, 10));
        assert_eq!(allocate_regions[1].region_id, RegionId::new(table_id, 11));
    }

    #[test]
    fn test_prepare_region_allocation_data() {
        let table_id = 1024;
        let regions = [
            create_region_descriptor(table_id, 10, "x", 0, 50),
            create_region_descriptor(table_id, 11, "x", 50, 100),
        ];
        let region_refs: Vec<&RegionDescriptor> = regions.iter().collect();

        let result = AllocateRegion::prepare_region_allocation_data(&region_refs).unwrap();

        assert_eq!(result.len(), 2);
        assert_eq!(result[0].0, 10);
        assert_eq!(result[1].0, 11);
        // The serialized partition expressions must not be empty.
        assert!(!result[0].1.is_empty());
        assert!(!result[1].1.is_empty());
    }

    #[test]
    fn test_allocate_region_state_backward_compatibility() {
        // Legacy layout: `plan_entries` sits directly next to the state tag.
        let serialized = r#"{"repartition_state":"AllocateRegion","plan_entries":[]}"#;

        let state: Box<dyn State> = serde_json::from_str(serialized).unwrap();

        // The legacy layout must be read as the build phase.
        let allocate_region = state
            .as_any()
            .downcast_ref::<AllocateRegion>()
            .expect("expected AllocateRegion state");
        match allocate_region {
            AllocateRegion::Build(build_plan) => assert!(build_plan.plan_entries.is_empty()),
            AllocateRegion::Execute(_) => panic!("expected build plan"),
        }
    }

    #[test]
    fn test_allocate_region_state_round_trip() {
        let state: Box<dyn State> = Box::new(AllocateRegion::new(vec![]));

        let serialized = serde_json::to_string(&state).unwrap();
        let deserialized: Box<dyn State> = serde_json::from_str(&serialized).unwrap();

        // Pins the serialized form of the build phase.
        assert_eq!(
            serialized,
            r#"{"repartition_state":"AllocateRegion","Build":{"plan_entries":[]}}"#
        );
        let allocate_region = deserialized
            .as_any()
            .downcast_ref::<AllocateRegion>()
            .expect("expected AllocateRegion state");
        match allocate_region {
            AllocateRegion::Build(build_plan) => assert!(build_plan.plan_entries.is_empty()),
            AllocateRegion::Execute(_) => panic!("expected build plan"),
        }
    }

    #[test]
    fn test_allocate_region_execute_state_round_trip() {
        let state: Box<dyn State> = Box::new(AllocateRegion::Execute(ExecutePlan));

        let serialized = serde_json::to_string(&state).unwrap();
        let deserialized: Box<dyn State> = serde_json::from_str(&serialized).unwrap();

        // Pins the serialized form of the execute phase.
        assert_eq!(
            serialized,
            r#"{"repartition_state":"AllocateRegion","Execute":null}"#
        );
        let allocate_region = deserialized
            .as_any()
            .downcast_ref::<AllocateRegion>()
            .expect("expected AllocateRegion state");
        match allocate_region {
            AllocateRegion::Execute(_) => {}
            AllocateRegion::Build(_) => panic!("expected execute plan"),
        }
    }
}