Skip to main content

meta_srv/procedure/repartition/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use common_error::ext::BoxedError;
18use common_meta::key::TableMetadataManagerRef;
19use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
20use common_meta::rpc::router::RegionRoute;
21use common_meta::wal_provider::RegionWalOptions;
22use snafu::{OptionExt, ResultExt, ensure};
23use store_api::storage::{RegionId, RegionNumber, TableId};
24
25use crate::error::{self, Result};
26use crate::procedure::repartition::group::GroupId;
27use crate::procedure::repartition::plan::SourceRegionDescriptor;
28
29/// Returns the `datanode_table_value`
30///
31/// Retry:
32/// - Failed to retrieve the metadata of datanode table.
33pub async fn get_datanode_table_value(
34    table_metadata_manager: &TableMetadataManagerRef,
35    table_id: TableId,
36    datanode_id: u64,
37) -> Result<DatanodeTableValue> {
38    let datanode_table_value = table_metadata_manager
39        .datanode_table_manager()
40        .get(&DatanodeTableKey {
41            datanode_id,
42            table_id,
43        })
44        .await
45        .context(error::TableMetadataManagerSnafu)
46        .map_err(BoxedError::new)
47        .with_context(|_| error::RetryLaterWithSourceSnafu {
48            reason: format!("Failed to get DatanodeTable: {table_id}"),
49        })?
50        .context(error::DatanodeTableNotFoundSnafu {
51            table_id,
52            datanode_id,
53        })?;
54    Ok(datanode_table_value)
55}
56
57/// Merges and validates region WAL options for repartition.
58///
59/// This function:
60/// 1. Validates that new WAL options don't overwrite existing ones
61/// 2. Merges existing `region_wal_options` with new `new_region_wal_options`
62/// 3. Filters out WAL options for regions that are not in `new_region_routes`
63/// 4. Validates that every region in `new_region_routes` has a corresponding WAL option
64///
65/// # Arguments
66/// * `region_wal_options` - Existing region WAL options from datanode table
67/// * `new_region_wal_options` - New region WAL options to merge (should only contain newly allocated regions)
68/// * `new_region_routes` - The new region routes after repartition
69/// * `table_id` - Table ID for error reporting
70///
71/// # Returns
72/// Returns the merged and filtered WAL options, ensuring all regions have options.
73///
74/// # Errors
75/// Returns an error if:
76/// - New WAL options try to overwrite existing ones for the same region
77/// - Any region in `new_region_routes` is missing a WAL option
78pub fn merge_and_validate_region_wal_options(
79    region_wal_options: &RegionWalOptions,
80    mut new_region_wal_options: RegionWalOptions,
81    new_region_routes: &[RegionRoute],
82    table_id: TableId,
83) -> Result<RegionWalOptions> {
84    // Doesn't allow overwriting existing WAL options.
85    for (region_number, _) in new_region_wal_options.iter() {
86        if region_wal_options.contains_key(region_number) {
87            return error::UnexpectedSnafu {
88                violated: format!(
89                    "Overwriting existing WAL option for region: {}",
90                    RegionId::new(table_id, *region_number)
91                ),
92            }
93            .fail();
94        }
95    }
96
97    new_region_wal_options.extend(region_wal_options.clone());
98
99    // Extract region numbers from new routes
100    let region_numbers: HashSet<RegionNumber> = new_region_routes
101        .iter()
102        .map(|r| r.region.id.region_number())
103        .collect();
104
105    // Filter out WAL options for regions that are not in new_region_routes
106    new_region_wal_options.retain(|k, _| region_numbers.contains(k));
107
108    // Validate that every region has a WAL option
109    ensure!(
110        region_numbers.len() == new_region_wal_options.len(),
111        error::UnexpectedSnafu {
112            violated: format!(
113                "Mismatch between number of region_numbers ({}) and new_region_wal_options ({}) for table: {}",
114                region_numbers.len(),
115                new_region_wal_options.len(),
116                table_id
117            ),
118        }
119    );
120
121    Ok(new_region_wal_options)
122}
123
124/// Restores group staging metadata in-place for parent repartition rollback.
125///
126/// This helper lives in repartition utilities instead of the group subprocedure
127/// because parent repartition owns crash recovery and rollback selection.
128///
129/// The function mutates `region_routes` in place to avoid rebuilding the route
130/// vector for each selected plan. It restores:
131/// - source-region leader staging flags,
132/// - merge-source `ignore_all_writes` markers for pending-deallocate sources,
133/// - target-region partition expressions,
134/// - target-region write-route policies,
135/// - target-region leader staging flags.
136///
137/// `original_target_routes` contains only pre-existing target routes.
138/// Newly allocated targets are removed by parent rollback instead of being
139/// restored here.
140pub fn rollback_group_metadata_routes(
141    group_id: GroupId,
142    source_regions: &[SourceRegionDescriptor],
143    original_target_routes: &[RegionRoute],
144    allocated_region_ids: &[RegionId],
145    pending_deallocate_region_ids: &[RegionId],
146    region_routes_map: &mut HashMap<RegionId, &mut RegionRoute>,
147) -> Result<()> {
148    for source in source_regions {
149        let region_id = source.region_id();
150        let region_route = region_routes_map.get_mut(&region_id).context(
151            error::RepartitionSourceRegionMissingSnafu {
152                group_id,
153                region_id,
154            },
155        )?;
156        region_route.clear_leader_staging();
157        region_route.region.partition_expr = source.route_expr_for_rollback()?;
158        if pending_deallocate_region_ids.contains(&region_id) {
159            region_route.clear_ignore_all_writes();
160        }
161    }
162
163    for target in original_target_routes {
164        let Some(region_route) = region_routes_map.get_mut(&target.region.id) else {
165            // Ignores newly allocated region routes that do not exist in the current region routes.
166            // They may have already been deleted (to ensure idempotency).
167            if allocated_region_ids.contains(&target.region.id) {
168                continue;
169            }
170
171            return error::RepartitionTargetRegionMissingSnafu {
172                group_id,
173                region_id: target.region.id,
174            }
175            .fail();
176        };
177        region_route.region.partition_expr = target.region.partition_expr.clone();
178        region_route.write_route_policy = target.write_route_policy;
179        region_route.clear_leader_staging();
180    }
181
182    Ok(())
183}
184
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use common_meta::peer::Peer;
    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
    use common_wal::options::{KafkaWalOptions, WalOptions};
    use store_api::storage::RegionId;
    use uuid::Uuid;

    use super::*;
    use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
    use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor};
    use crate::procedure::repartition::test_util::range_expr;

    /// Builds a Kafka WAL option that points at `topic`.
    fn kafka_wal_option(topic: &str) -> WalOptions {
        let kafka = KafkaWalOptions::new(topic.to_string());
        WalOptions::Kafka(kafka)
    }

    /// Builds a plain region route led by `datanode_id`, with no followers,
    /// no leader state and no write-route policy.
    fn new_region_route(region_id: u64, datanode_id: u64) -> RegionRoute {
        let region = Region {
            id: RegionId::from_u64(region_id),
            ..Default::default()
        };
        let leader = Peer::empty(datanode_id);

        RegionRoute {
            region,
            leader_peer: Some(leader),
            follower_peers: vec![],
            leader_state: None,
            leader_down_since: None,
            write_route_policy: None,
        }
    }

    /// Builds a region route on datanode 1 with the given partition
    /// expression and leader state, optionally marked to ignore all writes.
    fn new_staged_region_route(
        region_id: RegionId,
        partition_expr: &str,
        leader_state: Option<LeaderState>,
        ignore_all_writes: bool,
    ) -> RegionRoute {
        let region = Region {
            id: region_id,
            partition_expr: partition_expr.to_string(),
            ..Default::default()
        };
        let mut staged = RegionRoute {
            region,
            leader_peer: Some(Peer::empty(1)),
            leader_state,
            ..Default::default()
        };

        if ignore_all_writes {
            staged.set_ignore_all_writes();
        }
        staged
    }

    /// Returns clones of the routes in `region_routes` whose region id is one
    /// of the `targets`, in their original order.
    fn original_target_routes(
        region_routes: &[RegionRoute],
        targets: &[TargetRegionDescriptor],
    ) -> Vec<RegionRoute> {
        let wanted: HashSet<RegionId> = targets.iter().map(|target| target.region_id).collect();

        let mut selected = Vec::new();
        for route in region_routes {
            if wanted.contains(&route.region.id) {
                selected.push(route.clone());
            }
        }
        selected
    }

    /// Indexes `routes` by region id, handing out mutable references.
    fn index_routes(routes: &mut [RegionRoute]) -> HashMap<RegionId, &mut RegionRoute> {
        routes
            .iter_mut()
            .map(|route| (route.region.id, route))
            .collect()
    }

    #[test]
    fn test_merge_and_validate_region_wal_options_success() {
        let table_id = 1;
        let existing_wal_options: RegionWalOptions = [
            (1, kafka_wal_option("topic_1")),
            (2, kafka_wal_option("topic_2")),
        ]
        .into_iter()
        .collect();
        let new_wal_options: RegionWalOptions =
            [(3, kafka_wal_option("topic_3"))].into_iter().collect();
        let new_region_routes = [
            new_region_route(1, 1),
            new_region_route(2, 2),
            new_region_route(3, 3),
        ];

        let merged = merge_and_validate_region_wal_options(
            &existing_wal_options,
            new_wal_options,
            &new_region_routes,
            table_id,
        )
        .unwrap();

        // All three regions are present: the two existing options are kept
        // as they were and the new one is added.
        assert_eq!(merged.len(), 3);
        for (region_number, topic) in [(1, "topic_1"), (2, "topic_2"), (3, "topic_3")] {
            assert_eq!(
                merged.get(&region_number),
                Some(&kafka_wal_option(topic))
            );
        }
    }

    #[test]
    fn test_merge_and_validate_region_wal_options_new_overrides_existing() {
        let table_id = 1;
        let existing_wal_options: RegionWalOptions =
            [(1, kafka_wal_option("topic_1_old"))].into_iter().collect();
        let new_wal_options: RegionWalOptions =
            [(1, kafka_wal_option("topic_1_new"))].into_iter().collect();
        let new_region_routes = [new_region_route(1, 1)];

        let outcome = merge_and_validate_region_wal_options(
            &existing_wal_options,
            new_wal_options,
            &new_region_routes,
            table_id,
        );

        // Overwriting an existing option must be rejected.
        assert!(outcome.is_err());
    }

    #[test]
    fn test_merge_and_validate_region_wal_options_filters_removed_regions() {
        let table_id = 1;
        let existing_wal_options: RegionWalOptions = [
            (1, kafka_wal_option("topic_1")),
            (2, kafka_wal_option("topic_2")),
            (3, kafka_wal_option("topic_3")),
        ]
        .into_iter()
        .collect();
        // Region 3 is no longer routed, only regions 1 and 2 remain.
        let new_region_routes = [new_region_route(1, 1), new_region_route(2, 2)];

        let merged = merge_and_validate_region_wal_options(
            &existing_wal_options,
            RegionWalOptions::new(),
            &new_region_routes,
            table_id,
        )
        .unwrap();

        // Exactly regions 1 and 2 survive; region 3 is filtered out.
        let mut kept: Vec<_> = merged.keys().copied().collect();
        kept.sort_unstable();
        assert_eq!(kept, vec![1, 2]);
    }

    #[test]
    fn test_merge_and_validate_region_wal_options_missing_option() {
        let table_id = 1;
        let existing_wal_options: RegionWalOptions =
            [(1, kafka_wal_option("topic_1"))].into_iter().collect();
        // Region 2 is routed but no WAL option exists for it.
        let new_region_routes = [new_region_route(1, 1), new_region_route(2, 2)];

        let outcome = merge_and_validate_region_wal_options(
            &existing_wal_options,
            RegionWalOptions::new(),
            &new_region_routes,
            table_id,
        );

        // Validation fails and the message names the mismatch and the table.
        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("Mismatch"));
        assert!(message.contains(&table_id.to_string()));
    }

    #[test]
    fn test_rollback_group_metadata_routes_split_case() {
        let group_id = Uuid::new_v4();
        let table_id = 1024;
        let first = RegionId::new(table_id, 1);
        let second = RegionId::new(table_id, 2);
        let third = RegionId::new(table_id, 3);

        let original_region_routes = vec![
            new_staged_region_route(
                first,
                &range_expr("x", 0, 100).as_json_str().unwrap(),
                None,
                false,
            ),
            new_staged_region_route(
                second,
                &range_expr("x", 100, 200).as_json_str().unwrap(),
                None,
                false,
            ),
            new_staged_region_route(third, "", None, false),
        ];
        let sources = vec![SourceRegionDescriptor::partitioned(
            first,
            range_expr("x", 0, 100),
        )];
        let targets = vec![
            TargetRegionDescriptor {
                region_id: first,
                partition_expr: range_expr("x", 0, 50),
            },
            TargetRegionDescriptor {
                region_id: third,
                partition_expr: range_expr("x", 50, 100),
            },
        ];

        let mut applied_region_routes = UpdateMetadata::apply_staging_region_routes(
            group_id,
            &sources,
            &targets,
            &[],
            &original_region_routes,
        )
        .unwrap();
        let target_routes = original_target_routes(&original_region_routes, &targets);

        rollback_group_metadata_routes(
            group_id,
            &sources,
            &target_routes,
            &[],
            &[],
            &mut index_routes(&mut applied_region_routes),
        )
        .unwrap();

        // Rolling back the staged split yields the original routes again.
        assert_eq!(applied_region_routes, original_region_routes);
    }

    #[test]
    fn test_rollback_group_metadata_routes_default_source_restores_empty_expr() {
        let group_id = Uuid::new_v4();
        let table_id = 1024;
        let default_region_id = RegionId::new(table_id, 1);
        let allocated_region_id = RegionId::new(table_id, 2);

        let source_regions = vec![SourceRegionDescriptor::Default {
            region_id: default_region_id,
        }];
        let target_regions = vec![
            TargetRegionDescriptor {
                region_id: default_region_id,
                partition_expr: range_expr("x", 0, 50),
            },
            TargetRegionDescriptor {
                region_id: allocated_region_id,
                partition_expr: range_expr("x", 50, 100),
            },
        ];
        let current_region_routes = vec![
            new_staged_region_route(default_region_id, "", None, false),
            new_staged_region_route(allocated_region_id, "", None, false),
        ];
        // Only the pre-existing default region counts as an original target.
        let original_target_routes = vec![current_region_routes[0].clone()];

        let mut applied_region_routes = UpdateMetadata::apply_staging_region_routes(
            group_id,
            &source_regions,
            &target_regions,
            &[],
            &current_region_routes,
        )
        .unwrap();
        assert_eq!(
            applied_region_routes[0].region.partition_expr,
            range_expr("x", 0, 50).as_json_str().unwrap()
        );

        rollback_group_metadata_routes(
            group_id,
            &source_regions,
            &original_target_routes,
            &[allocated_region_id],
            &[],
            &mut index_routes(&mut applied_region_routes),
        )
        .unwrap();

        let restored = &applied_region_routes[0];
        assert!(restored.region.partition_expr.is_empty());
        assert!(!restored.is_leader_staging());
    }

    #[test]
    fn test_rollback_group_metadata_routes_merge_case_is_idempotent() {
        let group_id = Uuid::new_v4();
        let table_id = 1024;
        let first = RegionId::new(table_id, 1);
        let second = RegionId::new(table_id, 2);
        let third = RegionId::new(table_id, 3);

        let original_region_routes = vec![
            new_staged_region_route(
                first,
                &range_expr("x", 0, 100).as_json_str().unwrap(),
                None,
                false,
            ),
            new_staged_region_route(
                second,
                &range_expr("x", 100, 200).as_json_str().unwrap(),
                None,
                false,
            ),
            new_staged_region_route(
                third,
                &range_expr("x", 200, 300).as_json_str().unwrap(),
                None,
                false,
            ),
        ];
        let sources = vec![
            SourceRegionDescriptor::partitioned(first, range_expr("x", 0, 100)),
            SourceRegionDescriptor::partitioned(second, range_expr("x", 100, 200)),
        ];
        let targets = vec![TargetRegionDescriptor {
            region_id: first,
            partition_expr: range_expr("x", 0, 200),
        }];
        // Region 2 is merged away and therefore pending deallocation.
        let pending_deallocate = [second];
        let target_routes = original_target_routes(&original_region_routes, &targets);

        let mut once = UpdateMetadata::apply_staging_region_routes(
            group_id,
            &sources,
            &targets,
            &pending_deallocate,
            &original_region_routes,
        )
        .unwrap();

        rollback_group_metadata_routes(
            group_id,
            &sources,
            &target_routes,
            &[],
            &pending_deallocate,
            &mut index_routes(&mut once),
        )
        .unwrap();

        // A second rollback on already restored routes must change nothing.
        let mut twice = once.clone();
        rollback_group_metadata_routes(
            group_id,
            &sources,
            &target_routes,
            &[],
            &pending_deallocate,
            &mut index_routes(&mut twice),
        )
        .unwrap();

        assert_eq!(once, original_region_routes);
        assert_eq!(once, twice);
    }
}
552}