Skip to main content

meta_srv/procedure/repartition/group/
repartition_start.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::collections::{HashMap, HashSet};
17
18use common_meta::rpc::router::RegionRoute;
19use common_procedure::{Context as ProcedureContext, Status};
20use common_telemetry::debug;
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ensure};
23
24use crate::error::{self, Result};
25use crate::procedure::repartition::group::sync_region::SyncRegion;
26use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
27use crate::procedure::repartition::group::{
28    Context, GroupId, GroupPrepareResult, State, region_routes,
29};
30use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor};
31
/// Repartition group state that validates the group's region routes and captures them
/// as the group prepare result in the persistent context.
///
/// Its [`State::next`] implementation transitions to either [`SyncRegion`] or
/// [`UpdateMetadata::ApplyStaging`].
#[derive(Debug, Serialize, Deserialize)]
pub struct RepartitionStart;
34
35/// Ensures that the partition expression of the source region route matches the source descriptor.
36fn ensure_source_region_route_expr_match(
37    region_route: &RegionRoute,
38    source: &SourceRegionDescriptor,
39) -> Result<RegionRoute> {
40    let actual = region_route.region.partition_expr();
41    let expected = source.route_expr_for_rollback()?;
42    ensure!(
43        actual == expected,
44        error::PartitionExprMismatchSnafu {
45            region_id: region_route.region.id,
46            expected,
47            actual,
48        }
49    );
50    Ok(region_route.clone())
51}
52
53impl RepartitionStart {
54    /// Ensures that both source and target regions are present in the region routes.
55    ///
56    /// Both source and target regions must be present in the region routes (target regions should be allocated before repartitioning).
57    fn ensure_route_present(
58        group_id: GroupId,
59        region_routes: &[RegionRoute],
60        sources: &[SourceRegionDescriptor],
61        targets: &[TargetRegionDescriptor],
62    ) -> Result<GroupPrepareResult> {
63        ensure!(
64            !sources.is_empty(),
65            error::UnexpectedSnafu {
66                violated: "Sources are empty"
67            }
68        );
69
70        let region_routes_map = region_routes
71            .iter()
72            .map(|r| (r.region.id, r))
73            .collect::<HashMap<_, _>>();
74        let source_region_routes = sources
75            .iter()
76            .map(|s| {
77                region_routes_map
78                    .get(&s.region_id())
79                    .context(error::RepartitionSourceRegionMissingSnafu {
80                        group_id,
81                        region_id: s.region_id(),
82                    })
83                    .and_then(|r| ensure_source_region_route_expr_match(r, s))
84            })
85            .collect::<Result<Vec<_>>>()?;
86        let target_region_routes = targets
87            .iter()
88            .map(|t| {
89                region_routes_map
90                    .get(&t.region_id)
91                    .context(error::RepartitionTargetRegionMissingSnafu {
92                        group_id,
93                        region_id: t.region_id,
94                    })
95                    .map(|r| (*r).clone())
96            })
97            .collect::<Result<Vec<_>>>()?;
98        for target_region_route in &target_region_routes {
99            ensure!(
100                target_region_route.leader_peer.is_some(),
101                error::UnexpectedSnafu {
102                    violated: format!(
103                        "Leader peer is not set for region: {}",
104                        target_region_route.region.id
105                    ),
106                }
107            );
108        }
109        let central_region = sources[0].region_id();
110        let central_region_datanode = source_region_routes[0]
111            .leader_peer
112            .as_ref()
113            .context(error::UnexpectedSnafu {
114                violated: format!(
115                    "Leader peer is not set for central region: {}",
116                    central_region
117                ),
118            })?
119            .clone();
120
121        Ok(GroupPrepareResult {
122            source_routes: source_region_routes,
123            target_routes: target_region_routes,
124            central_region,
125            central_region_datanode,
126        })
127    }
128}
129
130#[async_trait::async_trait]
131#[typetag::serde]
132impl State for RepartitionStart {
133    /// Captures the group prepare result.
134    ///
135    /// Retry:
136    /// - Failed to get the table route.
137    ///
138    /// Abort
139    /// - Table route not found.
140    /// - Table route is not physical.
141    /// - Failed to ensure the route is present.
142    /// - Failed to capture the group prepare result.
143    async fn next(
144        &mut self,
145        ctx: &mut Context,
146        _procedure_ctx: &ProcedureContext,
147    ) -> Result<(Box<dyn State>, Status)> {
148        if ctx.persistent_ctx.group_prepare_result.is_some() {
149            return Ok((
150                Box::new(UpdateMetadata::ApplyStaging),
151                Status::executing(true),
152            ));
153        }
154        let table_id = ctx.persistent_ctx.table_id;
155        let group_id = ctx.persistent_ctx.group_id;
156        let table_route_value = ctx.get_table_route_value().await?.into_inner();
157        let region_routes = region_routes(table_id, &table_route_value)?;
158        let group_prepare_result = Self::ensure_route_present(
159            group_id,
160            region_routes,
161            &ctx.persistent_ctx.sources,
162            &ctx.persistent_ctx.targets,
163        )?;
164        ctx.persistent_ctx.group_prepare_result = Some(group_prepare_result);
165        debug!(
166            "Repartition group {}: captured {} sources, {} targets",
167            group_id,
168            ctx.persistent_ctx.sources.len(),
169            ctx.persistent_ctx.targets.len()
170        );
171
172        if ctx.persistent_ctx.sync_region {
173            let prepare_result = ctx.persistent_ctx.group_prepare_result.as_ref().unwrap();
174            let allocated_region_ids: HashSet<_> = ctx
175                .persistent_ctx
176                .allocated_region_ids
177                .iter()
178                .copied()
179                .collect();
180            let region_routes: Vec<_> = prepare_result
181                .target_routes
182                .iter()
183                .filter(|route| allocated_region_ids.contains(&route.region.id))
184                .cloned()
185                .collect();
186            if !region_routes.is_empty() {
187                return Ok((
188                    Box::new(SyncRegion { region_routes }),
189                    Status::executing(true),
190                ));
191            }
192        }
193
194        Ok((
195            Box::new(UpdateMetadata::ApplyStaging),
196            Status::executing(true),
197        ))
198    }
199
200    fn as_any(&self) -> &dyn Any {
201        self
202    }
203}
204
#[cfg(test)]
mod tests {
    use std::assert_matches;

    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;
    use uuid::Uuid;

    use crate::error::Error;
    use crate::procedure::repartition::group::repartition_start::RepartitionStart;
    use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor};
    use crate::procedure::repartition::test_util::range_expr;

    /// Region id used by every source descriptor in these tests.
    fn source_region_id() -> RegionId {
        RegionId::new(1024, 1)
    }

    /// A second region id, distinct from the source region.
    fn other_region_id() -> RegionId {
        RegionId::new(1024, 2)
    }

    /// Partitioned source descriptor covering `x` in `0..100`.
    fn partitioned_source() -> SourceRegionDescriptor {
        SourceRegionDescriptor::partitioned(source_region_id(), range_expr("x", 0, 100))
    }

    /// Default (unpartitioned) source descriptor.
    fn default_source() -> SourceRegionDescriptor {
        SourceRegionDescriptor::Default {
            region_id: source_region_id(),
        }
    }

    /// Target descriptor on `region_id` covering `x` in `0..10`.
    fn target_on(region_id: RegionId) -> TargetRegionDescriptor {
        TargetRegionDescriptor {
            region_id,
            partition_expr: range_expr("x", 0, 10),
        }
    }

    /// Region with the given id and every other field left at its default.
    fn bare_region(id: RegionId) -> Region {
        Region {
            id,
            ..Default::default()
        }
    }

    /// Region with the given id and partition expression.
    fn region_with_expr(id: RegionId, partition_expr: String) -> Region {
        Region {
            id,
            partition_expr,
            ..Default::default()
        }
    }

    /// Wraps `region` in a route that has a leader peer set.
    fn led_route(region: Region) -> RegionRoute {
        RegionRoute {
            region,
            leader_peer: Some(Peer::empty(1)),
            ..Default::default()
        }
    }

    /// Runs `ensure_route_present` for one source and one target and returns its error.
    fn prepare_error(
        routes: &[RegionRoute],
        source: SourceRegionDescriptor,
        target: TargetRegionDescriptor,
    ) -> Error {
        RepartitionStart::ensure_route_present(Uuid::new_v4(), routes, &[source], &[target])
            .unwrap_err()
    }

    #[test]
    fn test_ensure_route_present_missing_source_region() {
        // Only the target region has a route.
        let routes = vec![led_route(bare_region(other_region_id()))];

        let err = prepare_error(&routes, partitioned_source(), target_on(other_region_id()));

        assert_matches!(err, Error::RepartitionSourceRegionMissing { .. });
    }

    #[test]
    fn test_ensure_route_present_partition_expr_mismatch() {
        // The route's expression differs from the one in the source descriptor.
        let routes = vec![led_route(region_with_expr(
            source_region_id(),
            range_expr("x", 0, 5).as_json_str().unwrap(),
        ))];

        let err = prepare_error(&routes, partitioned_source(), target_on(other_region_id()));

        assert_matches!(err, Error::PartitionExprMismatch { .. });
    }

    #[test]
    fn test_ensure_route_present_default_source_matches_empty_partition_expr() {
        let routes = vec![led_route(region_with_expr(
            source_region_id(),
            String::new(),
        ))];

        let result = RepartitionStart::ensure_route_present(
            Uuid::new_v4(),
            &routes,
            &[default_source()],
            &[target_on(source_region_id())],
        );

        assert!(result.is_ok());
    }

    #[test]
    fn test_ensure_route_present_default_source_rejects_non_empty_partition_expr() {
        let routes = vec![led_route(region_with_expr(
            source_region_id(),
            range_expr("x", 0, 100).as_json_str().unwrap(),
        ))];

        let err = prepare_error(&routes, default_source(), target_on(source_region_id()));

        assert_matches!(err, Error::PartitionExprMismatch { .. });
    }

    #[test]
    fn test_ensure_route_present_missing_target_region() {
        // Only the source region has a route.
        let routes = vec![led_route(region_with_expr(
            source_region_id(),
            range_expr("x", 0, 100).as_json_str().unwrap(),
        ))];

        let err = prepare_error(&routes, partitioned_source(), target_on(other_region_id()));

        assert_matches!(err, Error::RepartitionTargetRegionMissing { .. });
    }

    #[test]
    fn test_ensure_route_present_legacy_partition_expr_source() {
        // Legacy layout: the expression is stored under `partition.value_list` and the
        // `partition_expr` field is empty.
        let legacy_partition_expr = range_expr("x", 0, 100).as_json_str().unwrap();
        let legacy_region_json = serde_json::json!({
            "id": RegionId::new(1024, 1).as_u64(),
            "name": "",
            "partition": {
                "column_list": ["x"],
                "value_list": [legacy_partition_expr]
            },
            "partition_expr": "",
            "attrs": {}
        });
        let routes = vec![
            led_route(serde_json::from_value(legacy_region_json).unwrap()),
            led_route(bare_region(other_region_id())),
        ];

        let result = RepartitionStart::ensure_route_present(
            Uuid::new_v4(),
            &routes,
            &[partitioned_source()],
            &[target_on(other_region_id())],
        );

        assert!(result.is_ok());
    }
}