1use std::collections::{HashMap, HashSet};
16
17use common_error::ext::BoxedError;
18use common_meta::key::TableMetadataManagerRef;
19use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
20use common_meta::rpc::router::RegionRoute;
21use snafu::{OptionExt, ResultExt, ensure};
22use store_api::storage::{RegionId, RegionNumber, TableId};
23
24use crate::error::{self, Result};
25use crate::procedure::repartition::group::GroupId;
26use crate::procedure::repartition::plan::RegionDescriptor;
27
28pub async fn get_datanode_table_value(
33 table_metadata_manager: &TableMetadataManagerRef,
34 table_id: TableId,
35 datanode_id: u64,
36) -> Result<DatanodeTableValue> {
37 let datanode_table_value = table_metadata_manager
38 .datanode_table_manager()
39 .get(&DatanodeTableKey {
40 datanode_id,
41 table_id,
42 })
43 .await
44 .context(error::TableMetadataManagerSnafu)
45 .map_err(BoxedError::new)
46 .with_context(|_| error::RetryLaterWithSourceSnafu {
47 reason: format!("Failed to get DatanodeTable: {table_id}"),
48 })?
49 .context(error::DatanodeTableNotFoundSnafu {
50 table_id,
51 datanode_id,
52 })?;
53 Ok(datanode_table_value)
54}
55
56pub fn merge_and_validate_region_wal_options(
78 region_wal_options: &HashMap<RegionNumber, String>,
79 mut new_region_wal_options: HashMap<RegionNumber, String>,
80 new_region_routes: &[RegionRoute],
81 table_id: TableId,
82) -> Result<HashMap<RegionNumber, String>> {
83 for (region_number, _) in new_region_wal_options.iter() {
85 if region_wal_options.contains_key(region_number) {
86 return error::UnexpectedSnafu {
87 violated: format!(
88 "Overwriting existing WAL option for region: {}",
89 RegionId::new(table_id, *region_number)
90 ),
91 }
92 .fail();
93 }
94 }
95
96 new_region_wal_options.extend(region_wal_options.clone());
97
98 let region_numbers: HashSet<RegionNumber> = new_region_routes
100 .iter()
101 .map(|r| r.region.id.region_number())
102 .collect();
103
104 new_region_wal_options.retain(|k, _| region_numbers.contains(k));
106
107 ensure!(
109 region_numbers.len() == new_region_wal_options.len(),
110 error::UnexpectedSnafu {
111 violated: format!(
112 "Mismatch between number of region_numbers ({}) and new_region_wal_options ({}) for table: {}",
113 region_numbers.len(),
114 new_region_wal_options.len(),
115 table_id
116 ),
117 }
118 );
119
120 Ok(new_region_wal_options)
121}
122
123pub fn rollback_group_metadata_routes(
140 group_id: GroupId,
141 source_regions: &[RegionDescriptor],
142 original_target_routes: &[RegionRoute],
143 allocated_region_ids: &[RegionId],
144 pending_deallocate_region_ids: &[RegionId],
145 region_routes_map: &mut HashMap<RegionId, &mut RegionRoute>,
146) -> Result<()> {
147 for source in source_regions {
148 let region_route = region_routes_map.get_mut(&source.region_id).context(
149 error::RepartitionSourceRegionMissingSnafu {
150 group_id,
151 region_id: source.region_id,
152 },
153 )?;
154 region_route.clear_leader_staging();
155 if pending_deallocate_region_ids.contains(&source.region_id) {
156 region_route.clear_ignore_all_writes();
157 }
158 }
159
160 for target in original_target_routes {
161 let Some(region_route) = region_routes_map.get_mut(&target.region.id) else {
162 if allocated_region_ids.contains(&target.region.id) {
165 continue;
166 }
167
168 return error::RepartitionTargetRegionMissingSnafu {
169 group_id,
170 region_id: target.region.id,
171 }
172 .fail();
173 };
174 region_route.region.partition_expr = target.region.partition_expr.clone();
175 region_route.write_route_policy = target.write_route_policy;
176 region_route.clear_leader_staging();
177 }
178
179 Ok(())
180}
181
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use common_meta::peer::Peer;
    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
    use common_wal::options::{KafkaWalOptions, WalOptions};
    use store_api::storage::RegionId;
    use uuid::Uuid;

    use super::*;
    use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
    use crate::procedure::repartition::plan::RegionDescriptor;
    use crate::procedure::repartition::test_util::range_expr;

    /// Serializes Kafka WAL options for `topic` to a JSON string.
    fn kafka_wal_option(topic: &str) -> String {
        let options = WalOptions::Kafka(KafkaWalOptions {
            topic: topic.to_string(),
        });
        serde_json::to_string(&options).unwrap()
    }

    /// Builds a WAL option map from `(region_number, topic)` pairs.
    fn kafka_wal_options(entries: &[(RegionNumber, &str)]) -> HashMap<RegionNumber, String> {
        entries
            .iter()
            .map(|(region_number, topic)| (*region_number, kafka_wal_option(topic)))
            .collect()
    }

    /// Builds a bare route for the raw `region_id` led by `datanode_id`.
    fn new_region_route(region_id: u64, datanode_id: u64) -> RegionRoute {
        let region = Region {
            id: RegionId::from_u64(region_id),
            ..Default::default()
        };
        RegionRoute {
            region,
            leader_peer: Some(Peer::empty(datanode_id)),
            follower_peers: vec![],
            leader_state: None,
            leader_down_since: None,
            write_route_policy: None,
        }
    }

    /// Builds a route on datanode 1 with the given partition expression and
    /// leader state, optionally marked to ignore all writes.
    fn new_staged_region_route(
        region_id: RegionId,
        partition_expr: &str,
        leader_state: Option<LeaderState>,
        ignore_all_writes: bool,
    ) -> RegionRoute {
        let region = Region {
            id: region_id,
            partition_expr: partition_expr.to_string(),
            ..Default::default()
        };
        let mut staged = RegionRoute {
            region,
            leader_peer: Some(Peer::empty(1)),
            leader_state,
            ..Default::default()
        };
        if ignore_all_writes {
            staged.set_ignore_all_writes();
        }
        staged
    }

    /// Returns clones of the routes in `region_routes` whose region id is one
    /// of the `targets`, preserving the order of `region_routes`.
    fn original_target_routes(
        region_routes: &[RegionRoute],
        targets: &[RegionDescriptor],
    ) -> Vec<RegionRoute> {
        let wanted: HashSet<RegionId> = targets
            .iter()
            .map(|descriptor| descriptor.region_id)
            .collect();

        let mut selected = Vec::new();
        for route in region_routes {
            if wanted.contains(&route.region.id) {
                selected.push(route.clone());
            }
        }
        selected
    }

    /// Indexes `routes` by region id, the shape expected by
    /// `rollback_group_metadata_routes`.
    fn index_by_region_id(routes: &mut [RegionRoute]) -> HashMap<RegionId, &mut RegionRoute> {
        routes
            .iter_mut()
            .map(|route| (route.region.id, route))
            .collect()
    }

    #[test]
    fn test_merge_and_validate_region_wal_options_success() {
        let table_id = 1;
        let existing_wal_options = kafka_wal_options(&[(1, "topic_1"), (2, "topic_2")]);
        let new_wal_options = kafka_wal_options(&[(3, "topic_3")]);
        let new_region_routes = vec![
            new_region_route(1, 1),
            new_region_route(2, 2),
            new_region_route(3, 3),
        ];

        let merged = merge_and_validate_region_wal_options(
            &existing_wal_options,
            new_wal_options,
            &new_region_routes,
            table_id,
        )
        .unwrap();

        // Exactly the three routed regions, each with its own option.
        let expected = kafka_wal_options(&[(1, "topic_1"), (2, "topic_2"), (3, "topic_3")]);
        assert_eq!(merged, expected);
    }

    /// Despite its name, this test checks that a new option for a region that
    /// already has one is rejected rather than applied.
    #[test]
    fn test_merge_and_validate_region_wal_options_new_overrides_existing() {
        let table_id = 1;
        let existing_wal_options = kafka_wal_options(&[(1, "topic_1_old")]);
        let new_wal_options = kafka_wal_options(&[(1, "topic_1_new")]);
        let new_region_routes = vec![new_region_route(1, 1)];

        let outcome = merge_and_validate_region_wal_options(
            &existing_wal_options,
            new_wal_options,
            &new_region_routes,
            table_id,
        );

        assert!(outcome.is_err());
    }

    #[test]
    fn test_merge_and_validate_region_wal_options_filters_removed_regions() {
        let table_id = 1;
        let existing_wal_options =
            kafka_wal_options(&[(1, "topic_1"), (2, "topic_2"), (3, "topic_3")]);
        let new_wal_options = HashMap::new();
        // Region 3 is no longer routed.
        let new_region_routes = vec![new_region_route(1, 1), new_region_route(2, 2)];

        let merged = merge_and_validate_region_wal_options(
            &existing_wal_options,
            new_wal_options,
            &new_region_routes,
            table_id,
        )
        .unwrap();

        let remaining: HashSet<RegionNumber> = merged.keys().copied().collect();
        assert_eq!(remaining, HashSet::from([1, 2]));
    }

    #[test]
    fn test_merge_and_validate_region_wal_options_missing_option() {
        let table_id = 1;
        let existing_wal_options = kafka_wal_options(&[(1, "topic_1")]);
        let new_wal_options = HashMap::new();
        // Region 2 is routed but has no WAL option anywhere.
        let new_region_routes = vec![new_region_route(1, 1), new_region_route(2, 2)];

        let error_msg = merge_and_validate_region_wal_options(
            &existing_wal_options,
            new_wal_options,
            &new_region_routes,
            table_id,
        )
        .unwrap_err()
        .to_string();

        assert!(error_msg.contains("Mismatch"));
        assert!(error_msg.contains(&table_id.to_string()));
    }

    #[test]
    fn test_rollback_group_metadata_routes_split_case() {
        let group_id = Uuid::new_v4();
        let table_id = 1024;
        let first = RegionId::new(table_id, 1);
        let second = RegionId::new(table_id, 2);
        let third = RegionId::new(table_id, 3);

        let original_region_routes = vec![
            new_staged_region_route(
                first,
                &range_expr("x", 0, 100).as_json_str().unwrap(),
                None,
                false,
            ),
            new_staged_region_route(
                second,
                &range_expr("x", 100, 200).as_json_str().unwrap(),
                None,
                false,
            ),
            new_staged_region_route(third, "", None, false),
        ];
        // Region 1 is split into regions 1 and 3.
        let sources = vec![RegionDescriptor {
            region_id: first,
            partition_expr: range_expr("x", 0, 100),
        }];
        let targets = vec![
            RegionDescriptor {
                region_id: first,
                partition_expr: range_expr("x", 0, 50),
            },
            RegionDescriptor {
                region_id: third,
                partition_expr: range_expr("x", 50, 100),
            },
        ];
        let target_routes = original_target_routes(&original_region_routes, &targets);

        let mut staged_routes = UpdateMetadata::apply_staging_region_routes(
            group_id,
            &sources,
            &targets,
            &[],
            &original_region_routes,
        )
        .unwrap();

        rollback_group_metadata_routes(
            group_id,
            &sources,
            &target_routes,
            &[],
            &[],
            &mut index_by_region_id(&mut staged_routes),
        )
        .unwrap();

        assert_eq!(staged_routes, original_region_routes);
    }

    #[test]
    fn test_rollback_group_metadata_routes_merge_case_is_idempotent() {
        let group_id = Uuid::new_v4();
        let table_id = 1024;
        let first = RegionId::new(table_id, 1);
        let second = RegionId::new(table_id, 2);
        let third = RegionId::new(table_id, 3);

        let original_region_routes = vec![
            new_staged_region_route(
                first,
                &range_expr("x", 0, 100).as_json_str().unwrap(),
                None,
                false,
            ),
            new_staged_region_route(
                second,
                &range_expr("x", 100, 200).as_json_str().unwrap(),
                None,
                false,
            ),
            new_staged_region_route(
                third,
                &range_expr("x", 200, 300).as_json_str().unwrap(),
                None,
                false,
            ),
        ];
        // Regions 1 and 2 are merged into region 1; region 2 is pending
        // deallocation.
        let sources = vec![
            RegionDescriptor {
                region_id: first,
                partition_expr: range_expr("x", 0, 100),
            },
            RegionDescriptor {
                region_id: second,
                partition_expr: range_expr("x", 100, 200),
            },
        ];
        let targets = vec![RegionDescriptor {
            region_id: first,
            partition_expr: range_expr("x", 0, 200),
        }];
        let pending_deallocate = [second];
        let target_routes = original_target_routes(&original_region_routes, &targets);

        let mut once = UpdateMetadata::apply_staging_region_routes(
            group_id,
            &sources,
            &targets,
            &pending_deallocate,
            &original_region_routes,
        )
        .unwrap();

        rollback_group_metadata_routes(
            group_id,
            &sources,
            &target_routes,
            &[],
            &pending_deallocate,
            &mut index_by_region_id(&mut once),
        )
        .unwrap();

        // Rolling back an already rolled-back state must change nothing.
        let mut twice = once.clone();
        rollback_group_metadata_routes(
            group_id,
            &sources,
            &target_routes,
            &[],
            &pending_deallocate,
            &mut index_by_region_id(&mut twice),
        )
        .unwrap();

        assert_eq!(once, original_region_routes);
        assert_eq!(once, twice);
    }
}