Skip to main content

meta_srv/procedure/
repartition.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod allocate_region;
16pub mod collect;
17pub mod deallocate_region;
18pub mod dispatch;
19pub mod group;
20pub mod plan;
21pub mod repartition_end;
22pub mod repartition_start;
23pub mod update_partition_metadata;
24pub mod utils;
25
26use std::any::Any;
27use std::collections::{HashMap, HashSet};
28use std::fmt::{Debug, Display};
29use std::time::{Duration, Instant};
30
31use common_error::ext::BoxedError;
32use common_meta::cache_invalidator::CacheInvalidatorRef;
33use common_meta::ddl::DdlContext;
34use common_meta::ddl::allocator::region_routes::RegionRoutesAllocatorRef;
35use common_meta::ddl::allocator::wal_options::WalOptionsAllocatorRef;
36use common_meta::ddl_manager::{RepartitionProcedureFactory, RepartitionSource};
37use common_meta::instruction::CacheIdent;
38use common_meta::key::datanode_table::RegionInfo;
39use common_meta::key::table_info::TableInfoValue;
40use common_meta::key::table_route::TableRouteValue;
41use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
42use common_meta::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
43use common_meta::node_manager::NodeManagerRef;
44use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
45use common_meta::region_registry::LeaderRegionRegistryRef;
46use common_meta::rpc::router::{RegionRoute, operating_leader_region_roles};
47use common_meta::wal_provider::RegionWalOptions;
48use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
49use common_procedure::{
50    BoxedProcedure, Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
51    ProcedureManagerRef, Result as ProcedureResult, Status, StringKey, UserMetadata,
52};
53use common_telemetry::{error, info, warn};
54use partition::expr::PartitionExpr;
55use serde::{Deserialize, Serialize};
56use snafu::{OptionExt, ResultExt};
57use store_api::storage::TableId;
58use table::table_name::TableName;
59
60use crate::error::{self, Result};
61use crate::procedure::repartition::collect::ProcedureMeta;
62use crate::procedure::repartition::deallocate_region::DeallocateRegion;
63use crate::procedure::repartition::group::{
64    Context as RepartitionGroupContext, RepartitionGroupProcedure, region_routes,
65};
66use crate::procedure::repartition::plan::RepartitionPlanEntry;
67use crate::procedure::repartition::repartition_start::{RepartitionFrom, RepartitionStart};
68use crate::procedure::repartition::update_partition_metadata::PartitionMetadataUpdate;
69use crate::procedure::repartition::utils::{
70    get_datanode_table_value, rollback_group_metadata_routes,
71};
72use crate::service::mailbox::MailboxRef;
73
74#[cfg(test)]
75pub mod test_util;
76
77#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
78pub struct PersistentContext {
79    pub catalog_name: String,
80    pub schema_name: String,
81    pub table_name: String,
82    pub table_id: TableId,
83    pub plans: Vec<RepartitionPlanEntry>,
84    #[serde(default)]
85    /// Records failed sub-procedures for parent rollback selection.
86    ///
87    /// The parent repartition procedure uses these entries to decide which plans
88    /// require group-metadata restoration and allocated-region cleanup.
89    pub failed_procedures: Vec<ProcedureMeta>,
90    #[serde(default)]
91    /// Records unknown sub-procedures for parent rollback selection.
92    ///
93    /// Unknown procedures are treated the same as failed ones when selecting the
94    /// plan subset that must be rolled back by the parent procedure.
95    pub unknown_procedures: Vec<ProcedureMeta>,
96    /// The timeout for repartition operations.
97    #[serde(with = "humantime_serde", default = "default_timeout")]
98    pub timeout: Duration,
99    #[serde(default)]
100    /// Records table-level partition metadata updated by this repartition.
101    pub partition_metadata_update: Option<PartitionMetadataUpdate>,
102}
103
/// Returns the fallback repartition timeout of two minutes.
///
/// Used when `PersistentContext::new` receives no timeout. It is also the serde
/// default for persisted contexts that have no `timeout` field.
fn default_timeout() -> Duration {
    const DEFAULT_TIMEOUT_SECS: u64 = 2 * 60;
    Duration::from_secs(DEFAULT_TIMEOUT_SECS)
}
107
108impl PersistentContext {
109    /// Creates a new [PersistentContext] with the given table name, table id and timeout.
110    ///
111    /// If the timeout is not provided, the default timeout will be used.
112    pub fn new(
113        TableName {
114            catalog_name,
115            schema_name,
116            table_name,
117        }: TableName,
118        table_id: TableId,
119        timeout: Option<Duration>,
120    ) -> Self {
121        Self {
122            catalog_name,
123            schema_name,
124            table_name,
125            table_id,
126            plans: vec![],
127            failed_procedures: vec![],
128            unknown_procedures: vec![],
129            timeout: timeout.unwrap_or_else(default_timeout),
130            partition_metadata_update: None,
131        }
132    }
133
134    pub fn lock_key(&self) -> Vec<StringKey> {
135        vec![
136            CatalogLock::Read(&self.catalog_name).into(),
137            SchemaLock::read(&self.catalog_name, &self.schema_name).into(),
138            TableLock::Write(self.table_id).into(),
139            TableNameLock::new(&self.catalog_name, &self.schema_name, &self.table_name).into(),
140        ]
141    }
142}
143
144#[derive(Clone)]
145pub struct Context {
146    pub persistent_ctx: PersistentContext,
147    pub volatile_ctx: VolatileContext,
148    pub table_metadata_manager: TableMetadataManagerRef,
149    pub memory_region_keeper: MemoryRegionKeeperRef,
150    pub node_manager: NodeManagerRef,
151    pub leader_region_registry: LeaderRegionRegistryRef,
152    pub mailbox: MailboxRef,
153    pub server_addr: String,
154    pub cache_invalidator: CacheInvalidatorRef,
155    pub region_routes_allocator: RegionRoutesAllocatorRef,
156    pub wal_options_allocator: WalOptionsAllocatorRef,
157    pub start_time: Instant,
158}
159
/// Runtime-only state of a repartition procedure; it is never serialized.
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {
    /// Accumulated per-phase timings.
    pub metrics: Metrics,
    /// When the dispatch phase started, if it has been recorded.
    pub dispatch_start_time: Option<Instant>,
}

/// Metrics of repartition.
///
/// Every field is a running total for one phase of the procedure. The
/// `update_*` methods add to the stored value instead of overwriting it.
#[derive(Debug, Clone, Default)]
pub struct Metrics {
    /// Time spent building the repartition plan.
    build_plan_elapsed: Duration,
    /// Time spent allocating regions.
    allocate_region_elapsed: Duration,
    /// Time spent finishing groups.
    finish_groups_elapsed: Duration,
    /// Time spent deallocating regions.
    deallocate_region_elapsed: Duration,
}

impl Display for Metrics {
    /// Renders `total: <sum of all phases>`.
    ///
    /// It then appends `, <phase>: <elapsed>` for every phase whose elapsed
    /// time is non-zero, in field declaration order.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let phases = self.phases();
        let total: Duration = phases.iter().map(|&(_, elapsed)| elapsed).sum();
        write!(f, "total: {:?}", total)?;
        phases
            .iter()
            .filter(|(_, elapsed)| !elapsed.is_zero())
            .try_for_each(|(label, elapsed)| write!(f, ", {}: {:?}", label, elapsed))
    }
}

impl Metrics {
    /// Pairs each phase label with its accumulated time, in display order.
    fn phases(&self) -> [(&'static str, Duration); 4] {
        [
            ("build_plan_elapsed", self.build_plan_elapsed),
            ("allocate_region_elapsed", self.allocate_region_elapsed),
            ("finish_groups_elapsed", self.finish_groups_elapsed),
            ("deallocate_region_elapsed", self.deallocate_region_elapsed),
        ]
    }

    /// Adds `elapsed` to the plan-building total.
    pub fn update_build_plan_elapsed(&mut self, elapsed: Duration) {
        self.build_plan_elapsed += elapsed;
    }

    /// Adds `elapsed` to the region-allocation total.
    pub fn update_allocate_region_elapsed(&mut self, elapsed: Duration) {
        self.allocate_region_elapsed += elapsed;
    }

    /// Adds `elapsed` to the group-finishing total.
    pub fn update_finish_groups_elapsed(&mut self, elapsed: Duration) {
        self.finish_groups_elapsed += elapsed;
    }

    /// Adds `elapsed` to the region-deallocation total.
    pub fn update_deallocate_region_elapsed(&mut self, elapsed: Duration) {
        self.deallocate_region_elapsed += elapsed;
    }
}
237
238impl Context {
239    pub fn new(
240        ddl_ctx: &DdlContext,
241        mailbox: MailboxRef,
242        server_addr: String,
243        persistent_ctx: PersistentContext,
244    ) -> Self {
245        Self {
246            persistent_ctx,
247            table_metadata_manager: ddl_ctx.table_metadata_manager.clone(),
248            memory_region_keeper: ddl_ctx.memory_region_keeper.clone(),
249            node_manager: ddl_ctx.node_manager.clone(),
250            leader_region_registry: ddl_ctx.leader_region_registry.clone(),
251            mailbox,
252            server_addr,
253            cache_invalidator: ddl_ctx.cache_invalidator.clone(),
254            region_routes_allocator: ddl_ctx.table_metadata_allocator.region_routes_allocator(),
255            wal_options_allocator: ddl_ctx.table_metadata_allocator.wal_options_allocator(),
256            start_time: Instant::now(),
257            volatile_ctx: VolatileContext::default(),
258        }
259    }
260
261    /// Returns the next operation's timeout.
262    pub fn next_operation_timeout(&self) -> Option<Duration> {
263        self.persistent_ctx
264            .timeout
265            .checked_sub(self.start_time.elapsed())
266    }
267
268    /// Updates the elapsed time of building plan.
269    pub fn update_build_plan_elapsed(&mut self, elapsed: Duration) {
270        self.volatile_ctx.metrics.update_build_plan_elapsed(elapsed);
271    }
272
273    /// Updates the elapsed time of allocating region.
274    pub fn update_allocate_region_elapsed(&mut self, elapsed: Duration) {
275        self.volatile_ctx
276            .metrics
277            .update_allocate_region_elapsed(elapsed);
278    }
279
280    /// Updates the elapsed time of finishing groups.
281    pub fn update_finish_groups_elapsed(&mut self, elapsed: Duration) {
282        self.volatile_ctx
283            .metrics
284            .update_finish_groups_elapsed(elapsed);
285    }
286
287    /// Updates the elapsed time of deallocating region.
288    pub fn update_deallocate_region_elapsed(&mut self, elapsed: Duration) {
289        self.volatile_ctx
290            .metrics
291            .update_deallocate_region_elapsed(elapsed);
292    }
293
294    /// Retrieves the table route value for the given table id.
295    ///
296    /// Retry:
297    /// - Failed to retrieve the metadata of table.
298    ///
299    /// Abort:
300    /// - Table route not found.
301    pub async fn get_table_route_value(
302        &self,
303    ) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
304        let table_id = self.persistent_ctx.table_id;
305        let table_route_value = self
306            .table_metadata_manager
307            .table_route_manager()
308            .table_route_storage()
309            .get_with_raw_bytes(table_id)
310            .await
311            .map_err(BoxedError::new)
312            .with_context(|_| error::RetryLaterWithSourceSnafu {
313                reason: format!("Failed to get table route for table: {}", table_id),
314            })?
315            .context(error::TableRouteNotFoundSnafu { table_id })?;
316
317        Ok(table_route_value)
318    }
319
320    /// Retrieves the table info value for the given table id.
321    ///
322    /// Retry:
323    /// - Failed to retrieve the metadata of table.
324    ///
325    /// Abort:
326    /// - Table info not found.
327    pub async fn get_raw_table_info_value(
328        &self,
329    ) -> Result<DeserializedValueWithBytes<TableInfoValue>> {
330        let table_id = self.persistent_ctx.table_id;
331        let table_info_value = self
332            .table_metadata_manager
333            .table_info_manager()
334            .get(table_id)
335            .await
336            .map_err(BoxedError::new)
337            .with_context(|_| error::RetryLaterWithSourceSnafu {
338                reason: format!("Failed to get table info for table: {}", table_id),
339            })?
340            .context(error::TableInfoNotFoundSnafu { table_id })?;
341
342        Ok(table_info_value)
343    }
344
345    pub async fn get_table_info_value(&self) -> Result<TableInfoValue> {
346        let table_info_value = self.get_raw_table_info_value().await?.into_inner();
347        Ok(table_info_value)
348    }
349
350    /// Updates the table info.
351    pub async fn update_table_info(
352        &self,
353        current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
354        new_table_info_value: TableInfoValue,
355    ) -> Result<()> {
356        let table_id = self.persistent_ctx.table_id;
357        self.table_metadata_manager
358            .update_table_info(
359                current_table_info_value,
360                None,
361                new_table_info_value.table_info,
362            )
363            .await
364            .map_err(BoxedError::new)
365            .with_context(|_| error::RetryLaterWithSourceSnafu {
366                reason: format!("Failed to update table info for table: {}", table_id),
367            })
368    }
369
370    /// Updates the table route.
371    ///
372    /// Retry:
373    /// - Failed to retrieve the metadata of datanode table.
374    ///
375    /// Abort:
376    /// - Table route not found.
377    /// - Failed to update the table route.
378    pub async fn update_table_route(
379        &self,
380        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
381        new_region_routes: Vec<RegionRoute>,
382        new_region_wal_options: RegionWalOptions,
383    ) -> Result<()> {
384        let table_id = self.persistent_ctx.table_id;
385        if new_region_routes.is_empty() {
386            return error::UnexpectedSnafu {
387                violated: format!("new_region_routes is empty for table: {}", table_id),
388            }
389            .fail();
390        }
391        let datanode_id = new_region_routes
392            .first()
393            .unwrap()
394            .leader_peer
395            .as_ref()
396            .context(error::NoLeaderSnafu)?
397            .id;
398        let datanode_table_value =
399            get_datanode_table_value(&self.table_metadata_manager, table_id, datanode_id).await?;
400
401        let RegionInfo {
402            region_options,
403            region_wal_options,
404            ..
405        } = &datanode_table_value.region_info;
406
407        // Merge and validate the new region wal options.
408        let validated_region_wal_options =
409            crate::procedure::repartition::utils::merge_and_validate_region_wal_options(
410                region_wal_options,
411                new_region_wal_options,
412                &new_region_routes,
413                table_id,
414            )?;
415        info!(
416            "Updating table route for table: {}, new region routes: {:?}",
417            table_id, new_region_routes
418        );
419        self.table_metadata_manager
420            .update_table_route(
421                table_id,
422                datanode_table_value.region_info.clone(),
423                current_table_route_value,
424                new_region_routes,
425                region_options,
426                &validated_region_wal_options,
427            )
428            .await
429            .context(error::TableMetadataManagerSnafu)
430    }
431
432    /// Broadcasts the invalidate table cache message.
433    pub async fn invalidate_table_cache(&self) -> Result<()> {
434        let table_id = self.persistent_ctx.table_id;
435        let subject = format!(
436            "Invalidate table cache for repartition table, table: {}",
437            table_id,
438        );
439        let ctx = common_meta::cache_invalidator::Context {
440            subject: Some(subject),
441        };
442        let _ = self
443            .cache_invalidator
444            .invalidate(
445                &ctx,
446                &[
447                    CacheIdent::TableId(table_id),
448                    CacheIdent::TableName(TableName {
449                        catalog_name: self.persistent_ctx.catalog_name.clone(),
450                        schema_name: self.persistent_ctx.schema_name.clone(),
451                        table_name: self.persistent_ctx.table_name.clone(),
452                    }),
453                ],
454            )
455            .await;
456        Ok(())
457    }
458
459    pub fn register_operating_regions(
460        memory_region_keeper: &MemoryRegionKeeperRef,
461        region_routes: &[RegionRoute],
462    ) -> Result<Vec<OperatingRegionGuard>> {
463        let mut operating_guards = Vec::with_capacity(region_routes.len());
464        for (region_id, datanode_id, role) in operating_leader_region_roles(region_routes) {
465            let guard = memory_region_keeper
466                .register_with_role(datanode_id, region_id, role)
467                .context(error::RegionOperatingRaceSnafu {
468                    peer_id: datanode_id,
469                    region_id,
470                })?;
471            operating_guards.push(guard);
472        }
473        Ok(operating_guards)
474    }
475}
476
477#[async_trait::async_trait]
478#[typetag::serde(tag = "repartition_state")]
479pub(crate) trait State: Sync + Send + Debug {
480    fn name(&self) -> &'static str {
481        let type_name = std::any::type_name::<Self>();
482        // short name
483        type_name.split("::").last().unwrap_or(type_name)
484    }
485
486    /// Yields the next [State] and [Status].
487    async fn next(
488        &mut self,
489        ctx: &mut Context,
490        procedure_ctx: &ProcedureContext,
491    ) -> Result<(Box<dyn State>, Status)>;
492
493    fn as_any(&self) -> &dyn Any;
494}
495
496pub struct RepartitionProcedure {
497    state: Box<dyn State>,
498    context: Context,
499}
500
501#[derive(Debug, Serialize)]
502struct RepartitionData<'a> {
503    state: &'a dyn State,
504    persistent_ctx: &'a PersistentContext,
505}
506
507#[derive(Debug, Deserialize)]
508struct RepartitionDataOwned {
509    state: Box<dyn State>,
510    persistent_ctx: PersistentContext,
511}
512
513impl RepartitionProcedure {
514    const TYPE_NAME: &'static str = "metasrv-procedure::Repartition";
515
516    pub fn new(from: RepartitionFrom, to_exprs: Vec<PartitionExpr>, context: Context) -> Self {
517        let state = Box::new(RepartitionStart::new(from, to_exprs));
518
519        Self { state, context }
520    }
521
522    pub fn from_json<F>(json: &str, ctx_factory: F) -> ProcedureResult<Self>
523    where
524        F: FnOnce(PersistentContext) -> Context,
525    {
526        let RepartitionDataOwned {
527            state,
528            persistent_ctx,
529        } = serde_json::from_str(json).context(FromJsonSnafu)?;
530        let context = ctx_factory(persistent_ctx);
531
532        Ok(Self { state, context })
533    }
534
535    /// Returns whether parent rollback should run.
536    ///
537    /// This uses an "after repartition metadata update" semantic: once execution
538    /// reaches `UpdatePartitionMetadata` or any later rollback-active state,
539    /// rollback must try to clean metadata written by the repartition procedure.
540    ///
541    /// Notes:
542    /// - `RepartitionStart`: no-op, because no metadata has been updated yet.
543    /// - `UpdatePartitionMetadata`: rollback table partition metadata.
544    /// - `AllocateRegion` / `Dispatch` / `Collect`: rollback table partition metadata
545    ///   and allocated region metadata.
546    /// - `DeallocateRegion`: is not rollback-active.
547    /// - `RepartitionEnd`: no-op.
548    fn should_rollback(&self) -> bool {
549        self.state
550            .as_any()
551            .is::<update_partition_metadata::UpdatePartitionMetadata>()
552            || self.state.as_any().is::<allocate_region::AllocateRegion>()
553            || self.state.as_any().is::<dispatch::Dispatch>()
554            || self.state.as_any().is::<collect::Collect>()
555    }
556
557    fn rollback_plan_indices(&self) -> HashSet<usize> {
558        self.context
559            .persistent_ctx
560            .failed_procedures
561            .iter()
562            .chain(self.context.persistent_ctx.unknown_procedures.iter())
563            .map(|procedure_meta| procedure_meta.plan_index)
564            .collect()
565    }
566
567    /// Returns allocated region ids that parent rollback should remove.
568    ///
569    /// Rollback uses an "after region allocation" semantic:
570    /// - in `AllocateRegion` and `Dispatch`, all allocated regions belong to the
571    ///   current repartition attempt and must be cleaned up.
572    /// - in `Collect`, only the plans referenced by failed or unknown
573    ///   sub-procedures should be rolled back.
574    fn rollback_allocated_region_ids(&self) -> HashSet<store_api::storage::RegionId> {
575        if self.state.as_any().is::<allocate_region::AllocateRegion>()
576            || self.state.as_any().is::<dispatch::Dispatch>()
577        {
578            return self
579                .context
580                .persistent_ctx
581                .plans
582                .iter()
583                .flat_map(|plan| plan.allocated_region_ids.iter().copied())
584                .collect();
585        }
586
587        self.rollback_plan_indices()
588            .into_iter()
589            .flat_map(|plan_index| {
590                self.context.persistent_ctx.plans[plan_index]
591                    .allocated_region_ids
592                    .iter()
593                    .copied()
594            })
595            .collect()
596    }
597
598    /// Restores group-level staging metadata for failed/unknown plans.
599    ///
600    /// The helper mutates `region_routes` in memory.
601    async fn rollback_group_metadata_for_selected_plans(
602        &mut self,
603        region_routes: &mut [RegionRoute],
604    ) -> Result<()> {
605        let rollback_plan_indices = self.rollback_plan_indices();
606        if rollback_plan_indices.is_empty() {
607            return Ok(());
608        }
609
610        let mut region_routes_map = region_routes
611            .iter_mut()
612            .map(|route| (route.region.id, route))
613            .collect::<HashMap<_, _>>();
614        for plan_index in rollback_plan_indices {
615            let plan = &self.context.persistent_ctx.plans[plan_index];
616            rollback_group_metadata_routes(
617                plan.group_id,
618                &plan.source_regions,
619                &plan.original_target_routes,
620                &plan.allocated_region_ids,
621                &plan.pending_deallocate_region_ids,
622                &mut region_routes_map,
623            )?;
624        }
625
626        Ok(())
627    }
628
629    async fn rollback_partition_metadata(&mut self) -> Result<()> {
630        let Some(update) = self
631            .context
632            .persistent_ctx
633            .partition_metadata_update
634            .as_ref()
635        else {
636            return Ok(());
637        };
638        let table_info_value = self.context.get_raw_table_info_value().await?;
639        let current_partition_key_indices = &table_info_value.table_info.meta.partition_key_indices;
640        let Some(new_partition_key_indices) = update.rollback_partition_key_indices(
641            self.context.persistent_ctx.table_id,
642            current_partition_key_indices,
643        )?
644        else {
645            return Ok(());
646        };
647
648        let mut new_table_info = table_info_value.table_info.clone();
649        new_table_info.meta.partition_key_indices = new_partition_key_indices;
650        self.context
651            .update_table_info(&table_info_value, table_info_value.update(new_table_info))
652            .await?;
653
654        // Do not invalidate the table cache here. The table routes may still
655        // contain partition expressions until `rollback_inner` rolls them back.
656        // Exposing cleared partition columns with partitioned routes can build
657        // an inconsistent partition rule. The cache is invalidated once after
658        // both partition metadata and routes are rolled back.
659
660        Ok(())
661    }
662
663    async fn rollback_inner(&mut self, procedure_ctx: &ProcedureContext) -> Result<()> {
664        if !self.should_rollback() {
665            return Ok(());
666        }
667
668        let table_id = self.context.persistent_ctx.table_id;
669        let allocated_region_ids = self.rollback_allocated_region_ids();
670
671        let table_lock = TableLock::Write(table_id).into();
672        let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
673
674        self.rollback_partition_metadata().await?;
675        let table_route_value = self.context.get_table_route_value().await?;
676        let original_region_routes = region_routes(table_id, table_route_value.get_inner_ref())?;
677        let mut current_region_routes = original_region_routes.clone();
678        self.rollback_group_metadata_for_selected_plans(&mut current_region_routes)
679            .await?;
680        let allocated_region_routes = DeallocateRegion::filter_deallocatable_region_routes(
681            table_id,
682            &current_region_routes,
683            &allocated_region_ids,
684        );
685        if !allocated_region_routes.is_empty() {
686            let table = TableName {
687                catalog_name: self.context.persistent_ctx.catalog_name.clone(),
688                schema_name: self.context.persistent_ctx.schema_name.clone(),
689                table_name: self.context.persistent_ctx.table_name.clone(),
690            };
691            // Memory guards are not required here,
692            // because the table metadata still contains routes for the deallocating regions.
693            if let Err(err) = DeallocateRegion::deallocate_regions(
694                &self.context.node_manager,
695                &self.context.leader_region_registry,
696                table,
697                table_id,
698                &allocated_region_routes,
699            )
700            .await
701            {
702                warn!(err; "Failed to drop allocated regions during repartition rollback, table_id: {}, regions: {:?}", table_id, allocated_region_ids);
703            }
704        }
705
706        let new_region_routes =
707            DeallocateRegion::generate_region_routes(&current_region_routes, &allocated_region_ids);
708
709        if new_region_routes != *original_region_routes {
710            self.context
711                .update_table_route(&table_route_value, new_region_routes, HashMap::new())
712                .await
713                .map_err(BoxedError::new)
714                .with_context(|_| error::RetryLaterWithSourceSnafu {
715                    reason: format!(
716                        "Failed to rollback allocated region routes for repartition table: {}",
717                        table_id
718                    ),
719                })?;
720        }
721
722        if let Err(err) = self.context.invalidate_table_cache().await {
723            warn!(err; "Failed to invalidate table cache during repartition rollback, table_id: {}", table_id);
724        }
725
726        Ok(())
727    }
728}
729
730#[async_trait::async_trait]
731impl Procedure for RepartitionProcedure {
732    fn type_name(&self) -> &str {
733        Self::TYPE_NAME
734    }
735
736    #[tracing::instrument(skip_all, fields(
737        state = %self.state.name(),
738        table_id = %self.context.persistent_ctx.table_id
739    ))]
740    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
741        let state = &mut self.state;
742        let state_name = state.name();
743        // Log state transition
744        common_telemetry::info!(
745            "Repartition procedure executing state: {}, table_id: {}",
746            state_name,
747            self.context.persistent_ctx.table_id
748        );
749        match state.next(&mut self.context, _ctx).await {
750            Ok((next, status)) => {
751                *state = next;
752                Ok(status)
753            }
754            Err(e) => {
755                if e.is_retryable() {
756                    Err(ProcedureError::retry_later(e))
757                } else {
758                    error!(
759                        e;
760                        "Repartition procedure failed, table id: {}",
761                        self.context.persistent_ctx.table_id,
762                    );
763                    Err(ProcedureError::external(e))
764                }
765            }
766        }
767    }
768
769    async fn rollback(&mut self, ctx: &ProcedureContext) -> ProcedureResult<()> {
770        self.rollback_inner(ctx)
771            .await
772            .map_err(ProcedureError::external)
773    }
774
775    fn rollback_supported(&self) -> bool {
776        true
777    }
778
779    fn dump(&self) -> ProcedureResult<String> {
780        let data = RepartitionData {
781            state: self.state.as_ref(),
782            persistent_ctx: &self.context.persistent_ctx,
783        };
784        serde_json::to_string(&data).context(ToJsonSnafu)
785    }
786
787    fn lock_key(&self) -> LockKey {
788        LockKey::new(self.context.persistent_ctx.lock_key())
789    }
790
791    fn user_metadata(&self) -> Option<UserMetadata> {
792        // TODO(weny): support user metadata.
793        None
794    }
795}
796
797pub struct DefaultRepartitionProcedureFactory {
798    mailbox: MailboxRef,
799    server_addr: String,
800}
801
802impl DefaultRepartitionProcedureFactory {
803    pub fn new(mailbox: MailboxRef, server_addr: String) -> Self {
804        Self {
805            mailbox,
806            server_addr,
807        }
808    }
809}
810
811impl RepartitionProcedureFactory for DefaultRepartitionProcedureFactory {
812    fn create(
813        &self,
814        ddl_ctx: &DdlContext,
815        table_name: TableName,
816        table_id: TableId,
817        source: RepartitionSource,
818        to_exprs: Vec<String>,
819        timeout: Option<Duration>,
820    ) -> std::result::Result<BoxedProcedure, BoxedError> {
821        let persistent_ctx = PersistentContext::new(table_name, table_id, timeout);
822        let from = match source {
823            RepartitionSource::Partitioned {
824                exprs,
825                target_partition_columns,
826            } => {
827                let exprs = exprs
828                    .iter()
829                    .map(|e| {
830                        PartitionExpr::from_json_str(e)
831                            .context(error::DeserializePartitionExprSnafu)?
832                            .context(error::EmptyPartitionExprSnafu)
833                    })
834                    .collect::<Result<Vec<_>>>()
835                    .map_err(BoxedError::new)?;
836                RepartitionFrom::Partitioned {
837                    exprs,
838                    target_partition_columns,
839                }
840            }
841            RepartitionSource::Unpartitioned { partition_columns } => {
842                RepartitionFrom::Unpartitioned { partition_columns }
843            }
844        };
845        let to_exprs = to_exprs
846            .iter()
847            .map(|e| {
848                PartitionExpr::from_json_str(e)
849                    .context(error::DeserializePartitionExprSnafu)?
850                    .context(error::EmptyPartitionExprSnafu)
851            })
852            .collect::<Result<Vec<_>>>()
853            .map_err(BoxedError::new)?;
854
855        let procedure = RepartitionProcedure::new(
856            from,
857            to_exprs,
858            Context::new(
859                ddl_ctx,
860                self.mailbox.clone(),
861                self.server_addr.clone(),
862                persistent_ctx,
863            ),
864        );
865
866        Ok(Box::new(procedure))
867    }
868
869    fn register_loaders(
870        &self,
871        ddl_ctx: &DdlContext,
872        procedure_manager: &ProcedureManagerRef,
873    ) -> std::result::Result<(), BoxedError> {
874        // Registers the repartition procedure loader.
875        let mailbox = self.mailbox.clone();
876        let server_addr = self.server_addr.clone();
877        let moved_ddl_ctx = ddl_ctx.clone();
878        procedure_manager
879            .register_loader(
880                RepartitionProcedure::TYPE_NAME,
881                Box::new(move |json| {
882                    let mailbox = mailbox.clone();
883                    let server_addr = server_addr.clone();
884                    let ddl_ctx = moved_ddl_ctx.clone();
885                    let factory = move |persistent_ctx| {
886                        Context::new(&ddl_ctx, mailbox, server_addr, persistent_ctx)
887                    };
888                    RepartitionProcedure::from_json(json, factory).map(|p| Box::new(p) as _)
889                }),
890            )
891            .map_err(BoxedError::new)?;
892
893        // Registers the repartition group procedure loader.
894        let mailbox = self.mailbox.clone();
895        let server_addr = self.server_addr.clone();
896        let moved_ddl_ctx = ddl_ctx.clone();
897        procedure_manager
898            .register_loader(
899                RepartitionGroupProcedure::TYPE_NAME,
900                Box::new(move |json| {
901                    let mailbox = mailbox.clone();
902                    let server_addr = server_addr.clone();
903                    let ddl_ctx = moved_ddl_ctx.clone();
904                    let factory = move |persistent_ctx| {
905                        RepartitionGroupContext::new(&ddl_ctx, mailbox, server_addr, persistent_ctx)
906                    };
907                    RepartitionGroupProcedure::from_json(json, factory).map(|p| Box::new(p) as _)
908                }),
909            )
910            .map_err(BoxedError::new)?;
911
912        Ok(())
913    }
914}
915
916#[cfg(test)]
917mod tests {
918    use std::collections::HashMap;
919    use std::sync::Arc;
920    use std::sync::atomic::{AtomicBool, Ordering};
921
922    use common_error::ext::BoxedError;
923    use common_error::mock::MockError;
924    use common_error::status_code::StatusCode;
925    use common_meta::ddl::test_util::datanode_handler::{
926        DatanodeWatcher, NaiveDatanodeHandler, UnexpectedErrorDatanodeHandler,
927    };
928    use common_meta::error;
929    use common_meta::peer::Peer;
930    use common_meta::region_keeper::MemoryRegionKeeper;
931    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
932    use common_meta::test_util::MockDatanodeManager;
933    use common_procedure::{Error as ProcedureError, Procedure, ProcedureId, ProcedureState};
934    use store_api::region_engine::RegionRole;
935    use store_api::storage::RegionId;
936    use table::table_name::TableName;
937    use tokio::sync::mpsc;
938    use uuid::Uuid;
939
940    use super::*;
941    use crate::procedure::repartition::allocate_region::AllocateRegion;
942    use crate::procedure::repartition::collect::Collect;
943    use crate::procedure::repartition::deallocate_region::DeallocateRegion;
944    use crate::procedure::repartition::dispatch::Dispatch;
945    use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
946    use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor};
947    use crate::procedure::repartition::repartition_end::RepartitionEnd;
948    use crate::procedure::repartition::test_util::{
949        TestingEnv, assert_parent_state, current_parent_region_routes, extract_subprocedure_ids,
950        new_parent_context, procedure_context_with_receivers, procedure_state_receiver, range_expr,
951        test_region_route, test_region_wal_options,
952    };
953    use crate::procedure::repartition::update_partition_metadata::{
954        PartitionMetadataUpdate, UpdatePartitionMetadata,
955    };
956
957    fn test_plan(table_id: TableId) -> RepartitionPlanEntry {
958        RepartitionPlanEntry {
959            group_id: uuid::Uuid::new_v4(),
960            source_regions: vec![SourceRegionDescriptor::partitioned(
961                RegionId::new(table_id, 1),
962                range_expr("x", 0, 100),
963            )],
964            target_regions: vec![
965                TargetRegionDescriptor {
966                    region_id: RegionId::new(table_id, 1),
967                    partition_expr: range_expr("x", 0, 50),
968                },
969                TargetRegionDescriptor {
970                    region_id: RegionId::new(table_id, 3),
971                    partition_expr: range_expr("x", 50, 100),
972                },
973            ],
974            allocated_region_ids: vec![RegionId::new(table_id, 3)],
975            pending_deallocate_region_ids: vec![],
976            transition_map: vec![vec![0, 1]],
977            original_target_routes: vec![],
978        }
979    }
980
981    fn with_rollback_metadata(
982        mut plan: RepartitionPlanEntry,
983        original_target_routes: Vec<RegionRoute>,
984    ) -> RepartitionPlanEntry {
985        plan.original_target_routes = original_target_routes;
986        plan
987    }
988
989    fn apply_group_staging(
990        plan: &RepartitionPlanEntry,
991        current_region_routes: &[RegionRoute],
992    ) -> Vec<RegionRoute> {
993        UpdateMetadata::apply_staging_region_routes(
994            plan.group_id,
995            &plan.source_regions,
996            &plan.target_regions,
997            &plan.pending_deallocate_region_ids,
998            current_region_routes,
999        )
1000        .unwrap()
1001    }
1002
1003    fn exit_group_staging(
1004        plan: &RepartitionPlanEntry,
1005        current_region_routes: &[RegionRoute],
1006    ) -> Vec<RegionRoute> {
1007        UpdateMetadata::exit_staging_region_routes(
1008            plan.group_id,
1009            &plan.source_regions,
1010            &plan.target_regions,
1011            current_region_routes,
1012        )
1013        .unwrap()
1014    }
1015
1016    fn region_route_by_id(region_routes: &[RegionRoute], region_id: RegionId) -> &RegionRoute {
1017        region_routes
1018            .iter()
1019            .find(|route| route.region.id == region_id)
1020            .unwrap()
1021    }
1022
1023    async fn table_partition_key_indices(ctx: &Context) -> Vec<usize> {
1024        ctx.get_table_info_value()
1025            .await
1026            .unwrap()
1027            .table_info
1028            .meta
1029            .partition_key_indices
1030    }
1031
1032    fn test_procedure(state: Box<dyn State>, context: Context) -> RepartitionProcedure {
1033        RepartitionProcedure { state, context }
1034    }
1035
1036    fn test_context(env: &TestingEnv, table_id: TableId) -> Context {
1037        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
1038        let ddl_ctx = env.ddl_context(node_manager);
1039        let persistent_ctx = PersistentContext::new(
1040            TableName::new("test_catalog", "test_schema", "test_table"),
1041            table_id,
1042            None,
1043        );
1044
1045        Context::new(
1046            &ddl_ctx,
1047            env.mailbox_ctx.mailbox().clone(),
1048            env.server_addr.clone(),
1049            persistent_ctx,
1050        )
1051    }
1052
1053    #[test]
1054    fn test_filter_allocated_region_routes() {
1055        let table_id = 1024;
1056        let region_routes = vec![
1057            test_region_route(RegionId::new(table_id, 1), "a"),
1058            test_region_route(RegionId::new(table_id, 2), "b"),
1059        ];
1060        let allocated_region_ids = HashSet::from([RegionId::new(table_id, 2)]);
1061
1062        let new_region_routes =
1063            DeallocateRegion::generate_region_routes(&region_routes, &allocated_region_ids);
1064
1065        assert_eq!(new_region_routes.len(), 1);
1066        assert_eq!(new_region_routes[0].region.id, RegionId::new(table_id, 1));
1067    }
1068
1069    #[test]
1070    fn test_should_rollback_after_metadata_update() {
1071        let env = TestingEnv::new();
1072        let table_id = 1024;
1073
1074        let procedure = test_procedure(
1075            Box::new(RepartitionStart::new(
1076                RepartitionFrom::Partitioned {
1077                    exprs: vec![],
1078                    target_partition_columns: None,
1079                },
1080                vec![],
1081            )),
1082            test_context(&env, table_id),
1083        );
1084        assert!(!procedure.should_rollback());
1085
1086        let procedure = test_procedure(
1087            Box::new(UpdatePartitionMetadata::new(vec![])),
1088            test_context(&env, table_id),
1089        );
1090        assert!(procedure.should_rollback());
1091
1092        let procedure = test_procedure(
1093            Box::new(AllocateRegion::new(vec![])),
1094            test_context(&env, table_id),
1095        );
1096        assert!(procedure.should_rollback());
1097
1098        let procedure = test_procedure(Box::new(Dispatch), test_context(&env, table_id));
1099        assert!(procedure.should_rollback());
1100
1101        let procedure =
1102            test_procedure(Box::new(Collect::new(vec![])), test_context(&env, table_id));
1103        assert!(procedure.should_rollback());
1104
1105        let procedure = test_procedure(Box::new(DeallocateRegion), test_context(&env, table_id));
1106        assert!(!procedure.should_rollback());
1107
1108        let procedure = test_procedure(Box::new(RepartitionEnd), test_context(&env, table_id));
1109        assert!(!procedure.should_rollback());
1110    }
1111
1112    #[test]
1113    fn test_register_operating_regions_preserves_route_roles() {
1114        let keeper = Arc::new(MemoryRegionKeeper::new());
1115        let region_routes = vec![
1116            RegionRoute {
1117                region: Region::new_test(RegionId::new(1024, 1)),
1118                leader_peer: Some(Peer::empty(1)),
1119                follower_peers: vec![],
1120                leader_state: None,
1121                leader_down_since: None,
1122                write_route_policy: None,
1123            },
1124            RegionRoute {
1125                region: Region::new_test(RegionId::new(1024, 2)),
1126                leader_peer: Some(Peer::empty(2)),
1127                follower_peers: vec![],
1128                leader_state: Some(LeaderState::Staging),
1129                leader_down_since: None,
1130                write_route_policy: None,
1131            },
1132            RegionRoute {
1133                region: Region::new_test(RegionId::new(1024, 3)),
1134                leader_peer: Some(Peer::empty(3)),
1135                follower_peers: vec![],
1136                leader_state: Some(LeaderState::Downgrading),
1137                leader_down_since: None,
1138                write_route_policy: None,
1139            },
1140        ];
1141
1142        let _guards = Context::register_operating_regions(&keeper, &region_routes).unwrap();
1143
1144        let leader_roles =
1145            keeper.extract_operating_region_roles(1, &HashSet::from([RegionId::new(1024, 1)]));
1146        let staging_roles =
1147            keeper.extract_operating_region_roles(2, &HashSet::from([RegionId::new(1024, 2)]));
1148        let downgrading_roles =
1149            keeper.extract_operating_region_roles(3, &HashSet::from([RegionId::new(1024, 3)]));
1150
1151        assert_eq!(
1152            leader_roles.get(&RegionId::new(1024, 1)),
1153            Some(&RegionRole::Leader)
1154        );
1155        assert_eq!(
1156            staging_roles.get(&RegionId::new(1024, 2)),
1157            Some(&RegionRole::StagingLeader)
1158        );
1159        assert_eq!(
1160            downgrading_roles.get(&RegionId::new(1024, 3)),
1161            Some(&RegionRole::DowngradingLeader)
1162        );
1163    }
1164
1165    #[test]
1166    fn test_persistent_context_partition_metadata_update_serde_default() {
1167        let json = r#"{
1168            "catalog_name":"test_catalog",
1169            "schema_name":"test_schema",
1170            "table_name":"test_table",
1171            "table_id":1024,
1172            "plans":[],
1173            "timeout":"120s"
1174        }"#;
1175
1176        let persistent_ctx: PersistentContext = serde_json::from_str(json).unwrap();
1177
1178        assert!(persistent_ctx.partition_metadata_update.is_none());
1179    }
1180
    /// Rolling back from `UpdatePartitionMetadata` reverts the table's
    /// `partition_key_indices` to the value recorded in `partition_metadata_update`.
    #[tokio::test]
    async fn test_repartition_rollback_removes_partition_metadata_indices() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
        env.create_physical_table_metadata_for_repartition(
            table_id,
            vec![test_region_route(RegionId::new(table_id, 1), "")],
            test_region_wal_options(&[1]),
        )
        .await;

        let mut context = new_parent_context(&env, node_manager, table_id);
        // Simulate an already-applied metadata update that set the partition key indices to [0, 1].
        let current = context.get_raw_table_info_value().await.unwrap();
        let mut table_info = current.table_info.clone();
        table_info.meta.partition_key_indices = vec![0, 1];
        context
            .update_table_info(&current, current.update(table_info))
            .await
            .unwrap();
        // Record the update. The first argument ([1]) is what the assertion below expects
        // rollback to restore.
        context.persistent_ctx.partition_metadata_update = Some(
            PartitionMetadataUpdate::from_partitioned(vec![1], vec![0, 1]),
        );
        let mut procedure = RepartitionProcedure {
            state: Box::new(UpdatePartitionMetadata::new(vec![])),
            context,
        };

        procedure
            .rollback(&TestingEnv::procedure_context())
            .await
            .unwrap();

        assert_eq!(
            procedure
                .context
                .get_table_info_value()
                .await
                .unwrap()
                .table_info
                .meta
                .partition_key_indices,
            vec![1]
        );
    }
1226
    /// Rolling back from `Dispatch` removes the route of the region allocated by the plan
    /// (region 3) and keeps regions 1 and 2.
    #[tokio::test]
    async fn test_repartition_rollback_removes_allocated_routes_from_dispatch() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
        let ddl_ctx = env.ddl_context(node_manager);
        // Region 3 is the region `test_plan` allocates; its route has an empty partition expr.
        let original_region_routes = vec![
            test_region_route(
                RegionId::new(table_id, 1),
                &range_expr("x", 0, 100).as_json_str().unwrap(),
            ),
            test_region_route(
                RegionId::new(table_id, 2),
                &range_expr("x", 50, 100).as_json_str().unwrap(),
            ),
            test_region_route(RegionId::new(table_id, 3), ""),
        ];
        // WAL options are only registered for regions 1 and 2.
        env.create_physical_table_metadata_with_wal_options(
            table_id,
            original_region_routes,
            test_region_wal_options(&[1, 2]),
        )
        .await;

        let mut persistent_ctx = PersistentContext::new(
            TableName::new("test_catalog", "test_schema", "test_table"),
            table_id,
            None,
        );
        persistent_ctx.plans = vec![with_rollback_metadata(
            test_plan(table_id),
            vec![
                test_region_route(
                    RegionId::new(table_id, 1),
                    &range_expr("x", 0, 100).as_json_str().unwrap(),
                ),
                test_region_route(RegionId::new(table_id, 3), ""),
            ],
        )];
        // NOTE(review): unlike the `Collect` tests, this `group_id` is random and does not match
        // `plans[0].group_id`. Confirm whether that is intentional.
        persistent_ctx.failed_procedures = vec![ProcedureMeta {
            plan_index: 0,
            group_id: Uuid::new_v4(),
            procedure_id: ProcedureId::random(),
        }];
        let context = Context::new(
            &ddl_ctx,
            env.mailbox_ctx.mailbox().clone(),
            env.server_addr.clone(),
            persistent_ctx,
        );
        let mut procedure = RepartitionProcedure {
            state: Box::new(Dispatch),
            context,
        };

        procedure
            .rollback(&TestingEnv::procedure_context())
            .await
            .unwrap();

        // The allocated region 3 is gone; regions 1 and 2 remain, in order.
        let region_routes = current_parent_region_routes(&procedure.context).await;
        assert_eq!(region_routes.len(), 2);
        assert_eq!(region_routes[0].region.id, RegionId::new(table_id, 1));
        assert_eq!(region_routes[1].region.id, RegionId::new(table_id, 2));
    }
1292
    /// Rolling back from `AllocateRegion` removes the route of the allocated region 3.
    ///
    /// In this fixture the plan has no rollback metadata and no failed procedures are set.
    #[tokio::test]
    async fn test_repartition_rollback_removes_allocated_routes_from_allocate() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
        let ddl_ctx = env.ddl_context(node_manager);
        let original_region_routes = vec![
            test_region_route(
                RegionId::new(table_id, 1),
                &range_expr("x", 0, 100).as_json_str().unwrap(),
            ),
            test_region_route(
                RegionId::new(table_id, 2),
                &range_expr("x", 50, 100).as_json_str().unwrap(),
            ),
            test_region_route(RegionId::new(table_id, 3), ""),
        ];
        env.create_physical_table_metadata_with_wal_options(
            table_id,
            original_region_routes,
            test_region_wal_options(&[1, 2]),
        )
        .await;

        let mut persistent_ctx = PersistentContext::new(
            TableName::new("test_catalog", "test_schema", "test_table"),
            table_id,
            None,
        );
        // `test_plan` carries an empty `original_target_routes`.
        persistent_ctx.plans = vec![test_plan(table_id)];
        let context = Context::new(
            &ddl_ctx,
            env.mailbox_ctx.mailbox().clone(),
            env.server_addr.clone(),
            persistent_ctx,
        );
        let mut procedure = RepartitionProcedure {
            state: Box::new(AllocateRegion::new(vec![])),
            context,
        };

        procedure
            .rollback(&TestingEnv::procedure_context())
            .await
            .unwrap();

        let region_routes = current_parent_region_routes(&procedure.context).await;
        assert_eq!(region_routes.len(), 2);
        assert_eq!(region_routes[0].region.id, RegionId::new(table_id, 1));
        assert_eq!(region_routes[1].region.id, RegionId::new(table_id, 2));
    }
1344
    /// Rolling back from `Collect` removes only routes allocated by failed groups.
    ///
    /// Region 3 belongs to the failed plan and is removed. Region 4 belongs to the succeeded
    /// plan and is kept.
    #[tokio::test]
    async fn test_repartition_rollback_from_collect_only_removes_failed_allocated_routes() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
        let ddl_ctx = env.ddl_context(node_manager);
        let original_region_routes = vec![
            test_region_route(
                RegionId::new(table_id, 1),
                &range_expr("x", 0, 100).as_json_str().unwrap(),
            ),
            test_region_route(
                RegionId::new(table_id, 2),
                &range_expr("x", 100, 200).as_json_str().unwrap(),
            ),
            test_region_route(RegionId::new(table_id, 3), ""),
            test_region_route(RegionId::new(table_id, 4), ""),
        ];
        env.create_physical_table_metadata_with_wal_options(
            table_id,
            original_region_routes,
            test_region_wal_options(&[1, 2, 3, 4]),
        )
        .await;

        let mut persistent_ctx = PersistentContext::new(
            TableName::new("test_catalog", "test_schema", "test_table"),
            table_id,
            None,
        );
        // Plan 0 (fails): splits region 1 and allocates region 3.
        let failed_plan = test_plan(table_id);
        let failed_plan = with_rollback_metadata(
            failed_plan,
            vec![
                test_region_route(
                    RegionId::new(table_id, 1),
                    &range_expr("x", 0, 100).as_json_str().unwrap(),
                ),
                test_region_route(RegionId::new(table_id, 3), ""),
            ],
        );
        // Plan 1 (succeeds): splits region 2 and allocates region 4.
        let succeeded_plan = RepartitionPlanEntry {
            group_id: Uuid::new_v4(),
            source_regions: vec![SourceRegionDescriptor::partitioned(
                RegionId::new(table_id, 2),
                range_expr("x", 100, 200),
            )],
            target_regions: vec![
                TargetRegionDescriptor {
                    region_id: RegionId::new(table_id, 2),
                    partition_expr: range_expr("x", 100, 150),
                },
                TargetRegionDescriptor {
                    region_id: RegionId::new(table_id, 4),
                    partition_expr: range_expr("x", 150, 200),
                },
            ],
            allocated_region_ids: vec![RegionId::new(table_id, 4)],
            pending_deallocate_region_ids: vec![],
            // NOTE(review): maps the single source to target 0 only, although there are two
            // targets. The equivalent fixture in the next test uses `vec![vec![0, 1]]`.
            // Confirm which one is intended.
            transition_map: vec![vec![0]],
            original_target_routes: vec![
                test_region_route(
                    RegionId::new(table_id, 2),
                    &range_expr("x", 100, 200).as_json_str().unwrap(),
                ),
                test_region_route(RegionId::new(table_id, 4), ""),
            ],
        };
        persistent_ctx.plans = vec![failed_plan, succeeded_plan];
        // Only plan 0 is reported as failed.
        persistent_ctx.failed_procedures = vec![ProcedureMeta {
            plan_index: 0,
            group_id: persistent_ctx.plans[0].group_id,
            procedure_id: ProcedureId::random(),
        }];

        let context = Context::new(
            &ddl_ctx,
            env.mailbox_ctx.mailbox().clone(),
            env.server_addr.clone(),
            persistent_ctx,
        );
        let mut procedure = RepartitionProcedure {
            state: Box::new(Collect::new(vec![])),
            context,
        };

        procedure
            .rollback(&TestingEnv::procedure_context())
            .await
            .unwrap();

        // Region 3 (failed group's allocation) is removed; region 4 (succeeded group's) stays.
        let region_routes = current_parent_region_routes(&procedure.context).await;
        assert_eq!(region_routes.len(), 3);
        assert_eq!(region_routes[0].region.id, RegionId::new(table_id, 1));
        assert_eq!(region_routes[1].region.id, RegionId::new(table_id, 2));
        assert_eq!(region_routes[2].region.id, RegionId::new(table_id, 4));
    }
1442
    /// Rolling back from `Collect` restores only the failed group's original routes.
    ///
    /// - Failed group: region 1 goes back to `range_expr("x", 0, 100)` and region 3 is removed.
    /// - Succeeded group: the already-committed routes for regions 2 and 4 are left untouched.
    #[tokio::test]
    async fn test_repartition_rollback_from_collect_restores_failed_group_metadata_only() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
        let ddl_ctx = env.ddl_context(node_manager);
        let original_region_routes = vec![
            test_region_route(
                RegionId::new(table_id, 1),
                &range_expr("x", 0, 100).as_json_str().unwrap(),
            ),
            test_region_route(
                RegionId::new(table_id, 2),
                &range_expr("x", 100, 200).as_json_str().unwrap(),
            ),
            test_region_route(RegionId::new(table_id, 3), ""),
            test_region_route(RegionId::new(table_id, 4), ""),
        ];

        // Plan 0 (fails) covers regions 1 and 3.
        let failed_plan = with_rollback_metadata(
            test_plan(table_id),
            vec![
                original_region_routes[0].clone(),
                original_region_routes[2].clone(),
            ],
        );
        // Plan 1 (succeeds) covers regions 2 and 4.
        let succeeded_plan = RepartitionPlanEntry {
            group_id: Uuid::new_v4(),
            source_regions: vec![SourceRegionDescriptor::partitioned(
                RegionId::new(table_id, 2),
                range_expr("x", 100, 200),
            )],
            target_regions: vec![
                TargetRegionDescriptor {
                    region_id: RegionId::new(table_id, 2),
                    partition_expr: range_expr("x", 100, 150),
                },
                TargetRegionDescriptor {
                    region_id: RegionId::new(table_id, 4),
                    partition_expr: range_expr("x", 150, 200),
                },
            ],
            allocated_region_ids: vec![RegionId::new(table_id, 4)],
            pending_deallocate_region_ids: vec![],
            transition_map: vec![vec![0, 1]],
            original_target_routes: vec![
                original_region_routes[1].clone(),
                original_region_routes[3].clone(),
            ],
        };
        // Current table state: both groups entered staging, and only the succeeded group exited.
        let current_region_routes = apply_group_staging(&failed_plan, &original_region_routes);
        let current_region_routes = apply_group_staging(&succeeded_plan, &current_region_routes);
        let current_region_routes = exit_group_staging(&succeeded_plan, &current_region_routes);
        env.create_physical_table_metadata_with_wal_options(
            table_id,
            current_region_routes,
            test_region_wal_options(&[1, 2, 3, 4]),
        )
        .await;

        let mut persistent_ctx = PersistentContext::new(
            TableName::new("test_catalog", "test_schema", "test_table"),
            table_id,
            None,
        );
        persistent_ctx.plans = vec![failed_plan, succeeded_plan.clone()];
        persistent_ctx.failed_procedures = vec![ProcedureMeta {
            plan_index: 0,
            group_id: persistent_ctx.plans[0].group_id,
            procedure_id: ProcedureId::random(),
        }];

        let context = Context::new(
            &ddl_ctx,
            env.mailbox_ctx.mailbox().clone(),
            env.server_addr.clone(),
            persistent_ctx,
        );
        let mut procedure = RepartitionProcedure {
            state: Box::new(Collect::new(vec![])),
            context,
        };

        procedure
            .rollback(&TestingEnv::procedure_context())
            .await
            .unwrap();

        // Region 1 is back to its original route and region 3 is gone. Regions 2 and 4 keep
        // the partition exprs the succeeded group committed.
        assert_eq!(
            current_parent_region_routes(&procedure.context).await,
            vec![
                test_region_route(
                    RegionId::new(table_id, 1),
                    &range_expr("x", 0, 100).as_json_str().unwrap(),
                ),
                RegionRoute {
                    region: Region {
                        id: RegionId::new(table_id, 2),
                        partition_expr: range_expr("x", 100, 150).as_json_str().unwrap(),
                        ..Default::default()
                    },
                    leader_peer: Some(Peer::empty(1)),
                    ..Default::default()
                },
                RegionRoute {
                    region: Region {
                        id: RegionId::new(table_id, 4),
                        partition_expr: range_expr("x", 150, 200).as_json_str().unwrap(),
                        ..Default::default()
                    },
                    leader_peer: Some(Peer::empty(1)),
                    ..Default::default()
                },
            ]
        );
    }
1559
    /// Rolling back from `Collect` also restores groups whose procedure outcome is unknown.
    ///
    /// The staged routes for regions 1 and 3 are reverted, leaving exactly the original
    /// routes of regions 1 and 2.
    #[tokio::test]
    async fn test_repartition_rollback_from_collect_restores_unknown_group_metadata() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
        let ddl_ctx = env.ddl_context(node_manager);
        let original_region_routes = vec![
            test_region_route(
                RegionId::new(table_id, 1),
                &range_expr("x", 0, 100).as_json_str().unwrap(),
            ),
            test_region_route(
                RegionId::new(table_id, 2),
                &range_expr("x", 100, 200).as_json_str().unwrap(),
            ),
            test_region_route(RegionId::new(table_id, 3), ""),
        ];
        let plan = with_rollback_metadata(
            test_plan(table_id),
            vec![
                original_region_routes[0].clone(),
                original_region_routes[2].clone(),
            ],
        );
        // Sanity-check the staged fixture: regions 1 and 3 carry the target exprs and are
        // staging leaders.
        let staged_region_routes = apply_group_staging(&plan, &original_region_routes);
        assert_eq!(
            region_route_by_id(&staged_region_routes, RegionId::new(table_id, 1))
                .region
                .partition_expr(),
            range_expr("x", 0, 50).as_json_str().unwrap()
        );
        assert!(
            region_route_by_id(&staged_region_routes, RegionId::new(table_id, 1))
                .is_leader_staging()
        );
        assert_eq!(
            region_route_by_id(&staged_region_routes, RegionId::new(table_id, 3))
                .region
                .partition_expr(),
            range_expr("x", 50, 100).as_json_str().unwrap()
        );
        assert!(
            region_route_by_id(&staged_region_routes, RegionId::new(table_id, 3))
                .is_leader_staging()
        );
        env.create_physical_table_metadata_with_wal_options(
            table_id,
            staged_region_routes,
            test_region_wal_options(&[1, 2, 3]),
        )
        .await;

        let mut persistent_ctx = PersistentContext::new(
            TableName::new("test_catalog", "test_schema", "test_table"),
            table_id,
            None,
        );
        persistent_ctx.plans = vec![plan.clone()];
        // The group is recorded as unknown rather than failed.
        persistent_ctx.unknown_procedures = vec![ProcedureMeta {
            plan_index: 0,
            group_id: plan.group_id,
            procedure_id: ProcedureId::random(),
        }];

        let context = Context::new(
            &ddl_ctx,
            env.mailbox_ctx.mailbox().clone(),
            env.server_addr.clone(),
            persistent_ctx,
        );
        let mut procedure = RepartitionProcedure {
            state: Box::new(Collect::new(vec![])),
            context,
        };

        procedure
            .rollback(&TestingEnv::procedure_context())
            .await
            .unwrap();

        // Only the original routes of regions 1 and 2 remain; the allocated region 3 is removed.
        assert_eq!(
            current_parent_region_routes(&procedure.context).await,
            vec![
                original_region_routes[0].clone(),
                original_region_routes[1].clone()
            ]
        );
    }
1648
1649    #[tokio::test]
1650    async fn test_repartition_rollback_is_idempotent() {
1651        let env = TestingEnv::new();
1652        let table_id = 1024;
1653        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
1654        let ddl_ctx = env.ddl_context(node_manager);
1655        let original_region_routes = vec![
1656            test_region_route(
1657                RegionId::new(table_id, 1),
1658                &range_expr("x", 0, 100).as_json_str().unwrap(),
1659            ),
1660            test_region_route(
1661                RegionId::new(table_id, 2),
1662                &range_expr("x", 50, 100).as_json_str().unwrap(),
1663            ),
1664            test_region_route(RegionId::new(table_id, 3), ""),
1665        ];
1666        env.create_physical_table_metadata_with_wal_options(
1667            table_id,
1668            original_region_routes,
1669            test_region_wal_options(&[1, 2]),
1670        )
1671        .await;
1672
1673        let mut persistent_ctx = PersistentContext::new(
1674            TableName::new("test_catalog", "test_schema", "test_table"),
1675            table_id,
1676            None,
1677        );
1678        persistent_ctx.plans = vec![with_rollback_metadata(
1679            test_plan(table_id),
1680            vec![
1681                test_region_route(
1682                    RegionId::new(table_id, 1),
1683                    &range_expr("x", 0, 100).as_json_str().unwrap(),
1684                ),
1685                test_region_route(RegionId::new(table_id, 3), ""),
1686            ],
1687        )];
1688        persistent_ctx.failed_procedures = vec![ProcedureMeta {
1689            plan_index: 0,
1690            group_id: Uuid::new_v4(),
1691            procedure_id: ProcedureId::random(),
1692        }];
1693        let context = Context::new(
1694            &ddl_ctx,
1695            env.mailbox_ctx.mailbox().clone(),
1696            env.server_addr.clone(),
1697            persistent_ctx,
1698        );
1699        let mut procedure = RepartitionProcedure {
1700            state: Box::new(Dispatch),
1701            context,
1702        };
1703
1704        procedure
1705            .rollback(&TestingEnv::procedure_context())
1706            .await
1707            .unwrap();
1708        let once = current_parent_region_routes(&procedure.context).await;
1709
1710        procedure
1711            .rollback(&TestingEnv::procedure_context())
1712            .await
1713            .unwrap();
1714        let twice = current_parent_region_routes(&procedure.context).await;
1715
1716        assert_eq!(once, twice);
1717        assert_eq!(once.len(), 2);
1718        assert_eq!(once[0].region.id, RegionId::new(table_id, 1));
1719        assert_eq!(once[1].region.id, RegionId::new(table_id, 2));
1720    }
1721
1722    #[tokio::test]
1723    async fn test_repartition_rollback_from_collect_restores_failed_merge_group_metadata_only() {
1724        let env = TestingEnv::new();
1725        let table_id = 1024;
1726        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
1727        let ddl_ctx = env.ddl_context(node_manager);
1728        let original_region_routes = vec![
1729            test_region_route(
1730                RegionId::new(table_id, 1),
1731                &range_expr("x", 0, 100).as_json_str().unwrap(),
1732            ),
1733            test_region_route(
1734                RegionId::new(table_id, 2),
1735                &range_expr("x", 100, 200).as_json_str().unwrap(),
1736            ),
1737            test_region_route(
1738                RegionId::new(table_id, 3),
1739                &range_expr("x", 200, 300).as_json_str().unwrap(),
1740            ),
1741            test_region_route(RegionId::new(table_id, 4), ""),
1742        ];
1743        let failed_merge_plan = RepartitionPlanEntry {
1744            group_id: Uuid::new_v4(),
1745            source_regions: vec![
1746                SourceRegionDescriptor::partitioned(
1747                    RegionId::new(table_id, 1),
1748                    range_expr("x", 0, 100),
1749                ),
1750                SourceRegionDescriptor::partitioned(
1751                    RegionId::new(table_id, 2),
1752                    range_expr("x", 100, 200),
1753                ),
1754            ],
1755            target_regions: vec![TargetRegionDescriptor {
1756                region_id: RegionId::new(table_id, 1),
1757                partition_expr: range_expr("x", 0, 200),
1758            }],
1759            allocated_region_ids: vec![],
1760            pending_deallocate_region_ids: vec![RegionId::new(table_id, 2)],
1761            transition_map: vec![vec![0], vec![0]],
1762            original_target_routes: vec![original_region_routes[0].clone()],
1763        };
1764        let succeeded_split_plan = RepartitionPlanEntry {
1765            group_id: Uuid::new_v4(),
1766            source_regions: vec![SourceRegionDescriptor::partitioned(
1767                RegionId::new(table_id, 3),
1768                range_expr("x", 200, 300),
1769            )],
1770            target_regions: vec![
1771                TargetRegionDescriptor {
1772                    region_id: RegionId::new(table_id, 3),
1773                    partition_expr: range_expr("x", 200, 250),
1774                },
1775                TargetRegionDescriptor {
1776                    region_id: RegionId::new(table_id, 4),
1777                    partition_expr: range_expr("x", 250, 300),
1778                },
1779            ],
1780            allocated_region_ids: vec![RegionId::new(table_id, 4)],
1781            pending_deallocate_region_ids: vec![],
1782            transition_map: vec![vec![0, 1]],
1783            original_target_routes: vec![
1784                original_region_routes[2].clone(),
1785                original_region_routes[3].clone(),
1786            ],
1787        };
1788        let current_region_routes =
1789            apply_group_staging(&failed_merge_plan, &original_region_routes);
1790        let current_region_routes =
1791            apply_group_staging(&succeeded_split_plan, &current_region_routes);
1792        let staged_region_routes =
1793            exit_group_staging(&succeeded_split_plan, &current_region_routes);
1794        env.create_physical_table_metadata_with_wal_options(
1795            table_id,
1796            staged_region_routes,
1797            test_region_wal_options(&[1, 2, 3, 4]),
1798        )
1799        .await;
1800
1801        let mut persistent_ctx = PersistentContext::new(
1802            TableName::new("test_catalog", "test_schema", "test_table"),
1803            table_id,
1804            None,
1805        );
1806        persistent_ctx.plans = vec![failed_merge_plan, succeeded_split_plan.clone()];
1807        persistent_ctx.failed_procedures = vec![ProcedureMeta {
1808            plan_index: 0,
1809            group_id: persistent_ctx.plans[0].group_id,
1810            procedure_id: ProcedureId::random(),
1811        }];
1812
1813        let context = Context::new(
1814            &ddl_ctx,
1815            env.mailbox_ctx.mailbox().clone(),
1816            env.server_addr.clone(),
1817            persistent_ctx,
1818        );
1819        let mut procedure = RepartitionProcedure {
1820            state: Box::new(Collect::new(vec![])),
1821            context,
1822        };
1823
1824        procedure
1825            .rollback(&TestingEnv::procedure_context())
1826            .await
1827            .unwrap();
1828
1829        let region_routes = current_parent_region_routes(&procedure.context).await;
1830        assert_eq!(
1831            region_routes,
1832            vec![
1833                test_region_route(
1834                    RegionId::new(table_id, 1),
1835                    &range_expr("x", 0, 100).as_json_str().unwrap(),
1836                ),
1837                test_region_route(
1838                    RegionId::new(table_id, 2),
1839                    &range_expr("x", 100, 200).as_json_str().unwrap(),
1840                ),
1841                RegionRoute {
1842                    region: Region {
1843                        id: RegionId::new(table_id, 3),
1844                        partition_expr: range_expr("x", 200, 250).as_json_str().unwrap(),
1845                        ..Default::default()
1846                    },
1847                    leader_peer: Some(Peer::empty(1)),
1848                    ..Default::default()
1849                },
1850                RegionRoute {
1851                    region: Region {
1852                        id: RegionId::new(table_id, 4),
1853                        partition_expr: range_expr("x", 250, 300).as_json_str().unwrap(),
1854                        ..Default::default()
1855                    },
1856                    leader_peer: Some(Peer::empty(1)),
1857                    ..Default::default()
1858                },
1859            ]
1860        );
1861    }
1862
1863    #[tokio::test]
1864    async fn test_repartition_procedure_flow_split_failed_and_full_rollback() {
1865        let env = TestingEnv::new();
1866        let table_id = 1024;
1867        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
1868
1869        env.create_physical_table_metadata_for_repartition(
1870            table_id,
1871            vec![
1872                test_region_route(
1873                    RegionId::new(table_id, 1),
1874                    &range_expr("x", 0, 100).as_json_str().unwrap(),
1875                ),
1876                test_region_route(
1877                    RegionId::new(table_id, 2),
1878                    &range_expr("x", 100, 200).as_json_str().unwrap(),
1879                ),
1880            ],
1881            test_region_wal_options(&[1, 2]),
1882        )
1883        .await;
1884
1885        let context = new_parent_context(&env, node_manager, table_id);
1886        let mut procedure = RepartitionProcedure::new(
1887            RepartitionFrom::Partitioned {
1888                exprs: vec![range_expr("x", 0, 100)],
1889                target_partition_columns: None,
1890            },
1891            vec![range_expr("x", 0, 50), range_expr("x", 50, 100)],
1892            context,
1893        );
1894
1895        let start_status = procedure
1896            .execute(&TestingEnv::procedure_context())
1897            .await
1898            .unwrap();
1899        assert!(!start_status.need_persist());
1900        let start_status = procedure
1901            .execute(&TestingEnv::procedure_context())
1902            .await
1903            .unwrap();
1904        assert!(start_status.need_persist());
1905        assert_parent_state::<AllocateRegion>(&procedure);
1906
1907        let allocate_status = procedure
1908            .execute(&TestingEnv::procedure_context())
1909            .await
1910            .unwrap();
1911        assert!(allocate_status.need_persist());
1912        assert_parent_state::<Dispatch>(&procedure);
1913        assert_eq!(procedure.context.persistent_ctx.plans.len(), 1);
1914        let plan = &procedure.context.persistent_ctx.plans[0];
1915        let expected_plan = test_plan(table_id);
1916        assert_eq!(plan.source_regions, expected_plan.source_regions);
1917        assert_eq!(plan.target_regions, expected_plan.target_regions);
1918        assert_eq!(
1919            plan.allocated_region_ids,
1920            expected_plan.allocated_region_ids
1921        );
1922        assert_eq!(
1923            plan.pending_deallocate_region_ids,
1924            expected_plan.pending_deallocate_region_ids
1925        );
1926        assert_eq!(plan.transition_map, expected_plan.transition_map);
1927        assert_eq!(
1928            current_parent_region_routes(&procedure.context).await,
1929            vec![
1930                test_region_route(
1931                    RegionId::new(table_id, 1),
1932                    &range_expr("x", 0, 100).as_json_str().unwrap(),
1933                ),
1934                test_region_route(
1935                    RegionId::new(table_id, 2),
1936                    &range_expr("x", 100, 200).as_json_str().unwrap(),
1937                ),
1938                RegionRoute {
1939                    region: Region {
1940                        id: RegionId::new(table_id, 3),
1941                        partition_expr: range_expr("x", 50, 100).as_json_str().unwrap(),
1942                        ..Default::default()
1943                    },
1944                    leader_peer: Some(Peer::empty(0)),
1945                    ..Default::default()
1946                },
1947            ]
1948        );
1949
1950        let dispatch_status = procedure
1951            .execute(&TestingEnv::procedure_context())
1952            .await
1953            .unwrap();
1954        assert!(!dispatch_status.need_persist());
1955        let subprocedure_ids = extract_subprocedure_ids(dispatch_status);
1956        assert_eq!(subprocedure_ids.len(), 1);
1957        assert_parent_state::<Collect>(&procedure);
1958
1959        let failed_state = ProcedureState::failed(Arc::new(ProcedureError::external(
1960            MockError::new(StatusCode::Internal),
1961        )));
1962        let collect_ctx = procedure_context_with_receivers(HashMap::from([(
1963            subprocedure_ids[0],
1964            procedure_state_receiver(failed_state),
1965        )]));
1966
1967        let err = procedure.execute(&collect_ctx).await.unwrap_err();
1968        assert!(!err.is_retry_later());
1969        assert_parent_state::<Collect>(&procedure);
1970
1971        procedure
1972            .rollback(&TestingEnv::procedure_context())
1973            .await
1974            .unwrap();
1975
1976        let region_routes = current_parent_region_routes(&procedure.context).await;
1977        assert_eq!(
1978            region_routes,
1979            vec![
1980                test_region_route(
1981                    RegionId::new(table_id, 1),
1982                    &range_expr("x", 0, 100).as_json_str().unwrap(),
1983                ),
1984                test_region_route(
1985                    RegionId::new(table_id, 2),
1986                    &range_expr("x", 100, 200).as_json_str().unwrap(),
1987                ),
1988            ]
1989        );
1990    }
1991
    /// Repartitions a single unpartitioned region by `col1` into two ranges and
    /// fails the dispatched sub-procedure. Rollback must then clear the table's
    /// partition key indices and restore the single unpartitioned route.
    #[tokio::test]
    async fn test_repartition_procedure_flow_unpartitioned_failed_and_full_rollback() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));

        // Table starts with one region and an empty partition expr.
        env.create_physical_table_metadata_for_repartition(
            table_id,
            vec![test_region_route(RegionId::new(table_id, 1), "")],
            test_region_wal_options(&[1]),
        )
        .await;

        let context = new_parent_context(&env, node_manager, table_id);
        let to_exprs = vec![range_expr("col1", 0, 50), range_expr("col1", 50, 100)];
        let mut procedure = RepartitionProcedure::new(
            RepartitionFrom::Unpartitioned {
                partition_columns: vec!["col1".to_string()],
            },
            to_exprs.clone(),
            context,
        );

        // Step 1 (start): records the target partition key indices in the
        // persistent context and moves to `UpdatePartitionMetadata`.
        let start_status = procedure
            .execute(&TestingEnv::procedure_context())
            .await
            .unwrap();
        assert!(start_status.need_persist());
        assert_parent_state::<UpdatePartitionMetadata>(&procedure);
        assert_eq!(
            procedure
                .context
                .persistent_ctx
                .partition_metadata_update
                .as_ref()
                .unwrap()
                .target_partition_key_indices,
            vec![0]
        );

        // Step 2: the partition key indices become visible in table metadata.
        let update_status = procedure
            .execute(&TestingEnv::procedure_context())
            .await
            .unwrap();
        assert!(update_status.need_persist());
        assert_parent_state::<AllocateRegion>(&procedure);
        assert_eq!(
            table_partition_key_indices(&procedure.context).await,
            vec![0]
        );

        // Step 3: the first `AllocateRegion` step builds the plan and stays in
        // `AllocateRegion`. Region 1 is reused as the first target; the second
        // target is a newly allocated region; nothing is deallocated.
        let build_allocate_status = procedure
            .execute(&TestingEnv::procedure_context())
            .await
            .unwrap();
        assert!(build_allocate_status.need_persist());
        assert_parent_state::<AllocateRegion>(&procedure);
        assert_eq!(procedure.context.persistent_ctx.plans.len(), 1);
        let plan = &procedure.context.persistent_ctx.plans[0];
        assert_eq!(
            plan.source_regions,
            vec![SourceRegionDescriptor::Default {
                region_id: RegionId::new(table_id, 1)
            }]
        );
        assert_eq!(plan.target_regions.len(), 2);
        assert_eq!(plan.target_regions[0].region_id, RegionId::new(table_id, 1));
        assert_eq!(plan.target_regions[0].partition_expr, to_exprs[0]);
        assert_eq!(
            plan.allocated_region_ids,
            vec![plan.target_regions[1].region_id]
        );
        assert!(plan.pending_deallocate_region_ids.is_empty());
        assert_eq!(plan.transition_map, vec![vec![0, 1]]);
        // Cloned so `procedure` can be borrowed mutably by the next step.
        let target_regions = plan.target_regions.clone();

        // Step 4: the second `AllocateRegion` step creates the new region route
        // and moves to `Dispatch`. Region 1 still has its empty expr; the new
        // region already carries its target expr.
        let execute_allocate_status = procedure
            .execute(&TestingEnv::procedure_context())
            .await
            .unwrap();
        assert!(execute_allocate_status.need_persist());
        assert_parent_state::<Dispatch>(&procedure);
        let region_routes = current_parent_region_routes(&procedure.context).await;
        assert_eq!(region_routes.len(), 2);
        assert_eq!(
            region_route_by_id(&region_routes, target_regions[0].region_id)
                .region
                .partition_expr(),
            ""
        );
        assert_eq!(
            region_route_by_id(&region_routes, target_regions[1].region_id)
                .region
                .partition_expr(),
            to_exprs[1].as_json_str().unwrap()
        );

        // Step 5: dispatch spawns one sub-procedure and moves to `Collect`.
        let dispatch_status = procedure
            .execute(&TestingEnv::procedure_context())
            .await
            .unwrap();
        let subprocedure_ids = extract_subprocedure_ids(dispatch_status);
        assert_eq!(subprocedure_ids.len(), 1);
        assert_parent_state::<Collect>(&procedure);

        // Step 6: the sub-procedure is reported as failed, so `Collect` returns
        // a non-retryable error and the state stays `Collect`.
        let failed_state = ProcedureState::failed(Arc::new(ProcedureError::external(
            MockError::new(StatusCode::Internal),
        )));
        let collect_ctx = procedure_context_with_receivers(HashMap::from([(
            subprocedure_ids[0],
            procedure_state_receiver(failed_state),
        )]));
        let err = procedure.execute(&collect_ctx).await.unwrap_err();
        assert!(!err.is_retry_later());
        assert_parent_state::<Collect>(&procedure);

        // Rollback undoes both the partition-key update and the route changes.
        procedure
            .rollback(&TestingEnv::procedure_context())
            .await
            .unwrap();

        assert!(
            table_partition_key_indices(&procedure.context)
                .await
                .is_empty()
        );
        assert_eq!(
            current_parent_region_routes(&procedure.context).await,
            vec![test_region_route(RegionId::new(table_id, 1), "")]
        );
    }
2123
2124    #[tokio::test]
2125    async fn test_repartition_procedure_flow_unpartitioned_rollback_is_idempotent() {
2126        let env = TestingEnv::new();
2127        let table_id = 1024;
2128        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
2129
2130        env.create_physical_table_metadata_for_repartition(
2131            table_id,
2132            vec![test_region_route(RegionId::new(table_id, 1), "")],
2133            test_region_wal_options(&[1]),
2134        )
2135        .await;
2136
2137        let context = new_parent_context(&env, node_manager, table_id);
2138        let mut procedure = RepartitionProcedure::new(
2139            RepartitionFrom::Unpartitioned {
2140                partition_columns: vec!["col1".to_string()],
2141            },
2142            vec![range_expr("col1", 0, 50), range_expr("col1", 50, 100)],
2143            context,
2144        );
2145
2146        procedure
2147            .execute(&TestingEnv::procedure_context())
2148            .await
2149            .unwrap();
2150        procedure
2151            .execute(&TestingEnv::procedure_context())
2152            .await
2153            .unwrap();
2154        procedure
2155            .execute(&TestingEnv::procedure_context())
2156            .await
2157            .unwrap();
2158        procedure
2159            .execute(&TestingEnv::procedure_context())
2160            .await
2161            .unwrap();
2162        assert_eq!(
2163            table_partition_key_indices(&procedure.context).await,
2164            vec![0]
2165        );
2166        assert_eq!(
2167            current_parent_region_routes(&procedure.context).await.len(),
2168            2
2169        );
2170
2171        let dispatch_status = procedure
2172            .execute(&TestingEnv::procedure_context())
2173            .await
2174            .unwrap();
2175        let subprocedure_ids = extract_subprocedure_ids(dispatch_status);
2176        assert_eq!(subprocedure_ids.len(), 1);
2177        assert_parent_state::<Collect>(&procedure);
2178
2179        let failed_state = ProcedureState::failed(Arc::new(ProcedureError::external(
2180            MockError::new(StatusCode::Internal),
2181        )));
2182        let collect_ctx = procedure_context_with_receivers(HashMap::from([(
2183            subprocedure_ids[0],
2184            procedure_state_receiver(failed_state),
2185        )]));
2186        let err = procedure.execute(&collect_ctx).await.unwrap_err();
2187        assert!(!err.is_retry_later());
2188
2189        procedure
2190            .rollback(&TestingEnv::procedure_context())
2191            .await
2192            .unwrap();
2193        let once_indices = table_partition_key_indices(&procedure.context).await;
2194        let once_routes = current_parent_region_routes(&procedure.context).await;
2195
2196        procedure
2197            .rollback(&TestingEnv::procedure_context())
2198            .await
2199            .unwrap();
2200        let twice_indices = table_partition_key_indices(&procedure.context).await;
2201        let twice_routes = current_parent_region_routes(&procedure.context).await;
2202
2203        assert_eq!(once_indices, twice_indices);
2204        assert_eq!(once_routes, twice_routes);
2205        assert!(twice_indices.is_empty());
2206        assert_eq!(
2207            twice_routes,
2208            vec![test_region_route(RegionId::new(table_id, 1), "")]
2209        );
2210    }
2211
2212    #[tokio::test]
2213    async fn test_repartition_procedure_flow_split_allocate_retryable_then_resume() {
2214        common_telemetry::init_default_ut_logging();
2215        let env = TestingEnv::new();
2216        let table_id = 1024;
2217        let (tx, _rx) = mpsc::channel(8);
2218        let should_retry = Arc::new(AtomicBool::new(true));
2219        let datanode_handler = DatanodeWatcher::new(tx).with_handler(move |_, _| {
2220            if should_retry.swap(false, Ordering::SeqCst) {
2221                return Err(error::Error::RetryLater {
2222                    source: BoxedError::new(
2223                        error::UnexpectedSnafu {
2224                            err_msg: "retry later",
2225                        }
2226                        .build(),
2227                    ),
2228                    clean_poisons: false,
2229                });
2230            }
2231
2232            Ok(api::region::RegionResponse::new(0))
2233        });
2234        let node_manager = Arc::new(MockDatanodeManager::new(datanode_handler));
2235
2236        env.create_physical_table_metadata_for_repartition(
2237            table_id,
2238            vec![
2239                test_region_route(
2240                    RegionId::new(table_id, 1),
2241                    &range_expr("x", 0, 100).as_json_str().unwrap(),
2242                ),
2243                test_region_route(
2244                    RegionId::new(table_id, 2),
2245                    &range_expr("x", 100, 200).as_json_str().unwrap(),
2246                ),
2247            ],
2248            test_region_wal_options(&[1, 2]),
2249        )
2250        .await;
2251
2252        let context = new_parent_context(&env, node_manager, table_id);
2253        let mut procedure = RepartitionProcedure::new(
2254            RepartitionFrom::Partitioned {
2255                exprs: vec![range_expr("x", 0, 100)],
2256                target_partition_columns: None,
2257            },
2258            vec![range_expr("x", 0, 50), range_expr("x", 50, 100)],
2259            context,
2260        );
2261
2262        let start_status = procedure
2263            .execute(&TestingEnv::procedure_context())
2264            .await
2265            .unwrap();
2266        assert!(!start_status.need_persist());
2267        let start_status = procedure
2268            .execute(&TestingEnv::procedure_context())
2269            .await
2270            .unwrap();
2271        assert!(start_status.need_persist());
2272        assert_parent_state::<AllocateRegion>(&procedure);
2273
2274        let err = procedure
2275            .execute(&TestingEnv::procedure_context())
2276            .await
2277            .unwrap_err();
2278        assert!(err.is_retry_later());
2279        assert_parent_state::<AllocateRegion>(&procedure);
2280        assert!(!procedure.context.persistent_ctx.plans.is_empty());
2281        assert_eq!(
2282            current_parent_region_routes(&procedure.context).await,
2283            vec![
2284                test_region_route(
2285                    RegionId::new(table_id, 1),
2286                    &range_expr("x", 0, 100).as_json_str().unwrap(),
2287                ),
2288                test_region_route(
2289                    RegionId::new(table_id, 2),
2290                    &range_expr("x", 100, 200).as_json_str().unwrap(),
2291                ),
2292            ]
2293        );
2294
2295        let allocate_status = procedure
2296            .execute(&TestingEnv::procedure_context())
2297            .await
2298            .unwrap();
2299        assert!(allocate_status.need_persist());
2300        assert_parent_state::<Dispatch>(&procedure);
2301
2302        assert_eq!(procedure.context.persistent_ctx.plans.len(), 1);
2303        let plan = &procedure.context.persistent_ctx.plans[0];
2304        let expected_plan = test_plan(table_id);
2305        assert_eq!(plan.source_regions, expected_plan.source_regions);
2306        assert_eq!(plan.target_regions, expected_plan.target_regions);
2307        assert_eq!(
2308            plan.allocated_region_ids,
2309            expected_plan.allocated_region_ids
2310        );
2311        assert_eq!(plan.transition_map, expected_plan.transition_map);
2312        assert_eq!(
2313            current_parent_region_routes(&procedure.context).await,
2314            vec![
2315                test_region_route(
2316                    RegionId::new(table_id, 1),
2317                    &range_expr("x", 0, 100).as_json_str().unwrap(),
2318                ),
2319                test_region_route(
2320                    RegionId::new(table_id, 2),
2321                    &range_expr("x", 100, 200).as_json_str().unwrap(),
2322                ),
2323                RegionRoute {
2324                    region: Region {
2325                        id: RegionId::new(table_id, 3),
2326                        partition_expr: range_expr("x", 50, 100).as_json_str().unwrap(),
2327                        ..Default::default()
2328                    },
2329                    leader_peer: Some(Peer::empty(0)),
2330                    ..Default::default()
2331                },
2332            ]
2333        );
2334
2335        let dispatch_status = procedure
2336            .execute(&TestingEnv::procedure_context())
2337            .await
2338            .unwrap();
2339        assert!(!dispatch_status.need_persist());
2340        let subprocedure_ids = extract_subprocedure_ids(dispatch_status);
2341        assert_eq!(subprocedure_ids.len(), 1);
2342        assert_parent_state::<Collect>(&procedure);
2343    }
2344}