meta_srv/procedure/repartition/
utils.rs1use std::collections::{HashMap, HashSet};
16
17use common_error::ext::BoxedError;
18use common_meta::key::TableMetadataManagerRef;
19use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
20use common_meta::rpc::router::RegionRoute;
21use snafu::{OptionExt, ResultExt, ensure};
22use store_api::storage::{RegionId, RegionNumber, TableId};
23
24use crate::error::{self, Result};
25
26pub async fn get_datanode_table_value(
31 table_metadata_manager: &TableMetadataManagerRef,
32 table_id: TableId,
33 datanode_id: u64,
34) -> Result<DatanodeTableValue> {
35 let datanode_table_value = table_metadata_manager
36 .datanode_table_manager()
37 .get(&DatanodeTableKey {
38 datanode_id,
39 table_id,
40 })
41 .await
42 .context(error::TableMetadataManagerSnafu)
43 .map_err(BoxedError::new)
44 .with_context(|_| error::RetryLaterWithSourceSnafu {
45 reason: format!("Failed to get DatanodeTable: {table_id}"),
46 })?
47 .context(error::DatanodeTableNotFoundSnafu {
48 table_id,
49 datanode_id,
50 })?;
51 Ok(datanode_table_value)
52}
53
54pub fn merge_and_validate_region_wal_options(
76 region_wal_options: &HashMap<RegionNumber, String>,
77 mut new_region_wal_options: HashMap<RegionNumber, String>,
78 new_region_routes: &[RegionRoute],
79 table_id: TableId,
80) -> Result<HashMap<RegionNumber, String>> {
81 for (region_number, _) in new_region_wal_options.iter() {
83 if region_wal_options.contains_key(region_number) {
84 return error::UnexpectedSnafu {
85 violated: format!(
86 "Overwriting existing WAL option for region: {}",
87 RegionId::new(table_id, *region_number)
88 ),
89 }
90 .fail();
91 }
92 }
93
94 new_region_wal_options.extend(region_wal_options.clone());
95
96 let region_numbers: HashSet<RegionNumber> = new_region_routes
98 .iter()
99 .map(|r| r.region.id.region_number())
100 .collect();
101
102 new_region_wal_options.retain(|k, _| region_numbers.contains(k));
104
105 ensure!(
107 region_numbers.len() == new_region_wal_options.len(),
108 error::UnexpectedSnafu {
109 violated: format!(
110 "Mismatch between number of region_numbers ({}) and new_region_wal_options ({}) for table: {}",
111 region_numbers.len(),
112 new_region_wal_options.len(),
113 table_id
114 ),
115 }
116 );
117
118 Ok(new_region_wal_options)
119}
120
#[cfg(test)]
mod tests {
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use common_wal::options::{KafkaWalOptions, WalOptions};
    use store_api::storage::RegionId;

    use super::*;

    /// Serializes a Kafka WAL option for `topic` into its JSON string form.
    fn kafka_wal_option(topic: &str) -> String {
        let options = WalOptions::Kafka(KafkaWalOptions {
            topic: topic.to_string(),
        });
        serde_json::to_string(&options).unwrap()
    }

    /// Builds a WAL option map from `(region number, Kafka topic)` pairs.
    fn wal_options(entries: &[(RegionNumber, &str)]) -> HashMap<RegionNumber, String> {
        entries
            .iter()
            .map(|(region_number, topic)| (*region_number, kafka_wal_option(topic)))
            .collect()
    }

    /// Builds a region route for `region_id` whose leader is an empty peer
    /// with id `datanode_id`.
    fn new_region_route(region_id: u64, datanode_id: u64) -> RegionRoute {
        let region = Region {
            id: RegionId::from_u64(region_id),
            ..Default::default()
        };
        RegionRoute {
            region,
            leader_peer: Some(Peer::empty(datanode_id)),
            follower_peers: vec![],
            leader_state: None,
            leader_down_since: None,
        }
    }

    #[test]
    fn test_merge_and_validate_region_wal_options_success() {
        let table_id = 1;
        let existing = wal_options(&[(1, "topic_1"), (2, "topic_2")]);
        let added = wal_options(&[(3, "topic_3")]);
        let routes: Vec<RegionRoute> = (1..=3).map(|id| new_region_route(id, id)).collect();

        let merged =
            merge_and_validate_region_wal_options(&existing, added, &routes, table_id).unwrap();

        // Every routed region must be present with the option it was given.
        assert_eq!(merged.len(), 3);
        let expected: [(RegionNumber, &str); 3] =
            [(1, "topic_1"), (2, "topic_2"), (3, "topic_3")];
        for (region_number, topic) in expected {
            assert!(merged.contains_key(&region_number));
            assert_eq!(
                merged.get(&region_number).unwrap(),
                &kafka_wal_option(topic)
            );
        }
    }

    #[test]
    fn test_merge_and_validate_region_wal_options_new_overrides_existing() {
        let table_id = 1;
        let existing = wal_options(&[(1, "topic_1_old")]);
        let added = wal_options(&[(1, "topic_1_new")]);
        let routes = vec![new_region_route(1, 1)];

        // A new option for an already known region is rejected.
        merge_and_validate_region_wal_options(&existing, added, &routes, table_id).unwrap_err();
    }

    #[test]
    fn test_merge_and_validate_region_wal_options_filters_removed_regions() {
        let table_id = 1;
        let existing = wal_options(&[(1, "topic_1"), (2, "topic_2"), (3, "topic_3")]);
        let added = HashMap::new();
        let routes = vec![new_region_route(1, 1), new_region_route(2, 2)];

        let merged =
            merge_and_validate_region_wal_options(&existing, added, &routes, table_id).unwrap();

        // Region 3 is no longer routed, so its option must be gone.
        assert_eq!(merged.len(), 2);
        assert!(merged.contains_key(&1));
        assert!(merged.contains_key(&2));
        assert!(!merged.contains_key(&3));
    }

    #[test]
    fn test_merge_and_validate_region_wal_options_missing_option() {
        let table_id = 1;
        let existing = wal_options(&[(1, "topic_1")]);
        let added = HashMap::new();
        let routes = vec![new_region_route(1, 1), new_region_route(2, 2)];

        // Region 2 is routed but has no WAL option anywhere.
        let outcome = merge_and_validate_region_wal_options(&existing, added, &routes, table_id);
        assert!(outcome.is_err());

        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("Mismatch"));
        assert!(message.contains(&table_id.to_string()));
    }
}