1use std::any::Any;
16
17use common_meta::key::table_route::PhysicalTableRouteValue;
18use common_procedure::{Context as ProcedureContext, Status};
19use common_telemetry::debug;
20use partition::collider::Collider;
21use partition::expr::PartitionExpr;
22use partition::subtask::{self, RepartitionSubtask};
23use serde::{Deserialize, Deserializer, Serialize};
24use snafu::{OptionExt, ResultExt, ensure};
25use tokio::time::Instant;
26use uuid::Uuid;
27
28use crate::error::{self, Result};
29use crate::procedure::repartition::allocate_region::AllocateRegion;
30use crate::procedure::repartition::plan::{AllocationPlanEntry, SourceRegionDescriptor};
31use crate::procedure::repartition::repartition_end::RepartitionEnd;
32use crate::procedure::repartition::update_partition_metadata::{
33 PartitionMetadataUpdate, UpdatePartitionMetadata,
34};
35use crate::procedure::repartition::{Context, State};
36
/// Describes the partition layout a repartition starts from.
///
/// Serialization is derived; deserialization is implemented by hand in this
/// file so that an older payload shape (a bare list of partition expressions)
/// can still be read.
#[derive(Debug, Clone, Serialize)]
pub enum RepartitionFrom {
    /// The source table already has partition expressions.
    Partitioned {
        /// Source partition expressions; each one is matched against the
        /// partition expression of an existing region when the plan is built.
        exprs: Vec<PartitionExpr>,
        /// Optional names of the partition columns of the target layout.
        /// When present, they are resolved to schema column indices and
        /// recorded as a pending partition metadata update.
        target_partition_columns: Option<Vec<String>>,
    },
    /// The source table has no partition key yet.
    Unpartitioned {
        /// Names of the columns that become the partition key; resolved to
        /// schema column indices during preparation.
        partition_columns: Vec<String>,
    },
}
52
53impl<'de> Deserialize<'de> for RepartitionFrom {
54 fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
55 where
56 D: Deserializer<'de>,
57 {
58 #[derive(Deserialize)]
59 enum CurrentRepartitionFrom {
60 Partitioned {
61 exprs: Vec<PartitionExpr>,
62 #[serde(default)]
63 target_partition_columns: Option<Vec<String>>,
64 },
65 Unpartitioned {
66 partition_columns: Vec<String>,
67 },
68 }
69
70 #[derive(Deserialize)]
71 #[serde(untagged)]
72 enum RepartitionFromRepr {
73 Current(CurrentRepartitionFrom),
74 Legacy(Vec<PartitionExpr>),
75 }
76
77 match RepartitionFromRepr::deserialize(deserializer)? {
78 RepartitionFromRepr::Current(CurrentRepartitionFrom::Partitioned {
79 exprs,
80 target_partition_columns,
81 }) => Ok(Self::Partitioned {
82 exprs,
83 target_partition_columns,
84 }),
85 RepartitionFromRepr::Current(CurrentRepartitionFrom::Unpartitioned {
86 partition_columns,
87 }) => Ok(Self::Unpartitioned { partition_columns }),
88 RepartitionFromRepr::Legacy(exprs) => Ok(Self::Partitioned {
89 exprs,
90 target_partition_columns: None,
91 }),
92 }
93 }
94}
95
/// Procedure state that validates a repartition request and builds the
/// allocation plan from the source layout and the target expressions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepartitionStart {
    // `from_exprs` is accepted as an alternative field name on input, so a
    // payload that stored a plain expression list under that key still loads.
    #[serde(alias = "from_exprs")]
    from: RepartitionFrom,
    // Target partition expressions; `next` rejects an empty list.
    to_exprs: Vec<PartitionExpr>,
}
102
103impl RepartitionStart {
104 pub fn new(from: RepartitionFrom, to_exprs: Vec<PartitionExpr>) -> Self {
105 Self { from, to_exprs }
106 }
107}
108
109#[async_trait::async_trait]
110#[typetag::serde]
111impl State for RepartitionStart {
112 async fn next(
113 &mut self,
114 ctx: &mut Context,
115 _: &ProcedureContext,
116 ) -> Result<(Box<dyn State>, Status)> {
117 ensure!(
118 !self.to_exprs.is_empty(),
119 error::InvalidArgumentsSnafu {
120 err_msg: "Repartition expects non-empty target partition expressions".to_string(),
121 }
122 );
123
124 let timer = Instant::now();
125 let (physical_table_id, table_route) = ctx
126 .table_metadata_manager
127 .table_route_manager()
128 .get_physical_table_route(ctx.persistent_ctx.table_id)
129 .await
130 .context(error::TableMetadataManagerSnafu)?;
131 let table_id = ctx.persistent_ctx.table_id;
132 ensure!(
133 physical_table_id == table_id,
134 error::UnexpectedSnafu {
135 violated: format!(
136 "Repartition only works on the physical table, but got logical table: {}, physical table id: {}",
137 table_id, physical_table_id
138 ),
139 }
140 );
141
142 let from_exprs = self.prepare_from(ctx).await?;
143 let plans = Self::build_plan(&table_route, from_exprs, &self.to_exprs)?;
144 let plan_count = plans.len();
145 let total_source_regions: usize = plans.iter().map(|p| p.source_regions.len()).sum();
146 let total_target_regions: usize =
147 plans.iter().map(|p| p.target_partition_exprs.len()).sum();
148 common_telemetry::info!(
149 "Repartition start, table_id: {}, plans: {}, total_source_regions: {}, total_target_regions: {}",
150 table_id,
151 plan_count,
152 total_source_regions,
153 total_target_regions
154 );
155
156 ctx.update_build_plan_elapsed(timer.elapsed());
157
158 if plans.is_empty() {
159 return Ok((Box::new(RepartitionEnd), Status::done()));
160 }
161
162 if ctx.persistent_ctx.partition_metadata_update.is_some() {
163 Ok((
164 Box::new(UpdatePartitionMetadata::new(plans)),
165 Status::executing(true),
166 ))
167 } else {
168 Ok((
169 Box::new(AllocateRegion::new(plans)),
170 Status::executing(false),
171 ))
172 }
173 }
174
175 fn as_any(&self) -> &dyn Any {
176 self
177 }
178}
179
180impl RepartitionStart {
181 async fn prepare_from<'a>(&'a self, ctx: &mut Context) -> Result<&'a [PartitionExpr]> {
182 match &self.from {
183 RepartitionFrom::Partitioned {
184 exprs,
185 target_partition_columns,
186 } => {
187 Self::prepare_partitioned(ctx, target_partition_columns.as_deref()).await?;
188 Ok(exprs)
189 }
190 RepartitionFrom::Unpartitioned { partition_columns } => {
191 Self::prepare_unpartitioned(ctx, partition_columns).await?;
192 Ok(&[])
193 }
194 }
195 }
196
197 async fn prepare_unpartitioned(ctx: &mut Context, partition_columns: &[String]) -> Result<()> {
198 if ctx.persistent_ctx.partition_metadata_update.is_some() {
199 return Ok(());
200 }
201
202 ensure!(
203 !partition_columns.is_empty(),
204 error::InvalidArgumentsSnafu {
205 err_msg: "Unpartitioned repartition expects non-empty partition columns"
206 .to_string(),
207 }
208 );
209
210 let table_info_value = ctx.get_table_info_value().await?;
211 ensure!(
212 table_info_value
213 .table_info
214 .meta
215 .partition_key_indices
216 .is_empty(),
217 error::InvalidArgumentsSnafu {
218 err_msg: format!(
219 "Unpartitioned repartition expects an unpartitioned table, but table {} has partition key indices: {:?}",
220 ctx.persistent_ctx.table_id,
221 table_info_value.table_info.meta.partition_key_indices
222 ),
223 }
224 );
225
226 let schema = &table_info_value.table_info.meta.schema;
227 let partition_key_indices = partition_columns
228 .iter()
229 .map(|column_name| {
230 schema.column_index_by_name(column_name).with_context(|| {
231 error::InvalidArgumentsSnafu {
232 err_msg: format!(
233 "Partition column {} not found in table {}",
234 column_name, ctx.persistent_ctx.table_id
235 ),
236 }
237 })
238 })
239 .collect::<Result<Vec<_>>>()?;
240 ctx.persistent_ctx.partition_metadata_update = Some(
241 PartitionMetadataUpdate::from_unpartitioned(partition_key_indices),
242 );
243
244 Ok(())
245 }
246
247 async fn prepare_partitioned(
248 ctx: &mut Context,
249 target_partition_columns: Option<&[String]>,
250 ) -> Result<()> {
251 let Some(target_partition_columns) = target_partition_columns else {
252 return Ok(());
253 };
254 if ctx.persistent_ctx.partition_metadata_update.is_some() {
255 return Ok(());
256 }
257
258 ensure!(
259 !target_partition_columns.is_empty(),
260 error::InvalidArgumentsSnafu {
261 err_msg: "Partitioned source expects non-empty target partition columns"
262 .to_string(),
263 }
264 );
265
266 let table_info_value = ctx.get_table_info_value().await?;
267 let schema = &table_info_value.table_info.meta.schema;
268 let target_partition_key_indices = target_partition_columns
269 .iter()
270 .map(|column_name| {
271 schema.column_index_by_name(column_name).with_context(|| {
272 error::InvalidArgumentsSnafu {
273 err_msg: format!(
274 "Target partition column {} not found in table {}",
275 column_name, ctx.persistent_ctx.table_id
276 ),
277 }
278 })
279 })
280 .collect::<Result<Vec<_>>>()?;
281 ctx.persistent_ctx.partition_metadata_update =
282 Some(PartitionMetadataUpdate::from_partitioned(
283 table_info_value.table_info.meta.partition_key_indices,
284 target_partition_key_indices,
285 ));
286
287 Ok(())
288 }
289
290 pub(crate) fn build_plan(
291 physical_route: &PhysicalTableRouteValue,
292 from_exprs: &[PartitionExpr],
293 to_exprs: &[PartitionExpr],
294 ) -> Result<Vec<AllocationPlanEntry>> {
295 let subtasks = if from_exprs.is_empty() {
296 Self::default_source_subtasks(to_exprs)?
297 } else {
298 subtask::create_subtasks(from_exprs, to_exprs)
299 .context(error::RepartitionCreateSubtasksSnafu)?
300 };
301 if subtasks.is_empty() {
302 return Ok(vec![]);
303 }
304
305 let src_descriptors = Self::source_region_descriptors(from_exprs, physical_route)?;
306 Ok(Self::build_plan_entries(
307 subtasks,
308 &src_descriptors,
309 to_exprs,
310 ))
311 }
312
313 fn build_plan_entries(
314 subtasks: Vec<RepartitionSubtask>,
315 source_index: &[SourceRegionDescriptor],
316 target_exprs: &[PartitionExpr],
317 ) -> Vec<AllocationPlanEntry> {
318 subtasks
319 .into_iter()
320 .map(|subtask| {
321 let group_id = Uuid::new_v4();
322 let source_regions = subtask
323 .from_expr_indices
324 .iter()
325 .map(|&idx| source_index[idx].clone())
326 .collect::<Vec<_>>();
327
328 let target_partition_exprs = subtask
329 .to_expr_indices
330 .iter()
331 .map(|&idx| target_exprs[idx].clone())
332 .collect::<Vec<_>>();
333 AllocationPlanEntry {
334 group_id,
335 source_regions,
336 target_partition_exprs,
337 transition_map: subtask.transition_map,
338 }
339 })
340 .collect::<Vec<_>>()
341 }
342
343 fn default_source_subtasks(to_exprs: &[PartitionExpr]) -> Result<Vec<RepartitionSubtask>> {
344 ensure!(
345 !to_exprs.is_empty(),
346 error::UnexpectedSnafu {
347 violated: "Default source repartition expects non-empty target partition exprs",
348 }
349 );
350
351 Collider::new(to_exprs).context(error::RepartitionCreateSubtasksSnafu)?;
352
353 let to_expr_indices = (0..to_exprs.len()).collect::<Vec<_>>();
354 Ok(vec![RepartitionSubtask {
355 from_expr_indices: vec![0],
356 to_expr_indices: to_expr_indices.clone(),
357 transition_map: vec![to_expr_indices],
358 }])
359 }
360
361 fn source_region_descriptors(
362 from_exprs: &[PartitionExpr],
363 physical_route: &PhysicalTableRouteValue,
364 ) -> Result<Vec<SourceRegionDescriptor>> {
365 if from_exprs.is_empty() {
366 return Self::default_source_region_descriptors(physical_route);
367 }
368
369 let existing_regions = physical_route
370 .region_routes
371 .iter()
372 .map(|route| (route.region.id, route.region.partition_expr()))
373 .collect::<Vec<_>>();
374
375 let descriptors = from_exprs
376 .iter()
377 .map(|expr| {
378 let expr_json = expr
379 .as_json_str()
380 .context(error::SerializePartitionExprSnafu)?;
381
382 let matched_region_id = existing_regions
383 .iter()
384 .find_map(|(region_id, existing_expr)| {
385 (existing_expr == &expr_json).then_some(*region_id)
386 })
387 .with_context(|| error::RepartitionSourceExprMismatchSnafu { expr: &expr_json })
388 .inspect_err(|_| {
389 debug!("Failed to find matching region for partition expression: {}, existing regions: {:?}", expr_json, existing_regions);
390 })?;
391
392 Ok(SourceRegionDescriptor::partitioned(
393 matched_region_id,
394 expr.clone(),
395 ))
396 })
397 .collect::<Result<Vec<_>>>()?;
398
399 Ok(descriptors)
400 }
401
402 fn default_source_region_descriptors(
403 physical_route: &PhysicalTableRouteValue,
404 ) -> Result<Vec<SourceRegionDescriptor>> {
405 ensure!(
406 physical_route.region_routes.len() == 1,
407 error::UnexpectedSnafu {
408 violated: format!(
409 "Default source repartition expects exactly one source region, but got {}",
410 physical_route.region_routes.len()
411 ),
412 }
413 );
414 let source_region = &physical_route.region_routes[0].region;
415 ensure!(
416 source_region.partition_expr().is_empty(),
417 error::UnexpectedSnafu {
418 violated: format!(
419 "Default source repartition expects an empty partition expr, but got {}",
420 source_region.partition_expr()
421 ),
422 }
423 );
424
425 Ok(vec![SourceRegionDescriptor::Default {
426 region_id: source_region.id,
427 }])
428 }
429}
430
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
    use common_meta::key::table_route::PhysicalTableRouteValue;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use common_meta::test_util::MockDatanodeManager;
    use datatypes::prelude::Value;
    use partition::expr::{Operand, RestrictedOp};
    use store_api::storage::RegionId;

    use super::*;
    use crate::procedure::repartition::test_util::{
        TestingEnv, new_parent_context, range_expr, test_region_route, test_region_wal_options,
    };

    /// Wraps region routes into a physical table route value.
    fn physical_route(region_routes: Vec<RegionRoute>) -> PhysicalTableRouteValue {
        PhysicalTableRouteValue::new(region_routes)
    }

    /// Creates metadata for a table with one region that has an empty
    /// partition expression, and returns a parent context for it.
    async fn new_test_context(env: &TestingEnv, table_id: u32) -> Context {
        env.create_physical_table_metadata_for_repartition(
            table_id,
            vec![test_region_route(RegionId::new(table_id, 1), "")],
            test_region_wal_options(&[1]),
        )
        .await;
        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
        new_parent_context(env, node_manager, table_id)
    }

    /// With no source expressions, the single unpartitioned region becomes
    /// the default source and maps to every target expression.
    #[test]
    fn test_build_plan_with_default_source_region() {
        let table_id = 1024;
        let physical_route =
            physical_route(vec![test_region_route(RegionId::new(table_id, 1), "")]);
        let to_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)];

        let plans = RepartitionStart::build_plan(&physical_route, &[], &to_exprs).unwrap();

        assert_eq!(plans.len(), 1);
        let plan = &plans[0];
        assert_eq!(
            plan.source_regions,
            vec![SourceRegionDescriptor::Default {
                region_id: RegionId::new(table_id, 1)
            }]
        );
        assert_eq!(plan.target_partition_exprs, to_exprs);
        assert_eq!(plan.transition_map, vec![vec![0, 1]]);
    }

    /// The default-source path rejects a region that already has a partition
    /// expression.
    #[test]
    fn test_build_plan_with_default_source_rejects_non_empty_partition_expr() {
        let table_id = 1024;
        let physical_route = physical_route(vec![test_region_route(
            RegionId::new(table_id, 1),
            &range_expr("x", 0, 100).as_json_str().unwrap(),
        )]);
        let to_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)];

        let err = RepartitionStart::build_plan(&physical_route, &[], &to_exprs).unwrap_err();

        assert!(err.to_string().contains("empty partition expr"));
    }

    /// The default-source path rejects a route with more than one region.
    #[test]
    fn test_build_plan_with_default_source_rejects_multiple_regions() {
        let table_id = 1024;
        let physical_route = physical_route(vec![
            test_region_route(RegionId::new(table_id, 1), ""),
            test_region_route(RegionId::new(table_id, 2), ""),
        ]);
        let to_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)];

        let err = RepartitionStart::build_plan(&physical_route, &[], &to_exprs).unwrap_err();

        assert!(err.to_string().contains("exactly one source region"));
    }

    /// The default-source path rejects an empty list of target expressions.
    #[test]
    fn test_build_plan_with_default_source_rejects_empty_targets() {
        let table_id = 1024;
        let physical_route =
            physical_route(vec![test_region_route(RegionId::new(table_id, 1), "")]);

        let err = RepartitionStart::build_plan(&physical_route, &[], &[]).unwrap_err();

        assert!(err.to_string().contains("non-empty target partition exprs"));
    }

    /// The default-source path rejects a target expression that compares two
    /// literal values.
    #[test]
    fn test_build_plan_with_default_source_rejects_invalid_targets() {
        let table_id = 1024;
        let physical_route =
            physical_route(vec![test_region_route(RegionId::new(table_id, 1), "")]);
        let invalid_to_expr = PartitionExpr::new(
            Operand::Value(Value::Int64(1)),
            RestrictedOp::Eq,
            Operand::Value(Value::Int64(2)),
        );

        let err =
            RepartitionStart::build_plan(&physical_route, &[], &[invalid_to_expr]).unwrap_err();

        assert!(
            err.to_string()
                .contains("Failed to create repartition subtasks")
        );
    }

    /// A source expression is matched to the region carrying the same
    /// serialized partition expression.
    #[test]
    fn test_build_plan_keeps_partitioned_source_matching() {
        let table_id = 1024;
        let from_exprs = vec![range_expr("x", 0, 100)];
        let to_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)];
        let physical_route = physical_route(vec![RegionRoute {
            region: Region {
                id: RegionId::new(table_id, 1),
                partition_expr: from_exprs[0].as_json_str().unwrap(),
                ..Default::default()
            },
            leader_peer: Some(Peer::empty(1)),
            ..Default::default()
        }]);

        let plans = RepartitionStart::build_plan(&physical_route, &from_exprs, &to_exprs).unwrap();

        assert_eq!(plans.len(), 1);
        assert_eq!(
            plans[0].source_regions,
            vec![SourceRegionDescriptor::partitioned(
                RegionId::new(table_id, 1),
                from_exprs[0].clone()
            )]
        );
    }

    /// A payload using the `from_exprs` key with a bare expression list
    /// loads as a partitioned source without target partition columns.
    #[test]
    fn test_repartition_start_deserializes_legacy_from_exprs() {
        let from_exprs = vec![range_expr("x", 0, 100)];
        let to_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)];
        let json = serde_json::json!({
            "from_exprs": from_exprs,
            "to_exprs": to_exprs,
        })
        .to_string();

        let state: RepartitionStart = serde_json::from_str(&json).unwrap();

        let RepartitionFrom::Partitioned {
            exprs,
            target_partition_columns,
        } = state.from
        else {
            panic!("expected partition source");
        };
        assert_eq!(exprs, vec![range_expr("x", 0, 100)]);
        assert!(target_partition_columns.is_none());
    }

    /// A state serialized in the current format round-trips.
    #[test]
    fn test_repartition_start_deserializes_current_from() {
        let state = RepartitionStart::new(
            RepartitionFrom::Unpartitioned {
                partition_columns: vec!["col1".to_string()],
            },
            vec![range_expr("col1", 0, 50)],
        );
        let json = serde_json::to_string(&state).unwrap();

        let state: RepartitionStart = serde_json::from_str(&json).unwrap();

        let RepartitionFrom::Unpartitioned { partition_columns } = state.from else {
            panic!("expected unpartitioned source");
        };
        assert_eq!(partition_columns, vec!["col1"]);
    }

    /// Without target partition columns, a partitioned source goes straight
    /// to region allocation and records no metadata update.
    #[tokio::test]
    async fn test_partitioned_source_does_not_initialize_partition_metadata_update() {
        let env = TestingEnv::new();
        let table_id = 1024;
        env.create_physical_table_metadata_for_repartition(
            table_id,
            vec![test_region_route(
                RegionId::new(table_id, 1),
                &range_expr("x", 0, 100).as_json_str().unwrap(),
            )],
            test_region_wal_options(&[1]),
        )
        .await;
        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
        let mut ctx = new_parent_context(&env, node_manager, table_id);
        let mut state = RepartitionStart::new(
            RepartitionFrom::Partitioned {
                exprs: vec![range_expr("x", 0, 100)],
                target_partition_columns: None,
            },
            vec![range_expr("x", 0, 50), range_expr("x", 50, 100)],
        );

        let (next, status) = state
            .next(&mut ctx, &TestingEnv::procedure_context())
            .await
            .unwrap();

        assert!(!status.need_persist());
        assert!(next.as_any().is::<AllocateRegion>());
        assert!(ctx.persistent_ctx.partition_metadata_update.is_none());
    }

    /// With target partition columns, a partitioned source records both the
    /// original and the target key indices and moves to the metadata update.
    #[tokio::test]
    async fn test_partitioned_source_initializes_target_partition_metadata_update() {
        let env = TestingEnv::new();
        let table_id = 1024;
        env.create_physical_table_metadata_for_repartition(
            table_id,
            vec![test_region_route(
                RegionId::new(table_id, 1),
                &range_expr("x", 0, 100).as_json_str().unwrap(),
            )],
            test_region_wal_options(&[1]),
        )
        .await;
        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
        let mut ctx = new_parent_context(&env, node_manager, table_id);
        // Give the table an existing partition key before running the state.
        let current = ctx.get_raw_table_info_value().await.unwrap();
        let mut table_info = current.table_info.clone();
        table_info.meta.partition_key_indices = vec![0];
        ctx.update_table_info(&current, current.update(table_info))
            .await
            .unwrap();
        let mut state = RepartitionStart::new(
            RepartitionFrom::Partitioned {
                exprs: vec![range_expr("x", 0, 100)],
                target_partition_columns: Some(vec!["col2".to_string(), "col1".to_string()]),
            },
            vec![range_expr("x", 0, 50), range_expr("x", 50, 100)],
        );

        let (next, status) = state
            .next(&mut ctx, &TestingEnv::procedure_context())
            .await
            .unwrap();

        assert!(status.need_persist());
        assert!(next.as_any().is::<UpdatePartitionMetadata>());
        let update = ctx
            .persistent_ctx
            .partition_metadata_update
            .as_ref()
            .unwrap();
        assert_eq!(update.original_partition_key_indices, vec![0]);
        assert_eq!(update.target_partition_key_indices, vec![2, 0]);
        assert!(!update.expect_empty_partition_key_indices);
    }

    /// An unpartitioned source resolves its partition columns to schema
    /// indices and moves to the metadata update.
    #[tokio::test]
    async fn test_unpartitioned_source_initializes_partition_metadata_update() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let mut ctx = new_test_context(&env, table_id).await;
        let mut state = RepartitionStart::new(
            RepartitionFrom::Unpartitioned {
                partition_columns: vec!["col2".to_string(), "col1".to_string()],
            },
            vec![range_expr("col2", 0, 50), range_expr("col2", 50, 100)],
        );

        let (next, status) = state
            .next(&mut ctx, &TestingEnv::procedure_context())
            .await
            .unwrap();

        assert!(status.need_persist());
        assert!(next.as_any().is::<UpdatePartitionMetadata>());
        assert_eq!(
            ctx.persistent_ctx
                .partition_metadata_update
                .as_ref()
                .unwrap()
                .target_partition_key_indices,
            vec![2, 0]
        );
    }

    /// An unpartitioned source is rejected when the table already has
    /// partition key indices.
    #[tokio::test]
    async fn test_unpartitioned_source_rejects_existing_partition_metadata() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let mut ctx = new_test_context(&env, table_id).await;
        // Give the table an existing partition key before running the state.
        let current = ctx.get_raw_table_info_value().await.unwrap();
        let mut table_info = current.table_info.clone();
        table_info.meta.partition_key_indices = vec![0];
        ctx.update_table_info(&current, current.update(table_info))
            .await
            .unwrap();
        let mut state = RepartitionStart::new(
            RepartitionFrom::Unpartitioned {
                partition_columns: vec!["col1".to_string()],
            },
            vec![range_expr("col1", 0, 50)],
        );

        let err = state
            .next(&mut ctx, &TestingEnv::procedure_context())
            .await
            .unwrap_err();

        assert!(err.to_string().contains("expects an unpartitioned table"));
        assert!(ctx.persistent_ctx.partition_metadata_update.is_none());
    }

    /// An empty target expression list is rejected for a partitioned source.
    #[tokio::test]
    async fn test_repartition_start_rejects_empty_target_partition_exprs() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let mut ctx = new_test_context(&env, table_id).await;
        let mut state = RepartitionStart::new(
            RepartitionFrom::Partitioned {
                exprs: vec![],
                target_partition_columns: None,
            },
            vec![],
        );

        let err = state
            .next(&mut ctx, &TestingEnv::procedure_context())
            .await
            .unwrap_err();

        assert!(
            err.to_string()
                .contains("non-empty target partition expressions")
        );
    }

    /// An empty target expression list is rejected for an unpartitioned
    /// source before any metadata update is recorded.
    #[tokio::test]
    async fn test_unpartitioned_source_rejects_empty_target_partition_exprs() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let mut ctx = new_test_context(&env, table_id).await;
        let mut state = RepartitionStart::new(
            RepartitionFrom::Unpartitioned {
                partition_columns: vec!["col1".to_string()],
            },
            vec![],
        );

        let err = state
            .next(&mut ctx, &TestingEnv::procedure_context())
            .await
            .unwrap_err();

        assert!(
            err.to_string()
                .contains("non-empty target partition expressions")
        );
        assert!(ctx.persistent_ctx.partition_metadata_update.is_none());
    }

    /// An unpartitioned source with no partition columns is rejected.
    #[tokio::test]
    async fn test_unpartitioned_source_rejects_empty_partition_columns() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let mut ctx = new_test_context(&env, table_id).await;
        let mut state = RepartitionStart::new(
            RepartitionFrom::Unpartitioned {
                partition_columns: vec![],
            },
            vec![range_expr("col1", 0, 50)],
        );

        let err = state
            .next(&mut ctx, &TestingEnv::procedure_context())
            .await
            .unwrap_err();

        assert!(err.to_string().contains("non-empty partition columns"));
        assert!(ctx.persistent_ctx.partition_metadata_update.is_none());
    }

    /// A partition column that is not in the table schema is rejected.
    #[tokio::test]
    async fn test_unpartitioned_source_rejects_missing_partition_column() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let mut ctx = new_test_context(&env, table_id).await;
        let mut state = RepartitionStart::new(
            RepartitionFrom::Unpartitioned {
                partition_columns: vec!["missing_col".to_string()],
            },
            vec![range_expr("col1", 0, 50)],
        );

        let err = state
            .next(&mut ctx, &TestingEnv::procedure_context())
            .await
            .unwrap_err();

        assert!(
            err.to_string()
                .contains("Partition column missing_col not found")
        );
        assert!(ctx.persistent_ctx.partition_metadata_update.is_none());
    }
}