meta_srv/procedure/repartition/
repartition_start.rs1use std::any::Any;
16
17use common_meta::key::table_route::PhysicalTableRouteValue;
18use common_procedure::{Context as ProcedureContext, Status};
19use common_telemetry::debug;
20use partition::expr::PartitionExpr;
21use partition::subtask::{self, RepartitionSubtask};
22use serde::{Deserialize, Serialize};
23use snafu::{OptionExt, ResultExt, ensure};
24use tokio::time::Instant;
25use uuid::Uuid;
26
27use crate::error::{self, Result};
28use crate::procedure::repartition::allocate_region::AllocateRegion;
29use crate::procedure::repartition::plan::{AllocationPlanEntry, RegionDescriptor};
30use crate::procedure::repartition::repartition_end::RepartitionEnd;
31use crate::procedure::repartition::{Context, State};
32
/// Initial state of the repartition procedure.
///
/// Carries the partition expressions to be replaced (`from_exprs`) and the
/// partition expressions that replace them (`to_exprs`).
///
/// NOTE(review): this type derives `Serialize`/`Deserialize` and its `State`
/// impl is registered with `#[typetag::serde]`, so it is presumably persisted
/// as procedure state; renaming, retyping or reordering fields likely affects
/// existing serialized data — confirm before changing the layout.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepartitionStart {
    /// Source partition expressions. When the plan is built, each one is
    /// looked up by exact JSON-string equality against the partition
    /// expressions of the table's existing regions.
    from_exprs: Vec<PartitionExpr>,
    /// Target partition expressions; they become the `target_partition_exprs`
    /// of the generated allocation plan entries.
    to_exprs: Vec<PartitionExpr>,
}
38
39impl RepartitionStart {
40 pub fn new(from_exprs: Vec<PartitionExpr>, to_exprs: Vec<PartitionExpr>) -> Self {
41 Self {
42 from_exprs,
43 to_exprs,
44 }
45 }
46}
47
48#[async_trait::async_trait]
49#[typetag::serde]
50impl State for RepartitionStart {
51 async fn next(
52 &mut self,
53 ctx: &mut Context,
54 _: &ProcedureContext,
55 ) -> Result<(Box<dyn State>, Status)> {
56 let timer = Instant::now();
57 let (physical_table_id, table_route) = ctx
58 .table_metadata_manager
59 .table_route_manager()
60 .get_physical_table_route(ctx.persistent_ctx.table_id)
61 .await
62 .context(error::TableMetadataManagerSnafu)?;
63 let table_id = ctx.persistent_ctx.table_id;
64 ensure!(
65 physical_table_id == table_id,
66 error::UnexpectedSnafu {
67 violated: format!(
68 "Repartition only works on the physical table, but got logical table: {}, physical table id: {}",
69 table_id, physical_table_id
70 ),
71 }
72 );
73
74 let plans = Self::build_plan(&table_route, &self.from_exprs, &self.to_exprs)?;
75 let plan_count = plans.len();
76 let total_source_regions: usize = plans.iter().map(|p| p.source_regions.len()).sum();
77 let total_target_regions: usize =
78 plans.iter().map(|p| p.target_partition_exprs.len()).sum();
79 common_telemetry::info!(
80 "Repartition start, table_id: {}, plans: {}, total_source_regions: {}, total_target_regions: {}",
81 table_id,
82 plan_count,
83 total_source_regions,
84 total_target_regions
85 );
86
87 ctx.update_build_plan_elapsed(timer.elapsed());
88
89 if plans.is_empty() {
90 return Ok((Box::new(RepartitionEnd), Status::done()));
91 }
92
93 Ok((
94 Box::new(AllocateRegion::new(plans)),
95 Status::executing(false),
96 ))
97 }
98
99 fn as_any(&self) -> &dyn Any {
100 self
101 }
102}
103
104impl RepartitionStart {
105 fn build_plan(
106 physical_route: &PhysicalTableRouteValue,
107 from_exprs: &[PartitionExpr],
108 to_exprs: &[PartitionExpr],
109 ) -> Result<Vec<AllocationPlanEntry>> {
110 let subtasks = subtask::create_subtasks(from_exprs, to_exprs)
111 .context(error::RepartitionCreateSubtasksSnafu)?;
112 if subtasks.is_empty() {
113 return Ok(vec![]);
114 }
115
116 let src_descriptors = Self::source_region_descriptors(from_exprs, physical_route)?;
117 Ok(Self::build_plan_entries(
118 subtasks,
119 &src_descriptors,
120 to_exprs,
121 ))
122 }
123
124 fn build_plan_entries(
125 subtasks: Vec<RepartitionSubtask>,
126 source_index: &[RegionDescriptor],
127 target_exprs: &[PartitionExpr],
128 ) -> Vec<AllocationPlanEntry> {
129 subtasks
130 .into_iter()
131 .map(|subtask| {
132 let group_id = Uuid::new_v4();
133 let source_regions = subtask
134 .from_expr_indices
135 .iter()
136 .map(|&idx| source_index[idx].clone())
137 .collect::<Vec<_>>();
138
139 let target_partition_exprs = subtask
140 .to_expr_indices
141 .iter()
142 .map(|&idx| target_exprs[idx].clone())
143 .collect::<Vec<_>>();
144 AllocationPlanEntry {
145 group_id,
146 source_regions,
147 target_partition_exprs,
148 transition_map: subtask.transition_map,
149 }
150 })
151 .collect::<Vec<_>>()
152 }
153
154 fn source_region_descriptors(
155 from_exprs: &[PartitionExpr],
156 physical_route: &PhysicalTableRouteValue,
157 ) -> Result<Vec<RegionDescriptor>> {
158 let existing_regions = physical_route
159 .region_routes
160 .iter()
161 .map(|route| (route.region.id, route.region.partition_expr()))
162 .collect::<Vec<_>>();
163
164 let descriptors = from_exprs
165 .iter()
166 .map(|expr| {
167 let expr_json = expr
168 .as_json_str()
169 .context(error::SerializePartitionExprSnafu)?;
170
171 let matched_region_id = existing_regions
172 .iter()
173 .find_map(|(region_id, existing_expr)| {
174 (existing_expr == &expr_json).then_some(*region_id)
175 })
176 .with_context(|| error::RepartitionSourceExprMismatchSnafu { expr: &expr_json })
177 .inspect_err(|_| {
178 debug!("Failed to find matching region for partition expression: {}, existing regions: {:?}", expr_json, existing_regions);
179 })?;
180
181 Ok(RegionDescriptor {
182 region_id: matched_region_id,
183 partition_expr: expr.clone(),
184 })
185 })
186 .collect::<Result<Vec<_>>>()?;
187
188 Ok(descriptors)
189 }
190}