meta_srv/procedure/repartition/
repartition_start.rs1use std::any::Any;
16
17use common_meta::key::table_route::PhysicalTableRouteValue;
18use common_procedure::{Context as ProcedureContext, Status};
19use partition::expr::PartitionExpr;
20use partition::subtask::{self, RepartitionSubtask};
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt};
23use uuid::Uuid;
24
25use crate::error::{self, Result};
26use crate::procedure::repartition::allocate_region::AllocateRegion;
27use crate::procedure::repartition::plan::{AllocationPlanEntry, RegionDescriptor};
28use crate::procedure::repartition::repartition_end::RepartitionEnd;
29use crate::procedure::repartition::{Context, State};
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct RepartitionStart {
33 from_exprs: Vec<PartitionExpr>,
34 to_exprs: Vec<PartitionExpr>,
35}
36
37impl RepartitionStart {
38 pub fn new(from_exprs: Vec<PartitionExpr>, to_exprs: Vec<PartitionExpr>) -> Self {
39 Self {
40 from_exprs,
41 to_exprs,
42 }
43 }
44}
45
46#[async_trait::async_trait]
47#[typetag::serde]
48impl State for RepartitionStart {
49 async fn next(
50 &mut self,
51 ctx: &mut Context,
52 _: &ProcedureContext,
53 ) -> Result<(Box<dyn State>, Status)> {
54 let (_, table_route) = ctx
55 .table_metadata_manager
56 .table_route_manager()
57 .get_physical_table_route(ctx.persistent_ctx.table_id)
58 .await
59 .context(error::TableMetadataManagerSnafu)?;
60
61 let plans = Self::build_plan(&table_route, &self.from_exprs, &self.to_exprs)?;
62
63 if plans.is_empty() {
64 return Ok((Box::new(RepartitionEnd), Status::done()));
65 }
66
67 Ok((
68 Box::new(AllocateRegion::new(plans)),
69 Status::executing(false),
70 ))
71 }
72
73 fn as_any(&self) -> &dyn Any {
74 self
75 }
76}
77
78impl RepartitionStart {
79 #[allow(dead_code)]
80 fn build_plan(
81 physical_route: &PhysicalTableRouteValue,
82 from_exprs: &[PartitionExpr],
83 to_exprs: &[PartitionExpr],
84 ) -> Result<Vec<AllocationPlanEntry>> {
85 let subtasks = subtask::create_subtasks(from_exprs, to_exprs)
86 .context(error::RepartitionCreateSubtasksSnafu)?;
87 if subtasks.is_empty() {
88 return Ok(vec![]);
89 }
90
91 let src_descriptors = Self::source_region_descriptors(from_exprs, physical_route)?;
92 Ok(Self::build_plan_entries(
93 subtasks,
94 &src_descriptors,
95 to_exprs,
96 ))
97 }
98
99 #[allow(dead_code)]
100 fn build_plan_entries(
101 subtasks: Vec<RepartitionSubtask>,
102 source_index: &[RegionDescriptor],
103 target_exprs: &[PartitionExpr],
104 ) -> Vec<AllocationPlanEntry> {
105 subtasks
106 .into_iter()
107 .map(|subtask| {
108 let group_id = Uuid::new_v4();
109 let source_regions = subtask
110 .from_expr_indices
111 .iter()
112 .map(|&idx| source_index[idx].clone())
113 .collect::<Vec<_>>();
114
115 let target_partition_exprs = subtask
116 .to_expr_indices
117 .iter()
118 .map(|&idx| target_exprs[idx].clone())
119 .collect::<Vec<_>>();
120 let regions_to_allocate = target_partition_exprs
121 .len()
122 .saturating_sub(source_regions.len());
123 let regions_to_deallocate = source_regions
124 .len()
125 .saturating_sub(target_partition_exprs.len());
126 AllocationPlanEntry {
127 group_id,
128 source_regions,
129 target_partition_exprs,
130 regions_to_allocate,
131 regions_to_deallocate,
132 }
133 })
134 .collect::<Vec<_>>()
135 }
136
137 fn source_region_descriptors(
138 from_exprs: &[PartitionExpr],
139 physical_route: &PhysicalTableRouteValue,
140 ) -> Result<Vec<RegionDescriptor>> {
141 let existing_regions = physical_route
142 .region_routes
143 .iter()
144 .map(|route| (route.region.id, route.region.partition_expr()))
145 .collect::<Vec<_>>();
146
147 let descriptors = from_exprs
148 .iter()
149 .map(|expr| {
150 let expr_json = expr
151 .as_json_str()
152 .context(error::SerializePartitionExprSnafu)?;
153
154 let matched_region_id = existing_regions
155 .iter()
156 .find_map(|(region_id, existing_expr)| {
157 (existing_expr == &expr_json).then_some(*region_id)
158 })
159 .with_context(|| error::RepartitionSourceExprMismatchSnafu {
160 expr: expr_json,
161 })?;
162
163 Ok(RegionDescriptor {
164 region_id: matched_region_id,
165 partition_expr: expr.clone(),
166 })
167 })
168 .collect::<Result<Vec<_>>>()?;
169
170 Ok(descriptors)
171 }
172}