1use std::collections::{HashMap, HashSet};
16
17use common_error::ext::BoxedError;
18use common_meta::key::TableMetadataManagerRef;
19use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
20use common_meta::rpc::router::RegionRoute;
21use common_meta::wal_provider::RegionWalOptions;
22use snafu::{OptionExt, ResultExt, ensure};
23use store_api::storage::{RegionId, RegionNumber, TableId};
24
25use crate::error::{self, Result};
26use crate::procedure::repartition::group::GroupId;
27use crate::procedure::repartition::plan::SourceRegionDescriptor;
28
29pub async fn get_datanode_table_value(
34 table_metadata_manager: &TableMetadataManagerRef,
35 table_id: TableId,
36 datanode_id: u64,
37) -> Result<DatanodeTableValue> {
38 let datanode_table_value = table_metadata_manager
39 .datanode_table_manager()
40 .get(&DatanodeTableKey {
41 datanode_id,
42 table_id,
43 })
44 .await
45 .context(error::TableMetadataManagerSnafu)
46 .map_err(BoxedError::new)
47 .with_context(|_| error::RetryLaterWithSourceSnafu {
48 reason: format!("Failed to get DatanodeTable: {table_id}"),
49 })?
50 .context(error::DatanodeTableNotFoundSnafu {
51 table_id,
52 datanode_id,
53 })?;
54 Ok(datanode_table_value)
55}
56
57pub fn merge_and_validate_region_wal_options(
79 region_wal_options: &RegionWalOptions,
80 mut new_region_wal_options: RegionWalOptions,
81 new_region_routes: &[RegionRoute],
82 table_id: TableId,
83) -> Result<RegionWalOptions> {
84 for (region_number, _) in new_region_wal_options.iter() {
86 if region_wal_options.contains_key(region_number) {
87 return error::UnexpectedSnafu {
88 violated: format!(
89 "Overwriting existing WAL option for region: {}",
90 RegionId::new(table_id, *region_number)
91 ),
92 }
93 .fail();
94 }
95 }
96
97 new_region_wal_options.extend(region_wal_options.clone());
98
99 let region_numbers: HashSet<RegionNumber> = new_region_routes
101 .iter()
102 .map(|r| r.region.id.region_number())
103 .collect();
104
105 new_region_wal_options.retain(|k, _| region_numbers.contains(k));
107
108 ensure!(
110 region_numbers.len() == new_region_wal_options.len(),
111 error::UnexpectedSnafu {
112 violated: format!(
113 "Mismatch between number of region_numbers ({}) and new_region_wal_options ({}) for table: {}",
114 region_numbers.len(),
115 new_region_wal_options.len(),
116 table_id
117 ),
118 }
119 );
120
121 Ok(new_region_wal_options)
122}
123
124pub fn rollback_group_metadata_routes(
141 group_id: GroupId,
142 source_regions: &[SourceRegionDescriptor],
143 original_target_routes: &[RegionRoute],
144 allocated_region_ids: &[RegionId],
145 pending_deallocate_region_ids: &[RegionId],
146 region_routes_map: &mut HashMap<RegionId, &mut RegionRoute>,
147) -> Result<()> {
148 for source in source_regions {
149 let region_id = source.region_id();
150 let region_route = region_routes_map.get_mut(®ion_id).context(
151 error::RepartitionSourceRegionMissingSnafu {
152 group_id,
153 region_id,
154 },
155 )?;
156 region_route.clear_leader_staging();
157 region_route.region.partition_expr = source.route_expr_for_rollback()?;
158 if pending_deallocate_region_ids.contains(®ion_id) {
159 region_route.clear_ignore_all_writes();
160 }
161 }
162
163 for target in original_target_routes {
164 let Some(region_route) = region_routes_map.get_mut(&target.region.id) else {
165 if allocated_region_ids.contains(&target.region.id) {
168 continue;
169 }
170
171 return error::RepartitionTargetRegionMissingSnafu {
172 group_id,
173 region_id: target.region.id,
174 }
175 .fail();
176 };
177 region_route.region.partition_expr = target.region.partition_expr.clone();
178 region_route.write_route_policy = target.write_route_policy;
179 region_route.clear_leader_staging();
180 }
181
182 Ok(())
183}
184
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use common_meta::peer::Peer;
    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
    use common_wal::options::{KafkaWalOptions, WalOptions};
    use store_api::storage::RegionId;
    use uuid::Uuid;

    use super::*;
    use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
    use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor};
    use crate::procedure::repartition::test_util::range_expr;

    /// Builds a Kafka WAL option pointing at `topic`.
    fn kafka_wal_option(topic: &str) -> WalOptions {
        WalOptions::Kafka(KafkaWalOptions::new(topic.to_string()))
    }

    /// Builds a minimal route for the region with raw id `region_id`, led by `datanode_id`.
    fn new_region_route(region_id: u64, datanode_id: u64) -> RegionRoute {
        RegionRoute {
            region: Region {
                id: RegionId::from_u64(region_id),
                ..Default::default()
            },
            leader_peer: Some(Peer::empty(datanode_id)),
            follower_peers: vec![],
            leader_state: None,
            leader_down_since: None,
            write_route_policy: None,
        }
    }

    /// Builds a route led by peer 1 with the given partition expression and leader state,
    /// optionally marked as ignoring all writes.
    fn new_staged_region_route(
        region_id: RegionId,
        partition_expr: &str,
        leader_state: Option<LeaderState>,
        ignore_all_writes: bool,
    ) -> RegionRoute {
        let mut route = RegionRoute {
            region: Region {
                id: region_id,
                partition_expr: partition_expr.to_string(),
                ..Default::default()
            },
            leader_peer: Some(Peer::empty(1)),
            leader_state,
            ..Default::default()
        };

        if ignore_all_writes {
            route.set_ignore_all_writes();
        }

        route
    }

    /// Returns clones of the routes in `region_routes` whose region id is one of `targets`.
    fn original_target_routes(
        region_routes: &[RegionRoute],
        targets: &[TargetRegionDescriptor],
    ) -> Vec<RegionRoute> {
        let target_ids = targets
            .iter()
            .map(|target| target.region_id)
            .collect::<HashSet<_>>();
        region_routes
            .iter()
            .filter(|route| target_ids.contains(&route.region.id))
            .cloned()
            .collect()
    }

    /// Existing options for regions 1 and 2 plus a new option for region 3 merge into
    /// three entries when all three regions are routed.
    #[test]
    fn test_merge_and_validate_region_wal_options_success() {
        let table_id = 1;
        let existing_wal_options: RegionWalOptions = vec![
            (1, kafka_wal_option("topic_1")),
            (2, kafka_wal_option("topic_2")),
        ]
        .into_iter()
        .collect();
        let new_wal_options: RegionWalOptions =
            vec![(3, kafka_wal_option("topic_3"))].into_iter().collect();
        let new_region_routes = vec![
            new_region_route(1, 1),
            new_region_route(2, 2),
            new_region_route(3, 3),
        ];
        let result = merge_and_validate_region_wal_options(
            &existing_wal_options,
            new_wal_options,
            &new_region_routes,
            table_id,
        )
        .unwrap();

        // Should have all three regions.
        assert_eq!(result.len(), 3);
        assert!(result.contains_key(&1));
        assert!(result.contains_key(&2));
        assert!(result.contains_key(&3));
        // Existing options are preserved and the new option is added.
        assert_eq!(result.get(&1).unwrap(), &kafka_wal_option("topic_1"));
        assert_eq!(result.get(&2).unwrap(), &kafka_wal_option("topic_2"));
        assert_eq!(result.get(&3).unwrap(), &kafka_wal_option("topic_3"));
    }

    /// A new option for a region that already has an option must be rejected; despite the
    /// test name, overriding is an error.
    #[test]
    fn test_merge_and_validate_region_wal_options_new_overrides_existing() {
        let table_id = 1;
        let existing_wal_options: RegionWalOptions = vec![(1, kafka_wal_option("topic_1_old"))]
            .into_iter()
            .collect();
        let new_wal_options: RegionWalOptions = vec![(1, kafka_wal_option("topic_1_new"))]
            .into_iter()
            .collect();
        let new_region_routes = vec![new_region_route(1, 1)];
        merge_and_validate_region_wal_options(
            &existing_wal_options,
            new_wal_options,
            &new_region_routes,
            table_id,
        )
        .unwrap_err();
    }

    /// Options of regions that are no longer routed are dropped from the result.
    #[test]
    fn test_merge_and_validate_region_wal_options_filters_removed_regions() {
        let table_id = 1;
        let existing_wal_options: RegionWalOptions = vec![
            (1, kafka_wal_option("topic_1")),
            (2, kafka_wal_option("topic_2")),
            (3, kafka_wal_option("topic_3")),
        ]
        .into_iter()
        .collect();
        let new_wal_options = HashMap::new();
        // Region 3 is not routed anymore.
        let new_region_routes = vec![new_region_route(1, 1), new_region_route(2, 2)];
        let result = merge_and_validate_region_wal_options(
            &existing_wal_options,
            new_wal_options,
            &new_region_routes,
            table_id,
        )
        .unwrap();

        // Only regions 1 and 2 remain.
        assert_eq!(result.len(), 2);
        assert!(result.contains_key(&1));
        assert!(result.contains_key(&2));
        assert!(!result.contains_key(&3));
    }

    /// A routed region without any WAL option yields a "Mismatch" error that mentions the
    /// table id.
    #[test]
    fn test_merge_and_validate_region_wal_options_missing_option() {
        let table_id = 1;
        let existing_wal_options: RegionWalOptions =
            vec![(1, kafka_wal_option("topic_1"))].into_iter().collect();
        let new_wal_options = HashMap::new();
        // Region 2 is routed but has no option.
        let new_region_routes = vec![new_region_route(1, 1), new_region_route(2, 2)];
        let result = merge_and_validate_region_wal_options(
            &existing_wal_options,
            new_wal_options,
            &new_region_routes,
            table_id,
        );
        assert!(result.is_err());
        let error_msg = result.unwrap_err().to_string();
        assert!(error_msg.contains("Mismatch"));
        assert!(error_msg.contains(&table_id.to_string()));
    }

    /// Applying staging routes for a split (region 1 -> regions 1 and 3) and rolling back
    /// restores the original routes exactly.
    #[test]
    fn test_rollback_group_metadata_routes_split_case() {
        let group_id = Uuid::new_v4();
        let table_id = 1024;
        let original_region_routes = vec![
            new_staged_region_route(
                RegionId::new(table_id, 1),
                &range_expr("x", 0, 100).as_json_str().unwrap(),
                None,
                false,
            ),
            new_staged_region_route(
                RegionId::new(table_id, 2),
                &range_expr("x", 100, 200).as_json_str().unwrap(),
                None,
                false,
            ),
            new_staged_region_route(RegionId::new(table_id, 3), "", None, false),
        ];
        let sources = vec![SourceRegionDescriptor::partitioned(
            RegionId::new(table_id, 1),
            range_expr("x", 0, 100),
        )];
        let targets = vec![
            TargetRegionDescriptor {
                region_id: RegionId::new(table_id, 1),
                partition_expr: range_expr("x", 0, 50),
            },
            TargetRegionDescriptor {
                region_id: RegionId::new(table_id, 3),
                partition_expr: range_expr("x", 50, 100),
            },
        ];
        let mut applied_region_routes = UpdateMetadata::apply_staging_region_routes(
            group_id,
            &sources,
            &targets,
            &[],
            &original_region_routes,
        )
        .unwrap();
        let target_routes = original_target_routes(&original_region_routes, &targets);

        rollback_group_metadata_routes(
            group_id,
            &sources,
            &target_routes,
            &[],
            &[],
            &mut applied_region_routes
                .iter_mut()
                .map(|route| (route.region.id, route))
                .collect(),
        )
        .unwrap();

        assert_eq!(applied_region_routes, original_region_routes);
    }

    /// Rolling back a `Default` source region restores an empty partition expression and
    /// clears leader staging.
    #[test]
    fn test_rollback_group_metadata_routes_default_source_restores_empty_expr() {
        let group_id = Uuid::new_v4();
        let table_id = 1024;
        let default_region_id = RegionId::new(table_id, 1);
        let allocated_region_id = RegionId::new(table_id, 2);
        let source_regions = vec![SourceRegionDescriptor::Default {
            region_id: default_region_id,
        }];
        let target_regions = vec![
            TargetRegionDescriptor {
                region_id: default_region_id,
                partition_expr: range_expr("x", 0, 50),
            },
            TargetRegionDescriptor {
                region_id: allocated_region_id,
                partition_expr: range_expr("x", 50, 100),
            },
        ];
        let current_region_routes = vec![
            new_staged_region_route(default_region_id, "", None, false),
            new_staged_region_route(allocated_region_id, "", None, false),
        ];
        // Only the default region has an original route snapshot; the allocated region is
        // passed via `allocated_region_ids` instead.
        let original_target_routes = vec![current_region_routes[0].clone()];
        let mut applied_region_routes = UpdateMetadata::apply_staging_region_routes(
            group_id,
            &source_regions,
            &target_regions,
            &[],
            &current_region_routes,
        )
        .unwrap();
        assert_eq!(
            applied_region_routes[0].region.partition_expr,
            range_expr("x", 0, 50).as_json_str().unwrap()
        );

        rollback_group_metadata_routes(
            group_id,
            &source_regions,
            &original_target_routes,
            &[allocated_region_id],
            &[],
            &mut applied_region_routes
                .iter_mut()
                .map(|route| (route.region.id, route))
                .collect(),
        )
        .unwrap();

        assert_eq!(applied_region_routes[0].region.partition_expr, "");
        assert!(!applied_region_routes[0].is_leader_staging());
    }

    /// Rolling back a merge (regions 1 and 2 -> region 1) restores the original routes, and
    /// rolling back a second time changes nothing.
    #[test]
    fn test_rollback_group_metadata_routes_merge_case_is_idempotent() {
        let group_id = Uuid::new_v4();
        let table_id = 1024;
        let original_region_routes = vec![
            new_staged_region_route(
                RegionId::new(table_id, 1),
                &range_expr("x", 0, 100).as_json_str().unwrap(),
                None,
                false,
            ),
            new_staged_region_route(
                RegionId::new(table_id, 2),
                &range_expr("x", 100, 200).as_json_str().unwrap(),
                None,
                false,
            ),
            new_staged_region_route(
                RegionId::new(table_id, 3),
                &range_expr("x", 200, 300).as_json_str().unwrap(),
                None,
                false,
            ),
        ];
        let sources = vec![
            SourceRegionDescriptor::partitioned(
                RegionId::new(table_id, 1),
                range_expr("x", 0, 100),
            ),
            SourceRegionDescriptor::partitioned(
                RegionId::new(table_id, 2),
                range_expr("x", 100, 200),
            ),
        ];
        let targets = vec![TargetRegionDescriptor {
            region_id: RegionId::new(table_id, 1),
            partition_expr: range_expr("x", 0, 200),
        }];
        let target_routes = original_target_routes(&original_region_routes, &targets);
        let mut once = UpdateMetadata::apply_staging_region_routes(
            group_id,
            &sources,
            &targets,
            &[RegionId::new(table_id, 2)],
            &original_region_routes,
        )
        .unwrap();

        rollback_group_metadata_routes(
            group_id,
            &sources,
            &target_routes,
            &[],
            &[RegionId::new(table_id, 2)],
            &mut once
                .iter_mut()
                .map(|route| (route.region.id, route))
                .collect(),
        )
        .unwrap();
        let mut twice = once.clone();
        rollback_group_metadata_routes(
            group_id,
            &sources,
            &target_routes,
            &[],
            &[RegionId::new(table_id, 2)],
            &mut twice
                .iter_mut()
                .map(|route| (route.region.id, route))
                .collect(),
        )
        .unwrap();

        assert_eq!(once, original_region_routes);
        assert_eq!(once, twice);
    }
}