meta_srv/procedure/repartition/
utils.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use common_error::ext::BoxedError;
18use common_meta::key::TableMetadataManagerRef;
19use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
20use common_meta::rpc::router::RegionRoute;
21use snafu::{OptionExt, ResultExt, ensure};
22use store_api::storage::{RegionId, RegionNumber, TableId};
23
24use crate::error::{self, Result};
25
26/// Returns the `datanode_table_value`
27///
28/// Retry:
29/// - Failed to retrieve the metadata of datanode table.
30pub async fn get_datanode_table_value(
31    table_metadata_manager: &TableMetadataManagerRef,
32    table_id: TableId,
33    datanode_id: u64,
34) -> Result<DatanodeTableValue> {
35    let datanode_table_value = table_metadata_manager
36        .datanode_table_manager()
37        .get(&DatanodeTableKey {
38            datanode_id,
39            table_id,
40        })
41        .await
42        .context(error::TableMetadataManagerSnafu)
43        .map_err(BoxedError::new)
44        .with_context(|_| error::RetryLaterWithSourceSnafu {
45            reason: format!("Failed to get DatanodeTable: {table_id}"),
46        })?
47        .context(error::DatanodeTableNotFoundSnafu {
48            table_id,
49            datanode_id,
50        })?;
51    Ok(datanode_table_value)
52}
53
54/// Merges and validates region WAL options for repartition.
55///
56/// This function:
57/// 1. Validates that new WAL options don't overwrite existing ones
58/// 2. Merges existing `region_wal_options` with new `new_region_wal_options`
59/// 3. Filters out WAL options for regions that are not in `new_region_routes`
60/// 4. Validates that every region in `new_region_routes` has a corresponding WAL option
61///
62/// # Arguments
63/// * `region_wal_options` - Existing region WAL options from datanode table
64/// * `new_region_wal_options` - New region WAL options to merge (should only contain newly allocated regions)
65/// * `new_region_routes` - The new region routes after repartition
66/// * `table_id` - Table ID for error reporting
67///
68/// # Returns
69/// Returns the merged and filtered WAL options, ensuring all regions have options.
70///
71/// # Errors
72/// Returns an error if:
73/// - New WAL options try to overwrite existing ones for the same region
74/// - Any region in `new_region_routes` is missing a WAL option
75pub fn merge_and_validate_region_wal_options(
76    region_wal_options: &HashMap<RegionNumber, String>,
77    mut new_region_wal_options: HashMap<RegionNumber, String>,
78    new_region_routes: &[RegionRoute],
79    table_id: TableId,
80) -> Result<HashMap<RegionNumber, String>> {
81    // Doesn't allow overwriting existing WAL options.
82    for (region_number, _) in new_region_wal_options.iter() {
83        if region_wal_options.contains_key(region_number) {
84            return error::UnexpectedSnafu {
85                violated: format!(
86                    "Overwriting existing WAL option for region: {}",
87                    RegionId::new(table_id, *region_number)
88                ),
89            }
90            .fail();
91        }
92    }
93
94    new_region_wal_options.extend(region_wal_options.clone());
95
96    // Extract region numbers from new routes
97    let region_numbers: HashSet<RegionNumber> = new_region_routes
98        .iter()
99        .map(|r| r.region.id.region_number())
100        .collect();
101
102    // Filter out WAL options for regions that are not in new_region_routes
103    new_region_wal_options.retain(|k, _| region_numbers.contains(k));
104
105    // Validate that every region has a WAL option
106    ensure!(
107        region_numbers.len() == new_region_wal_options.len(),
108        error::UnexpectedSnafu {
109            violated: format!(
110                "Mismatch between number of region_numbers ({}) and new_region_wal_options ({}) for table: {}",
111                region_numbers.len(),
112                new_region_wal_options.len(),
113                table_id
114            ),
115        }
116    );
117
118    Ok(new_region_wal_options)
119}
120
#[cfg(test)]
mod tests {
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use common_wal::options::{KafkaWalOptions, WalOptions};
    use store_api::storage::RegionId;

    use super::*;

    /// Serializes a Kafka-backed [`WalOptions`] for `topic` into its JSON string form.
    fn kafka_wal_option(topic: &str) -> String {
        let options = WalOptions::Kafka(KafkaWalOptions {
            topic: topic.to_string(),
        });
        serde_json::to_string(&options).unwrap()
    }

    /// Builds a [`RegionRoute`] for the raw `region_id`, led by `datanode_id`, with no followers.
    fn new_region_route(region_id: u64, datanode_id: u64) -> RegionRoute {
        let region = Region {
            id: RegionId::from_u64(region_id),
            ..Default::default()
        };
        RegionRoute {
            region,
            leader_peer: Some(Peer::empty(datanode_id)),
            follower_peers: Vec::new(),
            leader_state: None,
            leader_down_since: None,
        }
    }

    #[test]
    fn test_merge_and_validate_region_wal_options_success() {
        let table_id = 1;
        let existing: HashMap<RegionNumber, String> = HashMap::from([
            (1, kafka_wal_option("topic_1")),
            (2, kafka_wal_option("topic_2")),
        ]);
        let allocated: HashMap<RegionNumber, String> =
            HashMap::from([(3, kafka_wal_option("topic_3"))]);
        let routes: Vec<RegionRoute> = (1..=3).map(|id| new_region_route(id, id)).collect();

        let merged =
            merge_and_validate_region_wal_options(&existing, allocated, &routes, table_id)
                .unwrap();

        // Every routed region (two pre-existing, one newly allocated) gets an entry.
        assert_eq!(merged.len(), 3);
        let expected: [RegionNumber; 3] = [1, 2, 3];
        for region_number in expected {
            assert!(merged.contains_key(&region_number));
        }
        // Pre-existing options survive untouched; the new option is carried over verbatim.
        assert_eq!(merged[&1], kafka_wal_option("topic_1"));
        assert_eq!(merged[&2], kafka_wal_option("topic_2"));
        assert_eq!(merged[&3], kafka_wal_option("topic_3"));
    }

    #[test]
    fn test_merge_and_validate_region_wal_options_new_overrides_existing() {
        let table_id = 1;
        let existing: HashMap<RegionNumber, String> =
            HashMap::from([(1, kafka_wal_option("topic_1_old"))]);
        let allocated: HashMap<RegionNumber, String> =
            HashMap::from([(1, kafka_wal_option("topic_1_new"))]);
        let routes = vec![new_region_route(1, 1)];

        // Region 1 already has a WAL option, so supplying another one must be rejected.
        let result =
            merge_and_validate_region_wal_options(&existing, allocated, &routes, table_id);
        assert!(result.is_err());
    }

    #[test]
    fn test_merge_and_validate_region_wal_options_filters_removed_regions() {
        let table_id = 1;
        let existing: HashMap<RegionNumber, String> = HashMap::from([
            (1, kafka_wal_option("topic_1")),
            (2, kafka_wal_option("topic_2")),
            (3, kafka_wal_option("topic_3")),
        ]);
        // Region 3 is dropped by the repartition: only regions 1 and 2 remain routed.
        let routes = vec![new_region_route(1, 1), new_region_route(2, 2)];

        let merged =
            merge_and_validate_region_wal_options(&existing, HashMap::new(), &routes, table_id)
                .unwrap();

        assert_eq!(merged.len(), 2);
        assert!(merged.contains_key(&1));
        assert!(merged.contains_key(&2));
        // The option belonging to the removed region is filtered out.
        assert!(!merged.contains_key(&3));
    }

    #[test]
    fn test_merge_and_validate_region_wal_options_missing_option() {
        let table_id = 1;
        let existing: HashMap<RegionNumber, String> =
            HashMap::from([(1, kafka_wal_option("topic_1"))]);
        // Region 2 is routed but neither map provides a WAL option for it.
        let routes = vec![new_region_route(1, 1), new_region_route(2, 2)];

        let err =
            merge_and_validate_region_wal_options(&existing, HashMap::new(), &routes, table_id)
                .expect_err("validation should fail when a routed region lacks a WAL option");
        let message = err.to_string();
        assert!(message.contains("Mismatch"));
        assert!(message.contains(&table_id.to_string()));
    }
}
256}