meta_srv/procedure/repartition/
utils.rs1use std::collections::{HashMap, HashSet};
16
17use common_error::ext::BoxedError;
18use common_meta::key::TableMetadataManagerRef;
19use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
20use common_meta::rpc::router::RegionRoute;
21use snafu::{OptionExt, ResultExt, ensure};
22use store_api::storage::{RegionId, RegionNumber, TableId};
23
24use crate::error::{self, Result};
25
26pub async fn get_datanode_table_value(
31 table_metadata_manager: &TableMetadataManagerRef,
32 table_id: TableId,
33 datanode_id: u64,
34) -> Result<DatanodeTableValue> {
35 let datanode_table_value = table_metadata_manager
36 .datanode_table_manager()
37 .get(&DatanodeTableKey {
38 datanode_id,
39 table_id,
40 })
41 .await
42 .context(error::TableMetadataManagerSnafu)
43 .map_err(BoxedError::new)
44 .with_context(|_| error::RetryLaterWithSourceSnafu {
45 reason: format!("Failed to get DatanodeTable: {table_id}"),
46 })?
47 .context(error::DatanodeTableNotFoundSnafu {
48 table_id,
49 datanode_id,
50 })?;
51 Ok(datanode_table_value)
52}
53
54pub fn merge_and_validate_region_wal_options(
76 region_wal_options: &HashMap<RegionNumber, String>,
77 mut new_region_wal_options: HashMap<RegionNumber, String>,
78 new_region_routes: &[RegionRoute],
79 table_id: TableId,
80) -> Result<HashMap<RegionNumber, String>> {
81 for (region_number, _) in new_region_wal_options.iter() {
83 if region_wal_options.contains_key(region_number) {
84 return error::UnexpectedSnafu {
85 violated: format!(
86 "Overwriting existing WAL option for region: {}",
87 RegionId::new(table_id, *region_number)
88 ),
89 }
90 .fail();
91 }
92 }
93
94 new_region_wal_options.extend(region_wal_options.clone());
95
96 let region_numbers: HashSet<RegionNumber> = new_region_routes
98 .iter()
99 .map(|r| r.region.id.region_number())
100 .collect();
101
102 new_region_wal_options.retain(|k, _| region_numbers.contains(k));
104
105 ensure!(
107 region_numbers.len() == new_region_wal_options.len(),
108 error::UnexpectedSnafu {
109 violated: format!(
110 "Mismatch between number of region_numbers ({}) and new_region_wal_options ({}) for table: {}",
111 region_numbers.len(),
112 new_region_wal_options.len(),
113 table_id
114 ),
115 }
116 );
117
118 Ok(new_region_wal_options)
119}
120
#[cfg(test)]
mod tests {
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use common_wal::options::{KafkaWalOptions, WalOptions};
    use store_api::storage::RegionId;

    use super::*;

    /// Serializes a Kafka WAL option pointing at `topic` into its JSON string form.
    fn kafka_wal_option(topic: &str) -> String {
        let options = WalOptions::Kafka(KafkaWalOptions {
            topic: topic.to_string(),
        });
        serde_json::to_string(&options).unwrap()
    }

    /// Builds a WAL option map with one Kafka option per `(region_number, topic)` pair.
    fn wal_options(entries: &[(RegionNumber, &str)]) -> HashMap<RegionNumber, String> {
        entries
            .iter()
            .map(|&(region_number, topic)| (region_number, kafka_wal_option(topic)))
            .collect()
    }

    /// Builds a region route for the raw `region_id` whose leader is an empty
    /// peer with id `datanode_id`.
    fn new_region_route(region_id: u64, datanode_id: u64) -> RegionRoute {
        let region = Region {
            id: RegionId::from_u64(region_id),
            ..Default::default()
        };
        RegionRoute {
            region,
            leader_peer: Some(Peer::empty(datanode_id)),
            follower_peers: Vec::new(),
            leader_state: None,
            leader_down_since: None,
            write_route_policy: None,
        }
    }

    #[test]
    fn test_merge_and_validate_region_wal_options_success() {
        let table_id = 1;
        let existing_wal_options = wal_options(&[(1, "topic_1"), (2, "topic_2")]);
        let new_wal_options = wal_options(&[(3, "topic_3")]);
        let new_region_routes = [
            new_region_route(1, 1),
            new_region_route(2, 2),
            new_region_route(3, 3),
        ];

        let merged = merge_and_validate_region_wal_options(
            &existing_wal_options,
            new_wal_options,
            &new_region_routes,
            table_id,
        )
        .unwrap();

        // Both the existing and the new options must survive the merge untouched.
        assert_eq!(merged.len(), 3);
        let expected: [(RegionNumber, &str); 3] =
            [(1, "topic_1"), (2, "topic_2"), (3, "topic_3")];
        for (region_number, topic) in expected {
            assert!(merged.contains_key(&region_number));
            assert_eq!(
                merged.get(&region_number).unwrap(),
                &kafka_wal_option(topic)
            );
        }
    }

    #[test]
    fn test_merge_and_validate_region_wal_options_new_overrides_existing() {
        let table_id = 1;
        let existing_wal_options = wal_options(&[(1, "topic_1_old")]);
        let new_wal_options = wal_options(&[(1, "topic_1_new")]);
        let new_region_routes = [new_region_route(1, 1)];

        // Despite the test name, an attempt to override an existing option is
        // expected to be rejected with an error.
        merge_and_validate_region_wal_options(
            &existing_wal_options,
            new_wal_options,
            &new_region_routes,
            table_id,
        )
        .unwrap_err();
    }

    #[test]
    fn test_merge_and_validate_region_wal_options_filters_removed_regions() {
        let table_id = 1;
        let existing_wal_options =
            wal_options(&[(1, "topic_1"), (2, "topic_2"), (3, "topic_3")]);
        let new_wal_options = HashMap::new();
        // Region 3 has no route any more, so its option must be dropped.
        let new_region_routes = [new_region_route(1, 1), new_region_route(2, 2)];

        let merged = merge_and_validate_region_wal_options(
            &existing_wal_options,
            new_wal_options,
            &new_region_routes,
            table_id,
        )
        .unwrap();

        assert_eq!(merged.len(), 2);
        assert!(merged.contains_key(&1));
        assert!(merged.contains_key(&2));
        assert!(!merged.contains_key(&3));
    }

    #[test]
    fn test_merge_and_validate_region_wal_options_missing_option() {
        let table_id = 1;
        let existing_wal_options = wal_options(&[(1, "topic_1")]);
        let new_wal_options = HashMap::new();
        // Region 2 is routed but has no WAL option in either map.
        let new_region_routes = [new_region_route(1, 1), new_region_route(2, 2)];

        let outcome = merge_and_validate_region_wal_options(
            &existing_wal_options,
            new_wal_options,
            &new_region_routes,
            table_id,
        );
        assert!(outcome.is_err());

        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("Mismatch"));
        assert!(message.contains(&table_id.to_string()));
    }
}