meta_srv/procedure/repartition/
repartition_start.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use common_meta::key::table_route::PhysicalTableRouteValue;
18use common_procedure::{Context as ProcedureContext, Status};
19use partition::expr::PartitionExpr;
20use partition::subtask::{self, RepartitionSubtask};
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt};
23use uuid::Uuid;
24
25use crate::error::{self, Result};
26use crate::procedure::repartition::allocate_region::AllocateRegion;
27use crate::procedure::repartition::plan::{AllocationPlanEntry, RegionDescriptor};
28use crate::procedure::repartition::repartition_end::RepartitionEnd;
29use crate::procedure::repartition::{Context, State};
30
31#[derive(Debug, Clone, Serialize, Deserialize)]
32pub struct RepartitionStart {
33    from_exprs: Vec<PartitionExpr>,
34    to_exprs: Vec<PartitionExpr>,
35}
36
37impl RepartitionStart {
38    pub fn new(from_exprs: Vec<PartitionExpr>, to_exprs: Vec<PartitionExpr>) -> Self {
39        Self {
40            from_exprs,
41            to_exprs,
42        }
43    }
44}
45
46#[async_trait::async_trait]
47#[typetag::serde]
48impl State for RepartitionStart {
49    async fn next(
50        &mut self,
51        ctx: &mut Context,
52        _: &ProcedureContext,
53    ) -> Result<(Box<dyn State>, Status)> {
54        let (_, table_route) = ctx
55            .table_metadata_manager
56            .table_route_manager()
57            .get_physical_table_route(ctx.persistent_ctx.table_id)
58            .await
59            .context(error::TableMetadataManagerSnafu)?;
60
61        let plans = Self::build_plan(&table_route, &self.from_exprs, &self.to_exprs)?;
62
63        if plans.is_empty() {
64            return Ok((Box::new(RepartitionEnd), Status::done()));
65        }
66
67        Ok((
68            Box::new(AllocateRegion::new(plans)),
69            Status::executing(false),
70        ))
71    }
72
73    fn as_any(&self) -> &dyn Any {
74        self
75    }
76}
77
78impl RepartitionStart {
79    #[allow(dead_code)]
80    fn build_plan(
81        physical_route: &PhysicalTableRouteValue,
82        from_exprs: &[PartitionExpr],
83        to_exprs: &[PartitionExpr],
84    ) -> Result<Vec<AllocationPlanEntry>> {
85        let subtasks = subtask::create_subtasks(from_exprs, to_exprs)
86            .context(error::RepartitionCreateSubtasksSnafu)?;
87        if subtasks.is_empty() {
88            return Ok(vec![]);
89        }
90
91        let src_descriptors = Self::source_region_descriptors(from_exprs, physical_route)?;
92        Ok(Self::build_plan_entries(
93            subtasks,
94            &src_descriptors,
95            to_exprs,
96        ))
97    }
98
99    #[allow(dead_code)]
100    fn build_plan_entries(
101        subtasks: Vec<RepartitionSubtask>,
102        source_index: &[RegionDescriptor],
103        target_exprs: &[PartitionExpr],
104    ) -> Vec<AllocationPlanEntry> {
105        subtasks
106            .into_iter()
107            .map(|subtask| {
108                let group_id = Uuid::new_v4();
109                let source_regions = subtask
110                    .from_expr_indices
111                    .iter()
112                    .map(|&idx| source_index[idx].clone())
113                    .collect::<Vec<_>>();
114
115                let target_partition_exprs = subtask
116                    .to_expr_indices
117                    .iter()
118                    .map(|&idx| target_exprs[idx].clone())
119                    .collect::<Vec<_>>();
120                let regions_to_allocate = target_partition_exprs
121                    .len()
122                    .saturating_sub(source_regions.len());
123                let regions_to_deallocate = source_regions
124                    .len()
125                    .saturating_sub(target_partition_exprs.len());
126                AllocationPlanEntry {
127                    group_id,
128                    source_regions,
129                    target_partition_exprs,
130                    regions_to_allocate,
131                    regions_to_deallocate,
132                }
133            })
134            .collect::<Vec<_>>()
135    }
136
137    fn source_region_descriptors(
138        from_exprs: &[PartitionExpr],
139        physical_route: &PhysicalTableRouteValue,
140    ) -> Result<Vec<RegionDescriptor>> {
141        let existing_regions = physical_route
142            .region_routes
143            .iter()
144            .map(|route| (route.region.id, route.region.partition_expr()))
145            .collect::<Vec<_>>();
146
147        let descriptors = from_exprs
148            .iter()
149            .map(|expr| {
150                let expr_json = expr
151                    .as_json_str()
152                    .context(error::SerializePartitionExprSnafu)?;
153
154                let matched_region_id = existing_regions
155                    .iter()
156                    .find_map(|(region_id, existing_expr)| {
157                        (existing_expr == &expr_json).then_some(*region_id)
158                    })
159                    .with_context(|| error::RepartitionSourceExprMismatchSnafu {
160                        expr: expr_json,
161                    })?;
162
163                Ok(RegionDescriptor {
164                    region_id: matched_region_id,
165                    partition_expr: expr.clone(),
166                })
167            })
168            .collect::<Result<Vec<_>>>()?;
169
170        Ok(descriptors)
171    }
172}