meta_srv/procedure/repartition/
repartition_start.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use common_meta::key::table_route::PhysicalTableRouteValue;
18use common_procedure::{Context as ProcedureContext, Status};
19use common_telemetry::debug;
20use partition::expr::PartitionExpr;
21use partition::subtask::{self, RepartitionSubtask};
22use serde::{Deserialize, Serialize};
23use snafu::{OptionExt, ResultExt, ensure};
24use tokio::time::Instant;
25use uuid::Uuid;
26
27use crate::error::{self, Result};
28use crate::procedure::repartition::allocate_region::AllocateRegion;
29use crate::procedure::repartition::plan::{AllocationPlanEntry, RegionDescriptor};
30use crate::procedure::repartition::repartition_end::RepartitionEnd;
31use crate::procedure::repartition::{Context, State};
32
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct RepartitionStart {
35    from_exprs: Vec<PartitionExpr>,
36    to_exprs: Vec<PartitionExpr>,
37}
38
39impl RepartitionStart {
40    pub fn new(from_exprs: Vec<PartitionExpr>, to_exprs: Vec<PartitionExpr>) -> Self {
41        Self {
42            from_exprs,
43            to_exprs,
44        }
45    }
46}
47
48#[async_trait::async_trait]
49#[typetag::serde]
50impl State for RepartitionStart {
51    async fn next(
52        &mut self,
53        ctx: &mut Context,
54        _: &ProcedureContext,
55    ) -> Result<(Box<dyn State>, Status)> {
56        let timer = Instant::now();
57        let (physical_table_id, table_route) = ctx
58            .table_metadata_manager
59            .table_route_manager()
60            .get_physical_table_route(ctx.persistent_ctx.table_id)
61            .await
62            .context(error::TableMetadataManagerSnafu)?;
63        let table_id = ctx.persistent_ctx.table_id;
64        ensure!(
65            physical_table_id == table_id,
66            error::UnexpectedSnafu {
67                violated: format!(
68                    "Repartition only works on the physical table, but got logical table: {}, physical table id: {}",
69                    table_id, physical_table_id
70                ),
71            }
72        );
73
74        let plans = Self::build_plan(&table_route, &self.from_exprs, &self.to_exprs)?;
75        let plan_count = plans.len();
76        let total_source_regions: usize = plans.iter().map(|p| p.source_regions.len()).sum();
77        let total_target_regions: usize =
78            plans.iter().map(|p| p.target_partition_exprs.len()).sum();
79        common_telemetry::info!(
80            "Repartition start, table_id: {}, plans: {}, total_source_regions: {}, total_target_regions: {}",
81            table_id,
82            plan_count,
83            total_source_regions,
84            total_target_regions
85        );
86
87        ctx.update_build_plan_elapsed(timer.elapsed());
88
89        if plans.is_empty() {
90            return Ok((Box::new(RepartitionEnd), Status::done()));
91        }
92
93        Ok((
94            Box::new(AllocateRegion::new(plans)),
95            Status::executing(false),
96        ))
97    }
98
99    fn as_any(&self) -> &dyn Any {
100        self
101    }
102}
103
104impl RepartitionStart {
105    fn build_plan(
106        physical_route: &PhysicalTableRouteValue,
107        from_exprs: &[PartitionExpr],
108        to_exprs: &[PartitionExpr],
109    ) -> Result<Vec<AllocationPlanEntry>> {
110        let subtasks = subtask::create_subtasks(from_exprs, to_exprs)
111            .context(error::RepartitionCreateSubtasksSnafu)?;
112        if subtasks.is_empty() {
113            return Ok(vec![]);
114        }
115
116        let src_descriptors = Self::source_region_descriptors(from_exprs, physical_route)?;
117        Ok(Self::build_plan_entries(
118            subtasks,
119            &src_descriptors,
120            to_exprs,
121        ))
122    }
123
124    fn build_plan_entries(
125        subtasks: Vec<RepartitionSubtask>,
126        source_index: &[RegionDescriptor],
127        target_exprs: &[PartitionExpr],
128    ) -> Vec<AllocationPlanEntry> {
129        subtasks
130            .into_iter()
131            .map(|subtask| {
132                let group_id = Uuid::new_v4();
133                let source_regions = subtask
134                    .from_expr_indices
135                    .iter()
136                    .map(|&idx| source_index[idx].clone())
137                    .collect::<Vec<_>>();
138
139                let target_partition_exprs = subtask
140                    .to_expr_indices
141                    .iter()
142                    .map(|&idx| target_exprs[idx].clone())
143                    .collect::<Vec<_>>();
144                AllocationPlanEntry {
145                    group_id,
146                    source_regions,
147                    target_partition_exprs,
148                    transition_map: subtask.transition_map,
149                }
150            })
151            .collect::<Vec<_>>()
152    }
153
154    fn source_region_descriptors(
155        from_exprs: &[PartitionExpr],
156        physical_route: &PhysicalTableRouteValue,
157    ) -> Result<Vec<RegionDescriptor>> {
158        let existing_regions = physical_route
159            .region_routes
160            .iter()
161            .map(|route| (route.region.id, route.region.partition_expr()))
162            .collect::<Vec<_>>();
163
164        let descriptors = from_exprs
165            .iter()
166            .map(|expr| {
167                let expr_json = expr
168                    .as_json_str()
169                    .context(error::SerializePartitionExprSnafu)?;
170
171                let matched_region_id = existing_regions
172                    .iter()
173                    .find_map(|(region_id, existing_expr)| {
174                        (existing_expr == &expr_json).then_some(*region_id)
175                    })
176                    .with_context(|| error::RepartitionSourceExprMismatchSnafu { expr: &expr_json })
177                    .inspect_err(|_| {
178                        debug!("Failed to find matching region for partition expression: {}, existing regions: {:?}", expr_json, existing_regions);
179                    })?;
180
181                Ok(RegionDescriptor {
182                    region_id: matched_region_id,
183                    partition_expr: expr.clone(),
184                })
185            })
186            .collect::<Result<Vec<_>>>()?;
187
188        Ok(descriptors)
189    }
190}