Skip to main content

meta_srv/procedure/repartition/
repartition_start.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use common_meta::key::table_route::PhysicalTableRouteValue;
18use common_procedure::{Context as ProcedureContext, Status};
19use common_telemetry::debug;
20use partition::collider::Collider;
21use partition::expr::PartitionExpr;
22use partition::subtask::{self, RepartitionSubtask};
23use serde::{Deserialize, Deserializer, Serialize};
24use snafu::{OptionExt, ResultExt, ensure};
25use tokio::time::Instant;
26use uuid::Uuid;
27
28use crate::error::{self, Result};
29use crate::procedure::repartition::allocate_region::AllocateRegion;
30use crate::procedure::repartition::plan::{AllocationPlanEntry, SourceRegionDescriptor};
31use crate::procedure::repartition::repartition_end::RepartitionEnd;
32use crate::procedure::repartition::update_partition_metadata::{
33    PartitionMetadataUpdate, UpdatePartitionMetadata,
34};
35use crate::procedure::repartition::{Context, State};
36
/// Source side of a repartition request.
///
/// Deserialization is implemented by hand (see the `Deserialize` impl for this type) so that
/// a bare list of partition expressions is also accepted and read as
/// [`RepartitionFrom::Partitioned`] with no target partition columns.
#[derive(Debug, Clone, Serialize)]
pub enum RepartitionFrom {
    /// The source is described by existing partition expressions.
    Partitioned {
        /// Source partition expressions. When building the plan, each one is matched against
        /// the partition expressions of the regions in the table route.
        exprs: Vec<PartitionExpr>,
        /// Full target partition columns to overwrite table metadata.
        ///
        /// `None` means the repartition keeps using the current table
        /// partition columns, so the procedure won't update
        /// `partition_key_indices`.
        target_partition_columns: Option<Vec<String>>,
    },
    /// The source table has no partition key yet.
    Unpartitioned {
        /// Names of the columns that become the partition key. They are resolved to column
        /// indices against the table schema before the plan is built.
        partition_columns: Vec<String>,
    },
}
52
53impl<'de> Deserialize<'de> for RepartitionFrom {
54    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
55    where
56        D: Deserializer<'de>,
57    {
58        #[derive(Deserialize)]
59        enum CurrentRepartitionFrom {
60            Partitioned {
61                exprs: Vec<PartitionExpr>,
62                #[serde(default)]
63                target_partition_columns: Option<Vec<String>>,
64            },
65            Unpartitioned {
66                partition_columns: Vec<String>,
67            },
68        }
69
70        #[derive(Deserialize)]
71        #[serde(untagged)]
72        enum RepartitionFromRepr {
73            Current(CurrentRepartitionFrom),
74            Legacy(Vec<PartitionExpr>),
75        }
76
77        match RepartitionFromRepr::deserialize(deserializer)? {
78            RepartitionFromRepr::Current(CurrentRepartitionFrom::Partitioned {
79                exprs,
80                target_partition_columns,
81            }) => Ok(Self::Partitioned {
82                exprs,
83                target_partition_columns,
84            }),
85            RepartitionFromRepr::Current(CurrentRepartitionFrom::Unpartitioned {
86                partition_columns,
87            }) => Ok(Self::Unpartitioned { partition_columns }),
88            RepartitionFromRepr::Legacy(exprs) => Ok(Self::Partitioned {
89                exprs,
90                target_partition_columns: None,
91            }),
92        }
93    }
94}
95
/// Procedure state that validates a repartition request and builds its allocation plan.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepartitionStart {
    /// Source side of the repartition. The `from_exprs` alias lets the field also be read
    /// from input that uses that older field name.
    #[serde(alias = "from_exprs")]
    from: RepartitionFrom,
    /// Target partition expressions. An empty list is rejected when the state runs.
    to_exprs: Vec<PartitionExpr>,
}
102
103impl RepartitionStart {
104    pub fn new(from: RepartitionFrom, to_exprs: Vec<PartitionExpr>) -> Self {
105        Self { from, to_exprs }
106    }
107}
108
109#[async_trait::async_trait]
110#[typetag::serde]
111impl State for RepartitionStart {
112    async fn next(
113        &mut self,
114        ctx: &mut Context,
115        _: &ProcedureContext,
116    ) -> Result<(Box<dyn State>, Status)> {
117        ensure!(
118            !self.to_exprs.is_empty(),
119            error::InvalidArgumentsSnafu {
120                err_msg: "Repartition expects non-empty target partition expressions".to_string(),
121            }
122        );
123
124        let timer = Instant::now();
125        let (physical_table_id, table_route) = ctx
126            .table_metadata_manager
127            .table_route_manager()
128            .get_physical_table_route(ctx.persistent_ctx.table_id)
129            .await
130            .context(error::TableMetadataManagerSnafu)?;
131        let table_id = ctx.persistent_ctx.table_id;
132        ensure!(
133            physical_table_id == table_id,
134            error::UnexpectedSnafu {
135                violated: format!(
136                    "Repartition only works on the physical table, but got logical table: {}, physical table id: {}",
137                    table_id, physical_table_id
138                ),
139            }
140        );
141
142        let from_exprs = self.prepare_from(ctx).await?;
143        let plans = Self::build_plan(&table_route, from_exprs, &self.to_exprs)?;
144        let plan_count = plans.len();
145        let total_source_regions: usize = plans.iter().map(|p| p.source_regions.len()).sum();
146        let total_target_regions: usize =
147            plans.iter().map(|p| p.target_partition_exprs.len()).sum();
148        common_telemetry::info!(
149            "Repartition start, table_id: {}, plans: {}, total_source_regions: {}, total_target_regions: {}",
150            table_id,
151            plan_count,
152            total_source_regions,
153            total_target_regions
154        );
155
156        ctx.update_build_plan_elapsed(timer.elapsed());
157
158        if plans.is_empty() {
159            return Ok((Box::new(RepartitionEnd), Status::done()));
160        }
161
162        if ctx.persistent_ctx.partition_metadata_update.is_some() {
163            Ok((
164                Box::new(UpdatePartitionMetadata::new(plans)),
165                Status::executing(true),
166            ))
167        } else {
168            Ok((
169                Box::new(AllocateRegion::new(plans)),
170                Status::executing(false),
171            ))
172        }
173    }
174
175    fn as_any(&self) -> &dyn Any {
176        self
177    }
178}
179
180impl RepartitionStart {
181    async fn prepare_from<'a>(&'a self, ctx: &mut Context) -> Result<&'a [PartitionExpr]> {
182        match &self.from {
183            RepartitionFrom::Partitioned {
184                exprs,
185                target_partition_columns,
186            } => {
187                Self::prepare_partitioned(ctx, target_partition_columns.as_deref()).await?;
188                Ok(exprs)
189            }
190            RepartitionFrom::Unpartitioned { partition_columns } => {
191                Self::prepare_unpartitioned(ctx, partition_columns).await?;
192                Ok(&[])
193            }
194        }
195    }
196
197    async fn prepare_unpartitioned(ctx: &mut Context, partition_columns: &[String]) -> Result<()> {
198        if ctx.persistent_ctx.partition_metadata_update.is_some() {
199            return Ok(());
200        }
201
202        ensure!(
203            !partition_columns.is_empty(),
204            error::InvalidArgumentsSnafu {
205                err_msg: "Unpartitioned repartition expects non-empty partition columns"
206                    .to_string(),
207            }
208        );
209
210        let table_info_value = ctx.get_table_info_value().await?;
211        ensure!(
212            table_info_value
213                .table_info
214                .meta
215                .partition_key_indices
216                .is_empty(),
217            error::InvalidArgumentsSnafu {
218                err_msg: format!(
219                    "Unpartitioned repartition expects an unpartitioned table, but table {} has partition key indices: {:?}",
220                    ctx.persistent_ctx.table_id,
221                    table_info_value.table_info.meta.partition_key_indices
222                ),
223            }
224        );
225
226        let schema = &table_info_value.table_info.meta.schema;
227        let partition_key_indices = partition_columns
228            .iter()
229            .map(|column_name| {
230                schema.column_index_by_name(column_name).with_context(|| {
231                    error::InvalidArgumentsSnafu {
232                        err_msg: format!(
233                            "Partition column {} not found in table {}",
234                            column_name, ctx.persistent_ctx.table_id
235                        ),
236                    }
237                })
238            })
239            .collect::<Result<Vec<_>>>()?;
240        ctx.persistent_ctx.partition_metadata_update = Some(
241            PartitionMetadataUpdate::from_unpartitioned(partition_key_indices),
242        );
243
244        Ok(())
245    }
246
247    async fn prepare_partitioned(
248        ctx: &mut Context,
249        target_partition_columns: Option<&[String]>,
250    ) -> Result<()> {
251        let Some(target_partition_columns) = target_partition_columns else {
252            return Ok(());
253        };
254        if ctx.persistent_ctx.partition_metadata_update.is_some() {
255            return Ok(());
256        }
257
258        ensure!(
259            !target_partition_columns.is_empty(),
260            error::InvalidArgumentsSnafu {
261                err_msg: "Partitioned source expects non-empty target partition columns"
262                    .to_string(),
263            }
264        );
265
266        let table_info_value = ctx.get_table_info_value().await?;
267        let schema = &table_info_value.table_info.meta.schema;
268        let target_partition_key_indices = target_partition_columns
269            .iter()
270            .map(|column_name| {
271                schema.column_index_by_name(column_name).with_context(|| {
272                    error::InvalidArgumentsSnafu {
273                        err_msg: format!(
274                            "Target partition column {} not found in table {}",
275                            column_name, ctx.persistent_ctx.table_id
276                        ),
277                    }
278                })
279            })
280            .collect::<Result<Vec<_>>>()?;
281        ctx.persistent_ctx.partition_metadata_update =
282            Some(PartitionMetadataUpdate::from_partitioned(
283                table_info_value.table_info.meta.partition_key_indices,
284                target_partition_key_indices,
285            ));
286
287        Ok(())
288    }
289
290    pub(crate) fn build_plan(
291        physical_route: &PhysicalTableRouteValue,
292        from_exprs: &[PartitionExpr],
293        to_exprs: &[PartitionExpr],
294    ) -> Result<Vec<AllocationPlanEntry>> {
295        let subtasks = if from_exprs.is_empty() {
296            Self::default_source_subtasks(to_exprs)?
297        } else {
298            subtask::create_subtasks(from_exprs, to_exprs)
299                .context(error::RepartitionCreateSubtasksSnafu)?
300        };
301        if subtasks.is_empty() {
302            return Ok(vec![]);
303        }
304
305        let src_descriptors = Self::source_region_descriptors(from_exprs, physical_route)?;
306        Ok(Self::build_plan_entries(
307            subtasks,
308            &src_descriptors,
309            to_exprs,
310        ))
311    }
312
313    fn build_plan_entries(
314        subtasks: Vec<RepartitionSubtask>,
315        source_index: &[SourceRegionDescriptor],
316        target_exprs: &[PartitionExpr],
317    ) -> Vec<AllocationPlanEntry> {
318        subtasks
319            .into_iter()
320            .map(|subtask| {
321                let group_id = Uuid::new_v4();
322                let source_regions = subtask
323                    .from_expr_indices
324                    .iter()
325                    .map(|&idx| source_index[idx].clone())
326                    .collect::<Vec<_>>();
327
328                let target_partition_exprs = subtask
329                    .to_expr_indices
330                    .iter()
331                    .map(|&idx| target_exprs[idx].clone())
332                    .collect::<Vec<_>>();
333                AllocationPlanEntry {
334                    group_id,
335                    source_regions,
336                    target_partition_exprs,
337                    transition_map: subtask.transition_map,
338                }
339            })
340            .collect::<Vec<_>>()
341    }
342
343    fn default_source_subtasks(to_exprs: &[PartitionExpr]) -> Result<Vec<RepartitionSubtask>> {
344        ensure!(
345            !to_exprs.is_empty(),
346            error::UnexpectedSnafu {
347                violated: "Default source repartition expects non-empty target partition exprs",
348            }
349        );
350
351        Collider::new(to_exprs).context(error::RepartitionCreateSubtasksSnafu)?;
352
353        let to_expr_indices = (0..to_exprs.len()).collect::<Vec<_>>();
354        Ok(vec![RepartitionSubtask {
355            from_expr_indices: vec![0],
356            to_expr_indices: to_expr_indices.clone(),
357            transition_map: vec![to_expr_indices],
358        }])
359    }
360
361    fn source_region_descriptors(
362        from_exprs: &[PartitionExpr],
363        physical_route: &PhysicalTableRouteValue,
364    ) -> Result<Vec<SourceRegionDescriptor>> {
365        if from_exprs.is_empty() {
366            return Self::default_source_region_descriptors(physical_route);
367        }
368
369        let existing_regions = physical_route
370            .region_routes
371            .iter()
372            .map(|route| (route.region.id, route.region.partition_expr()))
373            .collect::<Vec<_>>();
374
375        let descriptors = from_exprs
376            .iter()
377            .map(|expr| {
378                let expr_json = expr
379                    .as_json_str()
380                    .context(error::SerializePartitionExprSnafu)?;
381
382                let matched_region_id = existing_regions
383                    .iter()
384                    .find_map(|(region_id, existing_expr)| {
385                        (existing_expr == &expr_json).then_some(*region_id)
386                    })
387                    .with_context(|| error::RepartitionSourceExprMismatchSnafu { expr: &expr_json })
388                    .inspect_err(|_| {
389                        debug!("Failed to find matching region for partition expression: {}, existing regions: {:?}", expr_json, existing_regions);
390                    })?;
391
392                Ok(SourceRegionDescriptor::partitioned(
393                    matched_region_id,
394                    expr.clone(),
395                ))
396            })
397            .collect::<Result<Vec<_>>>()?;
398
399        Ok(descriptors)
400    }
401
402    fn default_source_region_descriptors(
403        physical_route: &PhysicalTableRouteValue,
404    ) -> Result<Vec<SourceRegionDescriptor>> {
405        ensure!(
406            physical_route.region_routes.len() == 1,
407            error::UnexpectedSnafu {
408                violated: format!(
409                    "Default source repartition expects exactly one source region, but got {}",
410                    physical_route.region_routes.len()
411                ),
412            }
413        );
414        let source_region = &physical_route.region_routes[0].region;
415        ensure!(
416            source_region.partition_expr().is_empty(),
417            error::UnexpectedSnafu {
418                violated: format!(
419                    "Default source repartition expects an empty partition expr, but got {}",
420                    source_region.partition_expr()
421                ),
422            }
423        );
424
425        Ok(vec![SourceRegionDescriptor::Default {
426            region_id: source_region.id,
427        }])
428    }
429}
430
431#[cfg(test)]
432mod tests {
433    use std::sync::Arc;
434
435    use common_meta::ddl::test_util::datanode_handler::NaiveDatanodeHandler;
436    use common_meta::key::table_route::PhysicalTableRouteValue;
437    use common_meta::peer::Peer;
438    use common_meta::rpc::router::{Region, RegionRoute};
439    use common_meta::test_util::MockDatanodeManager;
440    use datatypes::prelude::Value;
441    use partition::expr::{Operand, RestrictedOp};
442    use store_api::storage::RegionId;
443
444    use super::*;
445    use crate::procedure::repartition::test_util::{
446        TestingEnv, new_parent_context, range_expr, test_region_route, test_region_wal_options,
447    };
448
    /// Test helper: wraps the given region routes in a `PhysicalTableRouteValue`.
    fn physical_route(region_routes: Vec<RegionRoute>) -> PhysicalTableRouteValue {
        PhysicalTableRouteValue::new(region_routes)
    }
452
453    async fn new_test_context(env: &TestingEnv, table_id: u32) -> Context {
454        env.create_physical_table_metadata_for_repartition(
455            table_id,
456            vec![test_region_route(RegionId::new(table_id, 1), "")],
457            test_region_wal_options(&[1]),
458        )
459        .await;
460        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
461        new_parent_context(env, node_manager, table_id)
462    }
463
464    #[test]
465    fn test_build_plan_with_default_source_region() {
466        let table_id = 1024;
467        let physical_route =
468            physical_route(vec![test_region_route(RegionId::new(table_id, 1), "")]);
469        let to_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)];
470
471        let plans = RepartitionStart::build_plan(&physical_route, &[], &to_exprs).unwrap();
472
473        assert_eq!(plans.len(), 1);
474        let plan = &plans[0];
475        assert_eq!(
476            plan.source_regions,
477            vec![SourceRegionDescriptor::Default {
478                region_id: RegionId::new(table_id, 1)
479            }]
480        );
481        assert_eq!(plan.target_partition_exprs, to_exprs);
482        assert_eq!(plan.transition_map, vec![vec![0, 1]]);
483    }
484
485    #[test]
486    fn test_build_plan_with_default_source_rejects_non_empty_partition_expr() {
487        let table_id = 1024;
488        let physical_route = physical_route(vec![test_region_route(
489            RegionId::new(table_id, 1),
490            &range_expr("x", 0, 100).as_json_str().unwrap(),
491        )]);
492        let to_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)];
493
494        let err = RepartitionStart::build_plan(&physical_route, &[], &to_exprs).unwrap_err();
495
496        assert!(err.to_string().contains("empty partition expr"));
497    }
498
499    #[test]
500    fn test_build_plan_with_default_source_rejects_multiple_regions() {
501        let table_id = 1024;
502        let physical_route = physical_route(vec![
503            test_region_route(RegionId::new(table_id, 1), ""),
504            test_region_route(RegionId::new(table_id, 2), ""),
505        ]);
506        let to_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)];
507
508        let err = RepartitionStart::build_plan(&physical_route, &[], &to_exprs).unwrap_err();
509
510        assert!(err.to_string().contains("exactly one source region"));
511    }
512
513    #[test]
514    fn test_build_plan_with_default_source_rejects_empty_targets() {
515        let table_id = 1024;
516        let physical_route =
517            physical_route(vec![test_region_route(RegionId::new(table_id, 1), "")]);
518
519        let err = RepartitionStart::build_plan(&physical_route, &[], &[]).unwrap_err();
520
521        assert!(err.to_string().contains("non-empty target partition exprs"));
522    }
523
524    #[test]
525    fn test_build_plan_with_default_source_rejects_invalid_targets() {
526        let table_id = 1024;
527        let physical_route =
528            physical_route(vec![test_region_route(RegionId::new(table_id, 1), "")]);
529        let invalid_to_expr = PartitionExpr::new(
530            Operand::Value(Value::Int64(1)),
531            RestrictedOp::Eq,
532            Operand::Value(Value::Int64(2)),
533        );
534
535        let err =
536            RepartitionStart::build_plan(&physical_route, &[], &[invalid_to_expr]).unwrap_err();
537
538        assert!(
539            err.to_string()
540                .contains("Failed to create repartition subtasks")
541        );
542    }
543
544    #[test]
545    fn test_build_plan_keeps_partitioned_source_matching() {
546        let table_id = 1024;
547        let from_exprs = vec![range_expr("x", 0, 100)];
548        let to_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)];
549        let physical_route = physical_route(vec![RegionRoute {
550            region: Region {
551                id: RegionId::new(table_id, 1),
552                partition_expr: from_exprs[0].as_json_str().unwrap(),
553                ..Default::default()
554            },
555            leader_peer: Some(Peer::empty(1)),
556            ..Default::default()
557        }]);
558
559        let plans = RepartitionStart::build_plan(&physical_route, &from_exprs, &to_exprs).unwrap();
560
561        assert_eq!(plans.len(), 1);
562        assert_eq!(
563            plans[0].source_regions,
564            vec![SourceRegionDescriptor::partitioned(
565                RegionId::new(table_id, 1),
566                from_exprs[0].clone()
567            )]
568        );
569    }
570
571    #[test]
572    fn test_repartition_start_deserializes_legacy_from_exprs() {
573        let from_exprs = vec![range_expr("x", 0, 100)];
574        let to_exprs = vec![range_expr("x", 0, 50), range_expr("x", 50, 100)];
575        let json = serde_json::json!({
576            "from_exprs": from_exprs,
577            "to_exprs": to_exprs,
578        })
579        .to_string();
580
581        let state: RepartitionStart = serde_json::from_str(&json).unwrap();
582
583        let RepartitionFrom::Partitioned {
584            exprs,
585            target_partition_columns,
586        } = state.from
587        else {
588            panic!("expected partition source");
589        };
590        assert_eq!(exprs, vec![range_expr("x", 0, 100)]);
591        assert!(target_partition_columns.is_none());
592    }
593
594    #[test]
595    fn test_repartition_start_deserializes_current_from() {
596        let state = RepartitionStart::new(
597            RepartitionFrom::Unpartitioned {
598                partition_columns: vec!["col1".to_string()],
599            },
600            vec![range_expr("col1", 0, 50)],
601        );
602        let json = serde_json::to_string(&state).unwrap();
603
604        let state: RepartitionStart = serde_json::from_str(&json).unwrap();
605
606        let RepartitionFrom::Unpartitioned { partition_columns } = state.from else {
607            panic!("expected unpartitioned source");
608        };
609        assert_eq!(partition_columns, vec!["col1"]);
610    }
611
612    #[tokio::test]
613    async fn test_partitioned_source_does_not_initialize_partition_metadata_update() {
614        let env = TestingEnv::new();
615        let table_id = 1024;
616        env.create_physical_table_metadata_for_repartition(
617            table_id,
618            vec![test_region_route(
619                RegionId::new(table_id, 1),
620                &range_expr("x", 0, 100).as_json_str().unwrap(),
621            )],
622            test_region_wal_options(&[1]),
623        )
624        .await;
625        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
626        let mut ctx = new_parent_context(&env, node_manager, table_id);
627        let mut state = RepartitionStart::new(
628            RepartitionFrom::Partitioned {
629                exprs: vec![range_expr("x", 0, 100)],
630                target_partition_columns: None,
631            },
632            vec![range_expr("x", 0, 50), range_expr("x", 50, 100)],
633        );
634
635        let (next, status) = state
636            .next(&mut ctx, &TestingEnv::procedure_context())
637            .await
638            .unwrap();
639
640        assert!(!status.need_persist());
641        assert!(next.as_any().is::<AllocateRegion>());
642        assert!(ctx.persistent_ctx.partition_metadata_update.is_none());
643    }
644
645    #[tokio::test]
646    async fn test_partitioned_source_initializes_target_partition_metadata_update() {
647        let env = TestingEnv::new();
648        let table_id = 1024;
649        env.create_physical_table_metadata_for_repartition(
650            table_id,
651            vec![test_region_route(
652                RegionId::new(table_id, 1),
653                &range_expr("x", 0, 100).as_json_str().unwrap(),
654            )],
655            test_region_wal_options(&[1]),
656        )
657        .await;
658        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
659        let mut ctx = new_parent_context(&env, node_manager, table_id);
660        let current = ctx.get_raw_table_info_value().await.unwrap();
661        let mut table_info = current.table_info.clone();
662        table_info.meta.partition_key_indices = vec![0];
663        ctx.update_table_info(&current, current.update(table_info))
664            .await
665            .unwrap();
666        let mut state = RepartitionStart::new(
667            RepartitionFrom::Partitioned {
668                exprs: vec![range_expr("x", 0, 100)],
669                target_partition_columns: Some(vec!["col2".to_string(), "col1".to_string()]),
670            },
671            vec![range_expr("x", 0, 50), range_expr("x", 50, 100)],
672        );
673
674        let (next, status) = state
675            .next(&mut ctx, &TestingEnv::procedure_context())
676            .await
677            .unwrap();
678
679        assert!(status.need_persist());
680        assert!(next.as_any().is::<UpdatePartitionMetadata>());
681        let update = ctx
682            .persistent_ctx
683            .partition_metadata_update
684            .as_ref()
685            .unwrap();
686        assert_eq!(update.original_partition_key_indices, vec![0]);
687        assert_eq!(update.target_partition_key_indices, vec![2, 0]);
688        assert!(!update.expect_empty_partition_key_indices);
689    }
690
691    #[tokio::test]
692    async fn test_unpartitioned_source_initializes_partition_metadata_update() {
693        let env = TestingEnv::new();
694        let table_id = 1024;
695        let mut ctx = new_test_context(&env, table_id).await;
696        let mut state = RepartitionStart::new(
697            RepartitionFrom::Unpartitioned {
698                partition_columns: vec!["col2".to_string(), "col1".to_string()],
699            },
700            vec![range_expr("col2", 0, 50), range_expr("col2", 50, 100)],
701        );
702
703        let (next, status) = state
704            .next(&mut ctx, &TestingEnv::procedure_context())
705            .await
706            .unwrap();
707
708        assert!(status.need_persist());
709        assert!(next.as_any().is::<UpdatePartitionMetadata>());
710        assert_eq!(
711            ctx.persistent_ctx
712                .partition_metadata_update
713                .as_ref()
714                .unwrap()
715                .target_partition_key_indices,
716            vec![2, 0]
717        );
718    }
719
720    #[tokio::test]
721    async fn test_unpartitioned_source_rejects_existing_partition_metadata() {
722        let env = TestingEnv::new();
723        let table_id = 1024;
724        let mut ctx = new_test_context(&env, table_id).await;
725        let current = ctx.get_raw_table_info_value().await.unwrap();
726        let mut table_info = current.table_info.clone();
727        table_info.meta.partition_key_indices = vec![0];
728        ctx.update_table_info(&current, current.update(table_info))
729            .await
730            .unwrap();
731        let mut state = RepartitionStart::new(
732            RepartitionFrom::Unpartitioned {
733                partition_columns: vec!["col1".to_string()],
734            },
735            vec![range_expr("col1", 0, 50)],
736        );
737
738        let err = state
739            .next(&mut ctx, &TestingEnv::procedure_context())
740            .await
741            .unwrap_err();
742
743        assert!(err.to_string().contains("expects an unpartitioned table"));
744        assert!(ctx.persistent_ctx.partition_metadata_update.is_none());
745    }
746
747    #[tokio::test]
748    async fn test_repartition_start_rejects_empty_target_partition_exprs() {
749        let env = TestingEnv::new();
750        let table_id = 1024;
751        let mut ctx = new_test_context(&env, table_id).await;
752        let mut state = RepartitionStart::new(
753            RepartitionFrom::Partitioned {
754                exprs: vec![],
755                target_partition_columns: None,
756            },
757            vec![],
758        );
759
760        let err = state
761            .next(&mut ctx, &TestingEnv::procedure_context())
762            .await
763            .unwrap_err();
764
765        assert!(
766            err.to_string()
767                .contains("non-empty target partition expressions")
768        );
769    }
770
771    #[tokio::test]
772    async fn test_unpartitioned_source_rejects_empty_target_partition_exprs() {
773        let env = TestingEnv::new();
774        let table_id = 1024;
775        let mut ctx = new_test_context(&env, table_id).await;
776        let mut state = RepartitionStart::new(
777            RepartitionFrom::Unpartitioned {
778                partition_columns: vec!["col1".to_string()],
779            },
780            vec![],
781        );
782
783        let err = state
784            .next(&mut ctx, &TestingEnv::procedure_context())
785            .await
786            .unwrap_err();
787
788        assert!(
789            err.to_string()
790                .contains("non-empty target partition expressions")
791        );
792        assert!(ctx.persistent_ctx.partition_metadata_update.is_none());
793    }
794
795    #[tokio::test]
796    async fn test_unpartitioned_source_rejects_empty_partition_columns() {
797        let env = TestingEnv::new();
798        let table_id = 1024;
799        let mut ctx = new_test_context(&env, table_id).await;
800        let mut state = RepartitionStart::new(
801            RepartitionFrom::Unpartitioned {
802                partition_columns: vec![],
803            },
804            vec![range_expr("col1", 0, 50)],
805        );
806
807        let err = state
808            .next(&mut ctx, &TestingEnv::procedure_context())
809            .await
810            .unwrap_err();
811
812        assert!(err.to_string().contains("non-empty partition columns"));
813        assert!(ctx.persistent_ctx.partition_metadata_update.is_none());
814    }
815
816    #[tokio::test]
817    async fn test_unpartitioned_source_rejects_missing_partition_column() {
818        let env = TestingEnv::new();
819        let table_id = 1024;
820        let mut ctx = new_test_context(&env, table_id).await;
821        let mut state = RepartitionStart::new(
822            RepartitionFrom::Unpartitioned {
823                partition_columns: vec!["missing_col".to_string()],
824            },
825            vec![range_expr("col1", 0, 50)],
826        );
827
828        let err = state
829            .next(&mut ctx, &TestingEnv::procedure_context())
830            .await
831            .unwrap_err();
832
833        assert!(
834            err.to_string()
835                .contains("Partition column missing_col not found")
836        );
837        assert!(ctx.persistent_ctx.partition_metadata_update.is_none());
838    }
839}