Skip to main content

meta_srv/procedure/repartition/
utils.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use common_error::ext::BoxedError;
18use common_meta::key::TableMetadataManagerRef;
19use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
20use common_meta::rpc::router::RegionRoute;
21use snafu::{OptionExt, ResultExt, ensure};
22use store_api::storage::{RegionId, RegionNumber, TableId};
23
24use crate::error::{self, Result};
25use crate::procedure::repartition::group::GroupId;
26use crate::procedure::repartition::plan::RegionDescriptor;
27
28/// Returns the `datanode_table_value`
29///
30/// Retry:
31/// - Failed to retrieve the metadata of datanode table.
32pub async fn get_datanode_table_value(
33    table_metadata_manager: &TableMetadataManagerRef,
34    table_id: TableId,
35    datanode_id: u64,
36) -> Result<DatanodeTableValue> {
37    let datanode_table_value = table_metadata_manager
38        .datanode_table_manager()
39        .get(&DatanodeTableKey {
40            datanode_id,
41            table_id,
42        })
43        .await
44        .context(error::TableMetadataManagerSnafu)
45        .map_err(BoxedError::new)
46        .with_context(|_| error::RetryLaterWithSourceSnafu {
47            reason: format!("Failed to get DatanodeTable: {table_id}"),
48        })?
49        .context(error::DatanodeTableNotFoundSnafu {
50            table_id,
51            datanode_id,
52        })?;
53    Ok(datanode_table_value)
54}
55
56/// Merges and validates region WAL options for repartition.
57///
58/// This function:
59/// 1. Validates that new WAL options don't overwrite existing ones
60/// 2. Merges existing `region_wal_options` with new `new_region_wal_options`
61/// 3. Filters out WAL options for regions that are not in `new_region_routes`
62/// 4. Validates that every region in `new_region_routes` has a corresponding WAL option
63///
64/// # Arguments
65/// * `region_wal_options` - Existing region WAL options from datanode table
66/// * `new_region_wal_options` - New region WAL options to merge (should only contain newly allocated regions)
67/// * `new_region_routes` - The new region routes after repartition
68/// * `table_id` - Table ID for error reporting
69///
70/// # Returns
71/// Returns the merged and filtered WAL options, ensuring all regions have options.
72///
73/// # Errors
74/// Returns an error if:
75/// - New WAL options try to overwrite existing ones for the same region
76/// - Any region in `new_region_routes` is missing a WAL option
77pub fn merge_and_validate_region_wal_options(
78    region_wal_options: &HashMap<RegionNumber, String>,
79    mut new_region_wal_options: HashMap<RegionNumber, String>,
80    new_region_routes: &[RegionRoute],
81    table_id: TableId,
82) -> Result<HashMap<RegionNumber, String>> {
83    // Doesn't allow overwriting existing WAL options.
84    for (region_number, _) in new_region_wal_options.iter() {
85        if region_wal_options.contains_key(region_number) {
86            return error::UnexpectedSnafu {
87                violated: format!(
88                    "Overwriting existing WAL option for region: {}",
89                    RegionId::new(table_id, *region_number)
90                ),
91            }
92            .fail();
93        }
94    }
95
96    new_region_wal_options.extend(region_wal_options.clone());
97
98    // Extract region numbers from new routes
99    let region_numbers: HashSet<RegionNumber> = new_region_routes
100        .iter()
101        .map(|r| r.region.id.region_number())
102        .collect();
103
104    // Filter out WAL options for regions that are not in new_region_routes
105    new_region_wal_options.retain(|k, _| region_numbers.contains(k));
106
107    // Validate that every region has a WAL option
108    ensure!(
109        region_numbers.len() == new_region_wal_options.len(),
110        error::UnexpectedSnafu {
111            violated: format!(
112                "Mismatch between number of region_numbers ({}) and new_region_wal_options ({}) for table: {}",
113                region_numbers.len(),
114                new_region_wal_options.len(),
115                table_id
116            ),
117        }
118    );
119
120    Ok(new_region_wal_options)
121}
122
123/// Restores group staging metadata in-place for parent repartition rollback.
124///
125/// This helper lives in repartition utilities instead of the group subprocedure
126/// because parent repartition owns crash recovery and rollback selection.
127///
128/// The function mutates `region_routes` in place to avoid rebuilding the route
129/// vector for each selected plan. It restores:
130/// - source-region leader staging flags,
131/// - merge-source `ignore_all_writes` markers for pending-deallocate sources,
132/// - target-region partition expressions,
133/// - target-region write-route policies,
134/// - target-region leader staging flags.
135///
136/// `original_target_routes` contains only pre-existing target routes.
137/// Newly allocated targets are removed by parent rollback instead of being
138/// restored here.
139pub fn rollback_group_metadata_routes(
140    group_id: GroupId,
141    source_regions: &[RegionDescriptor],
142    original_target_routes: &[RegionRoute],
143    allocated_region_ids: &[RegionId],
144    pending_deallocate_region_ids: &[RegionId],
145    region_routes_map: &mut HashMap<RegionId, &mut RegionRoute>,
146) -> Result<()> {
147    for source in source_regions {
148        let region_route = region_routes_map.get_mut(&source.region_id).context(
149            error::RepartitionSourceRegionMissingSnafu {
150                group_id,
151                region_id: source.region_id,
152            },
153        )?;
154        region_route.clear_leader_staging();
155        if pending_deallocate_region_ids.contains(&source.region_id) {
156            region_route.clear_ignore_all_writes();
157        }
158    }
159
160    for target in original_target_routes {
161        let Some(region_route) = region_routes_map.get_mut(&target.region.id) else {
162            // Ignores newly allocated region routes that do not exist in the current region routes.
163            // They may have already been deleted (to ensure idempotency).
164            if allocated_region_ids.contains(&target.region.id) {
165                continue;
166            }
167
168            return error::RepartitionTargetRegionMissingSnafu {
169                group_id,
170                region_id: target.region.id,
171            }
172            .fail();
173        };
174        region_route.region.partition_expr = target.region.partition_expr.clone();
175        region_route.write_route_policy = target.write_route_policy;
176        region_route.clear_leader_staging();
177    }
178
179    Ok(())
180}
181
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use common_meta::peer::Peer;
    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
    use common_wal::options::{KafkaWalOptions, WalOptions};
    use store_api::storage::RegionId;
    use uuid::Uuid;

    use super::*;
    use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
    use crate::procedure::repartition::plan::RegionDescriptor;
    use crate::procedure::repartition::test_util::range_expr;

    /// Helper function to create a Kafka WAL option string from a topic name.
    fn kafka_wal_option(topic: &str) -> String {
        serde_json::to_string(&WalOptions::Kafka(KafkaWalOptions {
            topic: topic.to_string(),
        }))
        .unwrap()
    }

    /// Builds a WAL option map from `(region number, Kafka topic)` pairs.
    fn wal_options(topics: &[(RegionNumber, &str)]) -> HashMap<RegionNumber, String> {
        topics
            .iter()
            .map(|(region_number, topic)| (*region_number, kafka_wal_option(topic)))
            .collect()
    }

    /// Builds a minimal route for the raw `region_id` led by `datanode_id`.
    fn new_region_route(region_id: u64, datanode_id: u64) -> RegionRoute {
        RegionRoute {
            region: Region {
                id: RegionId::from_u64(region_id),
                ..Default::default()
            },
            leader_peer: Some(Peer::empty(datanode_id)),
            follower_peers: vec![],
            leader_state: None,
            leader_down_since: None,
            write_route_policy: None,
        }
    }

    /// Builds a route with the given partition expression and leader state,
    /// optionally marked with `ignore_all_writes`.
    fn new_staged_region_route(
        region_id: RegionId,
        partition_expr: &str,
        leader_state: Option<LeaderState>,
        ignore_all_writes: bool,
    ) -> RegionRoute {
        let mut route = RegionRoute {
            region: Region {
                id: region_id,
                partition_expr: partition_expr.to_string(),
                ..Default::default()
            },
            leader_peer: Some(Peer::empty(1)),
            leader_state,
            ..Default::default()
        };

        if ignore_all_writes {
            route.set_ignore_all_writes();
        }

        route
    }

    /// Clones the routes from `region_routes` whose region id is one of `targets`.
    fn original_target_routes(
        region_routes: &[RegionRoute],
        targets: &[RegionDescriptor],
    ) -> Vec<RegionRoute> {
        let target_ids = targets
            .iter()
            .map(|target| target.region_id)
            .collect::<HashSet<_>>();
        region_routes
            .iter()
            .filter(|route| target_ids.contains(&route.region.id))
            .cloned()
            .collect()
    }

    /// Indexes mutable references to `region_routes` by region id, which is the
    /// shape `rollback_group_metadata_routes` expects.
    fn routes_by_id(region_routes: &mut [RegionRoute]) -> HashMap<RegionId, &mut RegionRoute> {
        region_routes
            .iter_mut()
            .map(|route| (route.region.id, route))
            .collect()
    }

    #[test]
    fn test_merge_and_validate_region_wal_options_success() {
        let table_id = 1;
        let existing_wal_options = wal_options(&[(1, "topic_1"), (2, "topic_2")]);
        let new_wal_options = wal_options(&[(3, "topic_3")]);
        let new_region_routes = vec![
            new_region_route(1, 1),
            new_region_route(2, 2),
            new_region_route(3, 3),
        ];
        let result = merge_and_validate_region_wal_options(
            &existing_wal_options,
            new_wal_options,
            &new_region_routes,
            table_id,
        )
        .unwrap();

        // Should have all three regions
        assert_eq!(result.len(), 3);
        assert!(result.contains_key(&1));
        assert!(result.contains_key(&2));
        assert!(result.contains_key(&3));
        // Existing options should be preserved
        assert_eq!(result.get(&1).unwrap(), &kafka_wal_option("topic_1"));
        assert_eq!(result.get(&2).unwrap(), &kafka_wal_option("topic_2"));
        // New option should be present
        assert_eq!(result.get(&3).unwrap(), &kafka_wal_option("topic_3"));
    }

    /// A new WAL option for a region that already has one must be rejected.
    #[test]
    fn test_merge_and_validate_region_wal_options_new_overrides_existing() {
        let table_id = 1;
        let existing_wal_options = wal_options(&[(1, "topic_1_old")]);
        let new_wal_options = wal_options(&[(1, "topic_1_new")]);
        let new_region_routes = vec![new_region_route(1, 1)];
        let result = merge_and_validate_region_wal_options(
            &existing_wal_options,
            new_wal_options,
            &new_region_routes,
            table_id,
        );

        // Must fail specifically because of the overwrite, not for any other reason.
        let error_msg = result.unwrap_err().to_string();
        assert!(error_msg.contains("Overwriting existing WAL option"));
    }

    #[test]
    fn test_merge_and_validate_region_wal_options_filters_removed_regions() {
        let table_id = 1;
        let existing_wal_options =
            wal_options(&[(1, "topic_1"), (2, "topic_2"), (3, "topic_3")]);
        let new_wal_options = HashMap::new();
        // Only regions 1 and 2 are in new routes (region 3 removed)
        let new_region_routes = vec![new_region_route(1, 1), new_region_route(2, 2)];
        let result = merge_and_validate_region_wal_options(
            &existing_wal_options,
            new_wal_options,
            &new_region_routes,
            table_id,
        )
        .unwrap();

        // Should only have regions 1 and 2 (region 3 filtered out)
        assert_eq!(result.len(), 2);
        assert!(result.contains_key(&1));
        assert!(result.contains_key(&2));
        assert!(!result.contains_key(&3));
    }

    #[test]
    fn test_merge_and_validate_region_wal_options_missing_option() {
        let table_id = 1;
        let existing_wal_options = wal_options(&[(1, "topic_1")]);
        let new_wal_options = HashMap::new();
        // Region 2 is in routes but has no WAL option
        let new_region_routes = vec![new_region_route(1, 1), new_region_route(2, 2)];
        let result = merge_and_validate_region_wal_options(
            &existing_wal_options,
            new_wal_options,
            &new_region_routes,
            table_id,
        );
        // Should fail validation
        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();
        assert!(error_msg.contains("Mismatch"));
        assert!(error_msg.contains(&table_id.to_string()));
    }

    /// Applying staging routes for a split and rolling back must restore the
    /// original routes.
    #[test]
    fn test_rollback_group_metadata_routes_split_case() {
        let group_id = Uuid::new_v4();
        let table_id = 1024;
        let original_region_routes = vec![
            new_staged_region_route(
                RegionId::new(table_id, 1),
                &range_expr("x", 0, 100).as_json_str().unwrap(),
                None,
                false,
            ),
            new_staged_region_route(
                RegionId::new(table_id, 2),
                &range_expr("x", 100, 200).as_json_str().unwrap(),
                None,
                false,
            ),
            new_staged_region_route(RegionId::new(table_id, 3), "", None, false),
        ];
        let sources = vec![RegionDescriptor {
            region_id: RegionId::new(table_id, 1),
            partition_expr: range_expr("x", 0, 100),
        }];
        let targets = vec![
            RegionDescriptor {
                region_id: RegionId::new(table_id, 1),
                partition_expr: range_expr("x", 0, 50),
            },
            RegionDescriptor {
                region_id: RegionId::new(table_id, 3),
                partition_expr: range_expr("x", 50, 100),
            },
        ];
        let mut applied_region_routes = UpdateMetadata::apply_staging_region_routes(
            group_id,
            &sources,
            &targets,
            &[],
            &original_region_routes,
        )
        .unwrap();
        let target_routes = original_target_routes(&original_region_routes, &targets);

        rollback_group_metadata_routes(
            group_id,
            &sources,
            &target_routes,
            &[],
            &[],
            &mut routes_by_id(&mut applied_region_routes),
        )
        .unwrap();

        assert_eq!(applied_region_routes, original_region_routes);
    }

    /// Rolling back a merge twice must give the same routes as rolling back
    /// once, and both must equal the original routes.
    #[test]
    fn test_rollback_group_metadata_routes_merge_case_is_idempotent() {
        let group_id = Uuid::new_v4();
        let table_id = 1024;
        let original_region_routes = vec![
            new_staged_region_route(
                RegionId::new(table_id, 1),
                &range_expr("x", 0, 100).as_json_str().unwrap(),
                None,
                false,
            ),
            new_staged_region_route(
                RegionId::new(table_id, 2),
                &range_expr("x", 100, 200).as_json_str().unwrap(),
                None,
                false,
            ),
            new_staged_region_route(
                RegionId::new(table_id, 3),
                &range_expr("x", 200, 300).as_json_str().unwrap(),
                None,
                false,
            ),
        ];
        let sources = vec![
            RegionDescriptor {
                region_id: RegionId::new(table_id, 1),
                partition_expr: range_expr("x", 0, 100),
            },
            RegionDescriptor {
                region_id: RegionId::new(table_id, 2),
                partition_expr: range_expr("x", 100, 200),
            },
        ];
        let targets = vec![RegionDescriptor {
            region_id: RegionId::new(table_id, 1),
            partition_expr: range_expr("x", 0, 200),
        }];
        let pending_deallocate = [RegionId::new(table_id, 2)];
        let target_routes = original_target_routes(&original_region_routes, &targets);
        let mut once = UpdateMetadata::apply_staging_region_routes(
            group_id,
            &sources,
            &targets,
            &pending_deallocate,
            &original_region_routes,
        )
        .unwrap();

        rollback_group_metadata_routes(
            group_id,
            &sources,
            &target_routes,
            &[],
            &pending_deallocate,
            &mut routes_by_id(&mut once),
        )
        .unwrap();
        let mut twice = once.clone();
        rollback_group_metadata_routes(
            group_id,
            &sources,
            &target_routes,
            &[],
            &pending_deallocate,
            &mut routes_by_id(&mut twice),
        )
        .unwrap();

        assert_eq!(once, original_region_routes);
        assert_eq!(once, twice);
    }
}