// meta_srv/procedure/repartition.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod allocate_region;
16pub mod collect;
17pub mod deallocate_region;
18pub mod dispatch;
19pub mod group;
20pub mod plan;
21pub mod repartition_end;
22pub mod repartition_start;
23pub mod utils;
24
25use std::any::Any;
26use std::collections::{HashMap, HashSet};
27use std::fmt::{Debug, Display};
28use std::time::{Duration, Instant};
29
30use common_error::ext::BoxedError;
31use common_meta::cache_invalidator::CacheInvalidatorRef;
32use common_meta::ddl::DdlContext;
33use common_meta::ddl::allocator::region_routes::RegionRoutesAllocatorRef;
34use common_meta::ddl::allocator::wal_options::WalOptionsAllocatorRef;
35use common_meta::ddl_manager::RepartitionProcedureFactory;
36use common_meta::instruction::CacheIdent;
37use common_meta::key::datanode_table::RegionInfo;
38use common_meta::key::table_info::TableInfoValue;
39use common_meta::key::table_route::TableRouteValue;
40use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
41use common_meta::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
42use common_meta::node_manager::NodeManagerRef;
43use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
44use common_meta::region_registry::LeaderRegionRegistryRef;
45use common_meta::rpc::router::{RegionRoute, operating_leader_region_roles};
46use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
47use common_procedure::{
48    BoxedProcedure, Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
49    ProcedureManagerRef, Result as ProcedureResult, Status, StringKey, UserMetadata,
50};
51use common_telemetry::{error, info, warn};
52use partition::expr::PartitionExpr;
53use serde::{Deserialize, Serialize};
54use snafu::{OptionExt, ResultExt};
55use store_api::storage::{RegionNumber, TableId};
56use table::table_name::TableName;
57
58use crate::error::{self, Result};
59use crate::procedure::repartition::collect::ProcedureMeta;
60use crate::procedure::repartition::deallocate_region::DeallocateRegion;
61use crate::procedure::repartition::group::{
62    Context as RepartitionGroupContext, RepartitionGroupProcedure, region_routes,
63};
64use crate::procedure::repartition::plan::RepartitionPlanEntry;
65use crate::procedure::repartition::repartition_start::RepartitionStart;
66use crate::procedure::repartition::utils::{
67    get_datanode_table_value, rollback_group_metadata_routes,
68};
69use crate::service::mailbox::MailboxRef;
70
71#[cfg(test)]
72pub mod test_util;
73
74#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
75pub struct PersistentContext {
76    pub catalog_name: String,
77    pub schema_name: String,
78    pub table_name: String,
79    pub table_id: TableId,
80    pub plans: Vec<RepartitionPlanEntry>,
81    #[serde(default)]
82    /// Records failed sub-procedures for parent rollback selection.
83    ///
84    /// The parent repartition procedure uses these entries to decide which plans
85    /// require group-metadata restoration and allocated-region cleanup.
86    pub failed_procedures: Vec<ProcedureMeta>,
87    #[serde(default)]
88    /// Records unknown sub-procedures for parent rollback selection.
89    ///
90    /// Unknown procedures are treated the same as failed ones when selecting the
91    /// plan subset that must be rolled back by the parent procedure.
92    pub unknown_procedures: Vec<ProcedureMeta>,
93    /// The timeout for repartition operations.
94    #[serde(with = "humantime_serde", default = "default_timeout")]
95    pub timeout: Duration,
96}
97
/// Default timeout for the whole repartition procedure: two minutes.
fn default_timeout() -> Duration {
    Duration::from_secs(2 * 60)
}
101
102impl PersistentContext {
103    /// Creates a new [PersistentContext] with the given table name, table id and timeout.
104    ///
105    /// If the timeout is not provided, the default timeout will be used.
106    pub fn new(
107        TableName {
108            catalog_name,
109            schema_name,
110            table_name,
111        }: TableName,
112        table_id: TableId,
113        timeout: Option<Duration>,
114    ) -> Self {
115        Self {
116            catalog_name,
117            schema_name,
118            table_name,
119            table_id,
120            plans: vec![],
121            failed_procedures: vec![],
122            unknown_procedures: vec![],
123            timeout: timeout.unwrap_or_else(default_timeout),
124        }
125    }
126
127    pub fn lock_key(&self) -> Vec<StringKey> {
128        vec![
129            CatalogLock::Read(&self.catalog_name).into(),
130            SchemaLock::read(&self.catalog_name, &self.schema_name).into(),
131            TableLock::Write(self.table_id).into(),
132            TableNameLock::new(&self.catalog_name, &self.schema_name, &self.table_name).into(),
133        ]
134    }
135}
136
/// Runtime context shared by every repartition state.
///
/// Only `persistent_ctx` is serialized with the procedure; the remaining
/// handles are rebuilt from the [DdlContext] in [Context::new].
#[derive(Clone)]
pub struct Context {
    /// Durable procedure state (see [PersistentContext]).
    pub persistent_ctx: PersistentContext,
    /// In-memory state, reset whenever the context is rebuilt.
    pub volatile_ctx: VolatileContext,
    /// Access to table route / table info metadata.
    pub table_metadata_manager: TableMetadataManagerRef,
    /// Issues guards for regions being operated on (see `register_operating_regions`).
    pub memory_region_keeper: MemoryRegionKeeperRef,
    /// Datanode access, used during region deallocation.
    pub node_manager: NodeManagerRef,
    /// Registry of leader regions, used during region deallocation.
    pub leader_region_registry: LeaderRegionRegistryRef,
    /// Mailbox for messaging datanodes.
    pub mailbox: MailboxRef,
    /// Address of this metasrv instance.
    pub server_addr: String,
    /// Broadcasts cache invalidation messages (see `invalidate_table_cache`).
    pub cache_invalidator: CacheInvalidatorRef,
    /// Allocator for new region routes, taken from the DDL table metadata allocator.
    pub region_routes_allocator: RegionRoutesAllocatorRef,
    /// Allocator for region WAL options, taken from the DDL table metadata allocator.
    pub wal_options_allocator: WalOptionsAllocatorRef,
    /// When this context was created; basis for `next_operation_timeout`.
    pub start_time: Instant,
}
152
/// In-memory repartition state that is not persisted; it starts from
/// `Default` every time the procedure context is (re)built.
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {
    /// Per-stage elapsed-time metrics (see [Metrics]).
    pub metrics: Metrics,
    /// Start time of the dispatch stage, if it has begun.
    /// NOTE(review): not set in this file — presumably assigned by the
    /// dispatch state; confirm in `dispatch.rs`.
    pub dispatch_start_time: Option<Instant>,
}
158
/// Metrics of repartition.
#[derive(Debug, Clone, Default)]
pub struct Metrics {
    /// Elapsed time of building plan.
    build_plan_elapsed: Duration,
    /// Elapsed time of allocating region.
    allocate_region_elapsed: Duration,
    /// Elapsed time of finishing groups.
    finish_groups_elapsed: Duration,
    /// Elapsed time of deallocating region.
    deallocate_region_elapsed: Duration,
}

impl Display for Metrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The total is always printed, even when every stage is zero.
        let total = self.build_plan_elapsed
            + self.allocate_region_elapsed
            + self.finish_groups_elapsed
            + self.deallocate_region_elapsed;
        write!(f, "total: {:?}", total)?;

        // Render only the stages that actually consumed time, in stage order.
        let stages = [
            ("build_plan_elapsed", self.build_plan_elapsed),
            ("allocate_region_elapsed", self.allocate_region_elapsed),
            ("finish_groups_elapsed", self.finish_groups_elapsed),
            ("deallocate_region_elapsed", self.deallocate_region_elapsed),
        ];
        let rendered = stages
            .iter()
            .filter(|(_, elapsed)| *elapsed > Duration::ZERO)
            .map(|(label, elapsed)| format!("{}: {:?}", label, elapsed))
            .collect::<Vec<_>>();
        if !rendered.is_empty() {
            write!(f, ", {}", rendered.join(", "))?;
        }
        Ok(())
    }
}

impl Metrics {
    /// Updates the elapsed time of building plan.
    pub fn update_build_plan_elapsed(&mut self, elapsed: Duration) {
        self.build_plan_elapsed += elapsed;
    }

    /// Updates the elapsed time of allocating region.
    pub fn update_allocate_region_elapsed(&mut self, elapsed: Duration) {
        self.allocate_region_elapsed += elapsed;
    }

    /// Updates the elapsed time of finishing groups.
    pub fn update_finish_groups_elapsed(&mut self, elapsed: Duration) {
        self.finish_groups_elapsed += elapsed;
    }

    /// Updates the elapsed time of deallocating region.
    pub fn update_deallocate_region_elapsed(&mut self, elapsed: Duration) {
        self.deallocate_region_elapsed += elapsed;
    }
}
230
impl Context {
    /// Builds a repartition [Context] from the shared DDL context plus the
    /// metasrv-specific mailbox and server address.
    ///
    /// `start_time` is captured here; [Self::next_operation_timeout] budgets
    /// the remaining time of the whole procedure against it.
    pub fn new(
        ddl_ctx: &DdlContext,
        mailbox: MailboxRef,
        server_addr: String,
        persistent_ctx: PersistentContext,
    ) -> Self {
        Self {
            persistent_ctx,
            table_metadata_manager: ddl_ctx.table_metadata_manager.clone(),
            memory_region_keeper: ddl_ctx.memory_region_keeper.clone(),
            node_manager: ddl_ctx.node_manager.clone(),
            leader_region_registry: ddl_ctx.leader_region_registry.clone(),
            mailbox,
            server_addr,
            cache_invalidator: ddl_ctx.cache_invalidator.clone(),
            region_routes_allocator: ddl_ctx.table_metadata_allocator.region_routes_allocator(),
            wal_options_allocator: ddl_ctx.table_metadata_allocator.wal_options_allocator(),
            start_time: Instant::now(),
            volatile_ctx: VolatileContext::default(),
        }
    }

    /// Returns the next operation's timeout.
    ///
    /// Returns `None` when the configured timeout has already been fully
    /// consumed since `start_time` (i.e. `checked_sub` underflows).
    pub fn next_operation_timeout(&self) -> Option<Duration> {
        self.persistent_ctx
            .timeout
            .checked_sub(self.start_time.elapsed())
    }

    /// Updates the elapsed time of building plan.
    pub fn update_build_plan_elapsed(&mut self, elapsed: Duration) {
        self.volatile_ctx.metrics.update_build_plan_elapsed(elapsed);
    }

    /// Updates the elapsed time of allocating region.
    pub fn update_allocate_region_elapsed(&mut self, elapsed: Duration) {
        self.volatile_ctx
            .metrics
            .update_allocate_region_elapsed(elapsed);
    }

    /// Updates the elapsed time of finishing groups.
    pub fn update_finish_groups_elapsed(&mut self, elapsed: Duration) {
        self.volatile_ctx
            .metrics
            .update_finish_groups_elapsed(elapsed);
    }

    /// Updates the elapsed time of deallocating region.
    pub fn update_deallocate_region_elapsed(&mut self, elapsed: Duration) {
        self.volatile_ctx
            .metrics
            .update_deallocate_region_elapsed(elapsed);
    }

    /// Retrieves the table route value for the given table id.
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of table.
    ///
    /// Abort:
    /// - Table route not found.
    pub async fn get_table_route_value(
        &self,
    ) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
        let table_id = self.persistent_ctx.table_id;
        let table_route_value = self
            .table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            // Raw bytes are kept so the value can be used for CAS-style updates.
            .get_with_raw_bytes(table_id)
            .await
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: format!("Failed to get table route for table: {}", table_id),
            })?
            .context(error::TableRouteNotFoundSnafu { table_id })?;

        Ok(table_route_value)
    }

    /// Retrieves the table info value for the given table id.
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of table.
    ///
    /// Abort:
    /// - Table info not found.
    pub async fn get_table_info_value(&self) -> Result<TableInfoValue> {
        let table_id = self.persistent_ctx.table_id;
        let table_info_value = self
            .table_metadata_manager
            .table_info_manager()
            .get(table_id)
            .await
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: format!("Failed to get table info for table: {}", table_id),
            })?
            .context(error::TableInfoNotFoundSnafu { table_id })?
            .into_inner();
        Ok(table_info_value)
    }

    /// Updates the table route.
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of datanode table.
    ///
    /// Abort:
    /// - Table route not found.
    /// - Failed to update the table route.
    pub async fn update_table_route(
        &self,
        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
        new_region_routes: Vec<RegionRoute>,
        new_region_wal_options: HashMap<RegionNumber, String>,
    ) -> Result<()> {
        let table_id = self.persistent_ctx.table_id;
        // An empty route set would wipe the table's routing info; reject it.
        if new_region_routes.is_empty() {
            return error::UnexpectedSnafu {
                violated: format!("new_region_routes is empty for table: {}", table_id),
            }
            .fail();
        }
        // The datanode table value is looked up via the leader of the first
        // new route; `unwrap` is safe after the emptiness check above.
        let datanode_id = new_region_routes
            .first()
            .unwrap()
            .leader_peer
            .as_ref()
            .context(error::NoLeaderSnafu)?
            .id;
        let datanode_table_value =
            get_datanode_table_value(&self.table_metadata_manager, table_id, datanode_id).await?;

        let RegionInfo {
            region_options,
            region_wal_options,
            ..
        } = &datanode_table_value.region_info;

        // Merge and validate the new region wal options.
        let validated_region_wal_options =
            crate::procedure::repartition::utils::merge_and_validate_region_wal_options(
                region_wal_options,
                new_region_wal_options,
                &new_region_routes,
                table_id,
            )?;
        info!(
            "Updating table route for table: {}, new region routes: {:?}",
            table_id, new_region_routes
        );
        self.table_metadata_manager
            .update_table_route(
                table_id,
                datanode_table_value.region_info.clone(),
                current_table_route_value,
                new_region_routes,
                region_options,
                &validated_region_wal_options,
            )
            .await
            .context(error::TableMetadataManagerSnafu)
    }

    /// Broadcasts the invalidate table cache message.
    ///
    /// Best-effort: the invalidation result is deliberately discarded
    /// (`let _ =`), so a failed broadcast never fails the procedure step.
    pub async fn invalidate_table_cache(&self) -> Result<()> {
        let table_id = self.persistent_ctx.table_id;
        let subject = format!(
            "Invalidate table cache for repartition table, table: {}",
            table_id,
        );
        let ctx = common_meta::cache_invalidator::Context {
            subject: Some(subject),
        };
        let _ = self
            .cache_invalidator
            .invalidate(&ctx, &[CacheIdent::TableId(table_id)])
            .await;
        Ok(())
    }

    /// Registers an operating-region guard for every leader region involved in
    /// `region_routes`, preventing concurrent procedures from touching them.
    ///
    /// Abort:
    /// - A region is already registered by another procedure
    ///   (`RegionOperatingRace`).
    pub fn register_operating_regions(
        memory_region_keeper: &MemoryRegionKeeperRef,
        region_routes: &[RegionRoute],
    ) -> Result<Vec<OperatingRegionGuard>> {
        let mut operating_guards = Vec::with_capacity(region_routes.len());
        for (region_id, datanode_id, role) in operating_leader_region_roles(region_routes) {
            let guard = memory_region_keeper
                .register_with_role(datanode_id, region_id, role)
                .context(error::RegionOperatingRaceSnafu {
                    peer_id: datanode_id,
                    region_id,
                })?;
            operating_guards.push(guard);
        }
        Ok(operating_guards)
    }
}
432
433#[async_trait::async_trait]
434#[typetag::serde(tag = "repartition_state")]
435pub(crate) trait State: Sync + Send + Debug {
436    fn name(&self) -> &'static str {
437        let type_name = std::any::type_name::<Self>();
438        // short name
439        type_name.split("::").last().unwrap_or(type_name)
440    }
441
442    /// Yields the next [State] and [Status].
443    async fn next(
444        &mut self,
445        ctx: &mut Context,
446        procedure_ctx: &ProcedureContext,
447    ) -> Result<(Box<dyn State>, Status)>;
448
449    fn as_any(&self) -> &dyn Any;
450}
451
/// The top-level repartition procedure, driving the [State] machine.
pub struct RepartitionProcedure {
    /// Current state; replaced after each successful `State::next` call.
    state: Box<dyn State>,
    /// Execution context; its `persistent_ctx` part is serialized by `dump`.
    context: Context,
}
456
/// Borrowed serialization view written out by `RepartitionProcedure::dump`.
#[derive(Debug, Serialize)]
struct RepartitionData<'a> {
    state: &'a dyn State,
    persistent_ctx: &'a PersistentContext,
}

/// Owned counterpart of [RepartitionData], used when restoring a procedure
/// from its JSON form in `RepartitionProcedure::from_json`.
#[derive(Debug, Deserialize)]
struct RepartitionDataOwned {
    state: Box<dyn State>,
    persistent_ctx: PersistentContext,
}
468
impl RepartitionProcedure {
    const TYPE_NAME: &'static str = "metasrv-procedure::Repartition";

    /// Creates a procedure starting at [RepartitionStart] with the given
    /// source (`from_exprs`) and target (`to_exprs`) partition expressions.
    pub fn new(
        from_exprs: Vec<PartitionExpr>,
        to_exprs: Vec<PartitionExpr>,
        context: Context,
    ) -> Self {
        let state = Box::new(RepartitionStart::new(from_exprs, to_exprs));

        Self { state, context }
    }

    /// Restores a procedure from its serialized JSON form.
    ///
    /// `ctx_factory` rebuilds the volatile [Context] around the deserialized
    /// [PersistentContext].
    pub fn from_json<F>(json: &str, ctx_factory: F) -> ProcedureResult<Self>
    where
        F: FnOnce(PersistentContext) -> Context,
    {
        let RepartitionDataOwned {
            state,
            persistent_ctx,
        } = serde_json::from_str(json).context(FromJsonSnafu)?;
        let context = ctx_factory(persistent_ctx);

        Ok(Self { state, context })
    }

    /// Returns whether parent rollback should remove this repartition's allocated regions.
    ///
    /// This uses an "after AllocateRegion" semantic: once execution reaches
    /// `AllocateRegion` or any later state, rollback must try to remove this round's
    /// `allocated_region_ids` from table-route metadata when they exist.
    ///
    /// State flow:
    /// `RepartitionStart -> AllocateRegion -> Dispatch -> Collect -> DeallocateRegion -> RepartitionEnd`
    ///                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    ///                     rollback allocated regions in metadata
    ///
    /// Notes:
    /// - `RepartitionStart`: no-op, because allocation has not happened yet.
    /// - `AllocateRegion` / `Dispatch` / `Collect`  rollback-active.
    /// - `DeallocateRegion`: is not rollback-active.
    /// - `RepartitionEnd`: no-op.
    fn should_rollback_allocated_regions(&self) -> bool {
        self.state.as_any().is::<allocate_region::AllocateRegion>()
            || self.state.as_any().is::<dispatch::Dispatch>()
            || self.state.as_any().is::<collect::Collect>()
    }

    /// Collects the distinct plan indices referenced by failed and unknown
    /// sub-procedures; these are the plans the parent must roll back.
    fn rollback_plan_indices(&self) -> HashSet<usize> {
        self.context
            .persistent_ctx
            .failed_procedures
            .iter()
            .chain(self.context.persistent_ctx.unknown_procedures.iter())
            .map(|procedure_meta| procedure_meta.plan_index)
            .collect()
    }

    /// Returns allocated region ids that parent rollback should remove.
    ///
    /// Rollback uses an "after AllocateRegion" semantic:
    /// - in `AllocateRegion` and `Dispatch`, all allocated regions belong to the
    ///   current repartition attempt and must be cleaned up.
    /// - in `Collect`, only the plans referenced by failed or unknown
    ///   sub-procedures should be rolled back.
    fn rollback_allocated_region_ids(&self) -> HashSet<store_api::storage::RegionId> {
        if self.state.as_any().is::<allocate_region::AllocateRegion>()
            || self.state.as_any().is::<dispatch::Dispatch>()
        {
            return self
                .context
                .persistent_ctx
                .plans
                .iter()
                .flat_map(|plan| plan.allocated_region_ids.iter().copied())
                .collect();
        }

        // `Collect` state: restrict to the failed/unknown plan subset.
        self.rollback_plan_indices()
            .into_iter()
            .flat_map(|plan_index| {
                self.context.persistent_ctx.plans[plan_index]
                    .allocated_region_ids
                    .iter()
                    .copied()
            })
            .collect()
    }

    /// Restores group-level staging metadata for failed/unknown plans.
    ///
    /// The helper mutates `region_routes` in memory.
    // NOTE(review): declared `async` but contains no `.await`; it could be
    // made synchronous (and the caller's `.await` dropped).
    async fn rollback_group_metadata_for_selected_plans(
        &mut self,
        region_routes: &mut [RegionRoute],
    ) -> Result<()> {
        let rollback_plan_indices = self.rollback_plan_indices();
        if rollback_plan_indices.is_empty() {
            return Ok(());
        }

        // Index routes by region id for in-place mutation per plan.
        let mut region_routes_map = region_routes
            .iter_mut()
            .map(|route| (route.region.id, route))
            .collect::<HashMap<_, _>>();
        for plan_index in rollback_plan_indices {
            let plan = &self.context.persistent_ctx.plans[plan_index];
            rollback_group_metadata_routes(
                plan.group_id,
                &plan.source_regions,
                &plan.original_target_routes,
                &plan.allocated_region_ids,
                &plan.pending_deallocate_region_ids,
                &mut region_routes_map,
            )?;
        }

        Ok(())
    }

    /// Rolls back this repartition attempt: drops the regions allocated in
    /// this round (best-effort) and restores the table-route metadata.
    async fn rollback_inner(&mut self, procedure_ctx: &ProcedureContext) -> Result<()> {
        // Nothing to undo before AllocateRegion / after DeallocateRegion.
        if !self.should_rollback_allocated_regions() {
            return Ok(());
        }

        let table_id = self.context.persistent_ctx.table_id;
        let allocated_region_ids = self.rollback_allocated_region_ids();

        // NOTE(review): the table write lock is re-acquired explicitly here —
        // presumably the procedure's `lock_key` locks are no longer held
        // during rollback; confirm with the procedure framework.
        let table_lock = TableLock::Write(table_id).into();
        let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
        let table_route_value = self.context.get_table_route_value().await?;
        let original_region_routes = region_routes(table_id, table_route_value.get_inner_ref())?;
        let mut current_region_routes = original_region_routes.clone();
        // Restore the group staging metadata for failed/unknown plans in memory.
        self.rollback_group_metadata_for_selected_plans(&mut current_region_routes)
            .await?;
        let allocated_region_routes = DeallocateRegion::filter_deallocatable_region_routes(
            table_id,
            &current_region_routes,
            &allocated_region_ids,
        );
        if !allocated_region_routes.is_empty() {
            let table = TableName {
                catalog_name: self.context.persistent_ctx.catalog_name.clone(),
                schema_name: self.context.persistent_ctx.schema_name.clone(),
                table_name: self.context.persistent_ctx.table_name.clone(),
            };
            // Memory guards are not required here,
            // because the table metadata still contains routes for the deallocating regions.
            if let Err(err) = DeallocateRegion::deallocate_regions(
                &self.context.node_manager,
                &self.context.leader_region_registry,
                table,
                table_id,
                &allocated_region_routes,
            )
            .await
            {
                // Best-effort: dropping physical regions may fail; metadata is
                // still rolled back below.
                warn!(err; "Failed to drop allocated regions during repartition rollback, table_id: {}, regions: {:?}", table_id, allocated_region_ids);
            }
        }

        let new_region_routes =
            DeallocateRegion::generate_region_routes(&current_region_routes, &allocated_region_ids);

        // Only write metadata back when the routes actually changed.
        if new_region_routes != *original_region_routes {
            self.context
                .update_table_route(&table_route_value, new_region_routes, HashMap::new())
                .await
                .map_err(BoxedError::new)
                .with_context(|_| error::RetryLaterWithSourceSnafu {
                    reason: format!(
                        "Failed to rollback allocated region routes for repartition table: {}",
                        table_id
                    ),
                })?;
        }

        // Cache invalidation is best-effort as well.
        if let Err(err) = self.context.invalidate_table_cache().await {
            warn!(err; "Failed to invalidate table cache during repartition rollback, table_id: {}", table_id);
        }

        Ok(())
    }
}
653
654#[async_trait::async_trait]
655impl Procedure for RepartitionProcedure {
656    fn type_name(&self) -> &str {
657        Self::TYPE_NAME
658    }
659
660    #[tracing::instrument(skip_all, fields(
661        state = %self.state.name(),
662        table_id = %self.context.persistent_ctx.table_id
663    ))]
664    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
665        let state = &mut self.state;
666        let state_name = state.name();
667        // Log state transition
668        common_telemetry::info!(
669            "Repartition procedure executing state: {}, table_id: {}",
670            state_name,
671            self.context.persistent_ctx.table_id
672        );
673        match state.next(&mut self.context, _ctx).await {
674            Ok((next, status)) => {
675                *state = next;
676                Ok(status)
677            }
678            Err(e) => {
679                if e.is_retryable() {
680                    Err(ProcedureError::retry_later(e))
681                } else {
682                    error!(
683                        e;
684                        "Repartition procedure failed, table id: {}",
685                        self.context.persistent_ctx.table_id,
686                    );
687                    Err(ProcedureError::external(e))
688                }
689            }
690        }
691    }
692
693    async fn rollback(&mut self, ctx: &ProcedureContext) -> ProcedureResult<()> {
694        self.rollback_inner(ctx)
695            .await
696            .map_err(ProcedureError::external)
697    }
698
699    fn rollback_supported(&self) -> bool {
700        true
701    }
702
703    fn dump(&self) -> ProcedureResult<String> {
704        let data = RepartitionData {
705            state: self.state.as_ref(),
706            persistent_ctx: &self.context.persistent_ctx,
707        };
708        serde_json::to_string(&data).context(ToJsonSnafu)
709    }
710
711    fn lock_key(&self) -> LockKey {
712        LockKey::new(self.context.persistent_ctx.lock_key())
713    }
714
715    fn user_metadata(&self) -> Option<UserMetadata> {
716        // TODO(weny): support user metadata.
717        None
718    }
719}
720
/// Default [RepartitionProcedureFactory] implementation for metasrv.
pub struct DefaultRepartitionProcedureFactory {
    /// Mailbox handed to every created procedure context.
    mailbox: MailboxRef,
    /// Address of this metasrv instance, recorded into created contexts.
    server_addr: String,
}

impl DefaultRepartitionProcedureFactory {
    /// Creates a factory from the mailbox and this metasrv's server address.
    pub fn new(mailbox: MailboxRef, server_addr: String) -> Self {
        Self {
            mailbox,
            server_addr,
        }
    }
}
734
735impl RepartitionProcedureFactory for DefaultRepartitionProcedureFactory {
736    fn create(
737        &self,
738        ddl_ctx: &DdlContext,
739        table_name: TableName,
740        table_id: TableId,
741        from_exprs: Vec<String>,
742        to_exprs: Vec<String>,
743        timeout: Option<Duration>,
744    ) -> std::result::Result<BoxedProcedure, BoxedError> {
745        let persistent_ctx = PersistentContext::new(table_name, table_id, timeout);
746        let from_exprs = from_exprs
747            .iter()
748            .map(|e| {
749                PartitionExpr::from_json_str(e)
750                    .context(error::DeserializePartitionExprSnafu)?
751                    .context(error::EmptyPartitionExprSnafu)
752            })
753            .collect::<Result<Vec<_>>>()
754            .map_err(BoxedError::new)?;
755        let to_exprs = to_exprs
756            .iter()
757            .map(|e| {
758                PartitionExpr::from_json_str(e)
759                    .context(error::DeserializePartitionExprSnafu)?
760                    .context(error::EmptyPartitionExprSnafu)
761            })
762            .collect::<Result<Vec<_>>>()
763            .map_err(BoxedError::new)?;
764
765        let procedure = RepartitionProcedure::new(
766            from_exprs,
767            to_exprs,
768            Context::new(
769                ddl_ctx,
770                self.mailbox.clone(),
771                self.server_addr.clone(),
772                persistent_ctx,
773            ),
774        );
775
776        Ok(Box::new(procedure))
777    }
778
779    fn register_loaders(
780        &self,
781        ddl_ctx: &DdlContext,
782        procedure_manager: &ProcedureManagerRef,
783    ) -> std::result::Result<(), BoxedError> {
784        // Registers the repartition procedure loader.
785        let mailbox = self.mailbox.clone();
786        let server_addr = self.server_addr.clone();
787        let moved_ddl_ctx = ddl_ctx.clone();
788        procedure_manager
789            .register_loader(
790                RepartitionProcedure::TYPE_NAME,
791                Box::new(move |json| {
792                    let mailbox = mailbox.clone();
793                    let server_addr = server_addr.clone();
794                    let ddl_ctx = moved_ddl_ctx.clone();
795                    let factory = move |persistent_ctx| {
796                        Context::new(&ddl_ctx, mailbox, server_addr, persistent_ctx)
797                    };
798                    RepartitionProcedure::from_json(json, factory).map(|p| Box::new(p) as _)
799                }),
800            )
801            .map_err(BoxedError::new)?;
802
803        // Registers the repartition group procedure loader.
804        let mailbox = self.mailbox.clone();
805        let server_addr = self.server_addr.clone();
806        let moved_ddl_ctx = ddl_ctx.clone();
807        procedure_manager
808            .register_loader(
809                RepartitionGroupProcedure::TYPE_NAME,
810                Box::new(move |json| {
811                    let mailbox = mailbox.clone();
812                    let server_addr = server_addr.clone();
813                    let ddl_ctx = moved_ddl_ctx.clone();
814                    let factory = move |persistent_ctx| {
815                        RepartitionGroupContext::new(&ddl_ctx, mailbox, server_addr, persistent_ctx)
816                    };
817                    RepartitionGroupProcedure::from_json(json, factory).map(|p| Box::new(p) as _)
818                }),
819            )
820            .map_err(BoxedError::new)?;
821
822        Ok(())
823    }
824}
825
826#[cfg(test)]
827mod tests {
828    use std::collections::HashMap;
829    use std::sync::Arc;
830    use std::sync::atomic::{AtomicBool, Ordering};
831
832    use common_error::ext::BoxedError;
833    use common_error::mock::MockError;
834    use common_error::status_code::StatusCode;
835    use common_meta::ddl::test_util::datanode_handler::{
836        DatanodeWatcher, NaiveDatanodeHandler, UnexpectedErrorDatanodeHandler,
837    };
838    use common_meta::error;
839    use common_meta::peer::Peer;
840    use common_meta::region_keeper::MemoryRegionKeeper;
841    use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
842    use common_meta::test_util::MockDatanodeManager;
843    use common_procedure::{Error as ProcedureError, Procedure, ProcedureId, ProcedureState};
844    use store_api::region_engine::RegionRole;
845    use store_api::storage::RegionId;
846    use table::table_name::TableName;
847    use tokio::sync::mpsc;
848    use uuid::Uuid;
849
850    use super::*;
851    use crate::procedure::repartition::allocate_region::AllocateRegion;
852    use crate::procedure::repartition::collect::Collect;
853    use crate::procedure::repartition::deallocate_region::DeallocateRegion;
854    use crate::procedure::repartition::dispatch::Dispatch;
855    use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
856    use crate::procedure::repartition::plan::RegionDescriptor;
857    use crate::procedure::repartition::repartition_end::RepartitionEnd;
858    use crate::procedure::repartition::test_util::{
859        TestingEnv, assert_parent_state, current_parent_region_routes, extract_subprocedure_ids,
860        new_parent_context, procedure_context_with_receivers, procedure_state_receiver, range_expr,
861        test_region_route, test_region_wal_options,
862    };
863
864    fn test_plan(table_id: TableId) -> RepartitionPlanEntry {
865        RepartitionPlanEntry {
866            group_id: uuid::Uuid::new_v4(),
867            source_regions: vec![RegionDescriptor {
868                region_id: RegionId::new(table_id, 1),
869                partition_expr: range_expr("x", 0, 100),
870            }],
871            target_regions: vec![
872                RegionDescriptor {
873                    region_id: RegionId::new(table_id, 1),
874                    partition_expr: range_expr("x", 0, 50),
875                },
876                RegionDescriptor {
877                    region_id: RegionId::new(table_id, 3),
878                    partition_expr: range_expr("x", 50, 100),
879                },
880            ],
881            allocated_region_ids: vec![RegionId::new(table_id, 3)],
882            pending_deallocate_region_ids: vec![],
883            transition_map: vec![vec![0, 1]],
884            original_target_routes: vec![],
885        }
886    }
887
888    fn with_rollback_metadata(
889        mut plan: RepartitionPlanEntry,
890        original_target_routes: Vec<RegionRoute>,
891    ) -> RepartitionPlanEntry {
892        plan.original_target_routes = original_target_routes;
893        plan
894    }
895
896    fn apply_group_staging(
897        plan: &RepartitionPlanEntry,
898        current_region_routes: &[RegionRoute],
899    ) -> Vec<RegionRoute> {
900        UpdateMetadata::apply_staging_region_routes(
901            plan.group_id,
902            &plan.source_regions,
903            &plan.target_regions,
904            &plan.pending_deallocate_region_ids,
905            current_region_routes,
906        )
907        .unwrap()
908    }
909
910    fn exit_group_staging(
911        plan: &RepartitionPlanEntry,
912        current_region_routes: &[RegionRoute],
913    ) -> Vec<RegionRoute> {
914        UpdateMetadata::exit_staging_region_routes(
915            plan.group_id,
916            &plan.source_regions,
917            &plan.target_regions,
918            current_region_routes,
919        )
920        .unwrap()
921    }
922
923    fn region_route_by_id(region_routes: &[RegionRoute], region_id: RegionId) -> &RegionRoute {
924        region_routes
925            .iter()
926            .find(|route| route.region.id == region_id)
927            .unwrap()
928    }
929
930    fn test_procedure(state: Box<dyn State>, context: Context) -> RepartitionProcedure {
931        RepartitionProcedure { state, context }
932    }
933
934    fn test_context(env: &TestingEnv, table_id: TableId) -> Context {
935        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
936        let ddl_ctx = env.ddl_context(node_manager);
937        let persistent_ctx = PersistentContext::new(
938            TableName::new("test_catalog", "test_schema", "test_table"),
939            table_id,
940            None,
941        );
942
943        Context::new(
944            &ddl_ctx,
945            env.mailbox_ctx.mailbox().clone(),
946            env.server_addr.clone(),
947            persistent_ctx,
948        )
949    }
950
951    #[test]
952    fn test_filter_allocated_region_routes() {
953        let table_id = 1024;
954        let region_routes = vec![
955            test_region_route(RegionId::new(table_id, 1), "a"),
956            test_region_route(RegionId::new(table_id, 2), "b"),
957        ];
958        let allocated_region_ids = HashSet::from([RegionId::new(table_id, 2)]);
959
960        let new_region_routes =
961            DeallocateRegion::generate_region_routes(&region_routes, &allocated_region_ids);
962
963        assert_eq!(new_region_routes.len(), 1);
964        assert_eq!(new_region_routes[0].region.id, RegionId::new(table_id, 1));
965    }
966
967    #[test]
968    fn test_should_rollback_allocated_regions() {
969        let env = TestingEnv::new();
970        let table_id = 1024;
971
972        let procedure = test_procedure(
973            Box::new(RepartitionStart::new(vec![], vec![])),
974            test_context(&env, table_id),
975        );
976        assert!(!procedure.should_rollback_allocated_regions());
977
978        let procedure = test_procedure(
979            Box::new(AllocateRegion::new(vec![])),
980            test_context(&env, table_id),
981        );
982        assert!(procedure.should_rollback_allocated_regions());
983
984        let procedure = test_procedure(Box::new(Dispatch), test_context(&env, table_id));
985        assert!(procedure.should_rollback_allocated_regions());
986
987        let procedure =
988            test_procedure(Box::new(Collect::new(vec![])), test_context(&env, table_id));
989        assert!(procedure.should_rollback_allocated_regions());
990
991        let procedure = test_procedure(Box::new(DeallocateRegion), test_context(&env, table_id));
992        assert!(!procedure.should_rollback_allocated_regions());
993
994        let procedure = test_procedure(Box::new(RepartitionEnd), test_context(&env, table_id));
995        assert!(!procedure.should_rollback_allocated_regions());
996    }
997
998    #[test]
999    fn test_register_operating_regions_preserves_route_roles() {
1000        let keeper = Arc::new(MemoryRegionKeeper::new());
1001        let region_routes = vec![
1002            RegionRoute {
1003                region: Region::new_test(RegionId::new(1024, 1)),
1004                leader_peer: Some(Peer::empty(1)),
1005                follower_peers: vec![],
1006                leader_state: None,
1007                leader_down_since: None,
1008                write_route_policy: None,
1009            },
1010            RegionRoute {
1011                region: Region::new_test(RegionId::new(1024, 2)),
1012                leader_peer: Some(Peer::empty(2)),
1013                follower_peers: vec![],
1014                leader_state: Some(LeaderState::Staging),
1015                leader_down_since: None,
1016                write_route_policy: None,
1017            },
1018            RegionRoute {
1019                region: Region::new_test(RegionId::new(1024, 3)),
1020                leader_peer: Some(Peer::empty(3)),
1021                follower_peers: vec![],
1022                leader_state: Some(LeaderState::Downgrading),
1023                leader_down_since: None,
1024                write_route_policy: None,
1025            },
1026        ];
1027
1028        let _guards = Context::register_operating_regions(&keeper, &region_routes).unwrap();
1029
1030        let leader_roles =
1031            keeper.extract_operating_region_roles(1, &HashSet::from([RegionId::new(1024, 1)]));
1032        let staging_roles =
1033            keeper.extract_operating_region_roles(2, &HashSet::from([RegionId::new(1024, 2)]));
1034        let downgrading_roles =
1035            keeper.extract_operating_region_roles(3, &HashSet::from([RegionId::new(1024, 3)]));
1036
1037        assert_eq!(
1038            leader_roles.get(&RegionId::new(1024, 1)),
1039            Some(&RegionRole::Leader)
1040        );
1041        assert_eq!(
1042            staging_roles.get(&RegionId::new(1024, 2)),
1043            Some(&RegionRole::StagingLeader)
1044        );
1045        assert_eq!(
1046            downgrading_roles.get(&RegionId::new(1024, 3)),
1047            Some(&RegionRole::DowngradingLeader)
1048        );
1049    }
1050
    /// Rolling back a failed repartition from the `Dispatch` state must drop
    /// the routes of regions that were allocated for the plan (region 3 here,
    /// whose route has an empty partition expression) and keep the original
    /// regions 1 and 2.
    #[tokio::test]
    async fn test_repartition_rollback_removes_allocated_routes_from_dispatch() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
        let ddl_ctx = env.ddl_context(node_manager);
        let original_region_routes = vec![
            test_region_route(
                RegionId::new(table_id, 1),
                &range_expr("x", 0, 100).as_json_str().unwrap(),
            ),
            test_region_route(
                RegionId::new(table_id, 2),
                &range_expr("x", 50, 100).as_json_str().unwrap(),
            ),
            test_region_route(RegionId::new(table_id, 3), ""),
        ];
        env.create_physical_table_metadata_with_wal_options(
            table_id,
            original_region_routes,
            test_region_wal_options(&[1, 2]),
        )
        .await;

        let mut persistent_ctx = PersistentContext::new(
            TableName::new("test_catalog", "test_schema", "test_table"),
            table_id,
            None,
        );
        // The plan carries pre-repartition routes (regions 1 and 3) as its
        // rollback metadata.
        persistent_ctx.plans = vec![with_rollback_metadata(
            test_plan(table_id),
            vec![
                test_region_route(
                    RegionId::new(table_id, 1),
                    &range_expr("x", 0, 100).as_json_str().unwrap(),
                ),
                test_region_route(RegionId::new(table_id, 3), ""),
            ],
        )];
        // Mark the single plan's subprocedure as failed so rollback treats its
        // group as failed.
        persistent_ctx.failed_procedures = vec![ProcedureMeta {
            plan_index: 0,
            group_id: Uuid::new_v4(),
            procedure_id: ProcedureId::random(),
        }];
        let context = Context::new(
            &ddl_ctx,
            env.mailbox_ctx.mailbox().clone(),
            env.server_addr.clone(),
            persistent_ctx,
        );
        let mut procedure = RepartitionProcedure {
            state: Box::new(Dispatch),
            context,
        };

        procedure
            .rollback(&TestingEnv::procedure_context())
            .await
            .unwrap();

        // The allocated region 3 is removed; only regions 1 and 2 remain.
        let region_routes = current_parent_region_routes(&procedure.context).await;
        assert_eq!(region_routes.len(), 2);
        assert_eq!(region_routes[0].region.id, RegionId::new(table_id, 1));
        assert_eq!(region_routes[1].region.id, RegionId::new(table_id, 2));
    }
1116
    /// Same scenario as the `Dispatch` variant above, but rolling back from
    /// the `AllocateRegion` state: the allocated region 3 must be removed
    /// even without any failed-procedure bookkeeping.
    #[tokio::test]
    async fn test_repartition_rollback_removes_allocated_routes_from_allocate() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
        let ddl_ctx = env.ddl_context(node_manager);
        let original_region_routes = vec![
            test_region_route(
                RegionId::new(table_id, 1),
                &range_expr("x", 0, 100).as_json_str().unwrap(),
            ),
            test_region_route(
                RegionId::new(table_id, 2),
                &range_expr("x", 50, 100).as_json_str().unwrap(),
            ),
            test_region_route(RegionId::new(table_id, 3), ""),
        ];
        env.create_physical_table_metadata_with_wal_options(
            table_id,
            original_region_routes,
            test_region_wal_options(&[1, 2]),
        )
        .await;

        let mut persistent_ctx = PersistentContext::new(
            TableName::new("test_catalog", "test_schema", "test_table"),
            table_id,
            None,
        );
        // Plain plan without rollback metadata: rollback from AllocateRegion
        // only needs the allocated region ids.
        persistent_ctx.plans = vec![test_plan(table_id)];
        let context = Context::new(
            &ddl_ctx,
            env.mailbox_ctx.mailbox().clone(),
            env.server_addr.clone(),
            persistent_ctx,
        );
        let mut procedure = RepartitionProcedure {
            state: Box::new(AllocateRegion::new(vec![])),
            context,
        };

        procedure
            .rollback(&TestingEnv::procedure_context())
            .await
            .unwrap();

        // The allocated region 3 is removed; only regions 1 and 2 remain.
        let region_routes = current_parent_region_routes(&procedure.context).await;
        assert_eq!(region_routes.len(), 2);
        assert_eq!(region_routes[0].region.id, RegionId::new(table_id, 1));
        assert_eq!(region_routes[1].region.id, RegionId::new(table_id, 2));
    }
1168
    /// With two plans — one failed, one succeeded — rollback from `Collect`
    /// must remove only the failed plan's allocated region (3) and keep the
    /// succeeded plan's allocated region (4).
    #[tokio::test]
    async fn test_repartition_rollback_from_collect_only_removes_failed_allocated_routes() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
        let ddl_ctx = env.ddl_context(node_manager);
        let original_region_routes = vec![
            test_region_route(
                RegionId::new(table_id, 1),
                &range_expr("x", 0, 100).as_json_str().unwrap(),
            ),
            test_region_route(
                RegionId::new(table_id, 2),
                &range_expr("x", 100, 200).as_json_str().unwrap(),
            ),
            test_region_route(RegionId::new(table_id, 3), ""),
            test_region_route(RegionId::new(table_id, 4), ""),
        ];
        env.create_physical_table_metadata_with_wal_options(
            table_id,
            original_region_routes,
            test_region_wal_options(&[1, 2, 3, 4]),
        )
        .await;

        let mut persistent_ctx = PersistentContext::new(
            TableName::new("test_catalog", "test_schema", "test_table"),
            table_id,
            None,
        );
        // Failed plan: splits region 1, allocated region 3 as its target.
        let failed_plan = test_plan(table_id);
        let failed_plan = with_rollback_metadata(
            failed_plan,
            vec![
                test_region_route(
                    RegionId::new(table_id, 1),
                    &range_expr("x", 0, 100).as_json_str().unwrap(),
                ),
                test_region_route(RegionId::new(table_id, 3), ""),
            ],
        );
        // Succeeded plan: splits region 2 into regions 2 and 4, with region 4
        // allocated for it.
        let succeeded_plan = RepartitionPlanEntry {
            group_id: Uuid::new_v4(),
            source_regions: vec![RegionDescriptor {
                region_id: RegionId::new(table_id, 2),
                partition_expr: range_expr("x", 100, 200),
            }],
            target_regions: vec![
                RegionDescriptor {
                    region_id: RegionId::new(table_id, 2),
                    partition_expr: range_expr("x", 100, 150),
                },
                RegionDescriptor {
                    region_id: RegionId::new(table_id, 4),
                    partition_expr: range_expr("x", 150, 200),
                },
            ],
            allocated_region_ids: vec![RegionId::new(table_id, 4)],
            pending_deallocate_region_ids: vec![],
            transition_map: vec![vec![0]],
            original_target_routes: vec![
                test_region_route(
                    RegionId::new(table_id, 2),
                    &range_expr("x", 100, 200).as_json_str().unwrap(),
                ),
                test_region_route(RegionId::new(table_id, 4), ""),
            ],
        };
        persistent_ctx.plans = vec![failed_plan, succeeded_plan];
        // Only plan 0 is recorded as failed.
        persistent_ctx.failed_procedures = vec![ProcedureMeta {
            plan_index: 0,
            group_id: persistent_ctx.plans[0].group_id,
            procedure_id: ProcedureId::random(),
        }];

        let context = Context::new(
            &ddl_ctx,
            env.mailbox_ctx.mailbox().clone(),
            env.server_addr.clone(),
            persistent_ctx,
        );
        let mut procedure = RepartitionProcedure {
            state: Box::new(Collect::new(vec![])),
            context,
        };

        procedure
            .rollback(&TestingEnv::procedure_context())
            .await
            .unwrap();

        // Region 3 (failed plan's allocation) is gone; region 4 (succeeded
        // plan's allocation) survives alongside the original regions 1 and 2.
        let region_routes = current_parent_region_routes(&procedure.context).await;
        assert_eq!(region_routes.len(), 3);
        assert_eq!(region_routes[0].region.id, RegionId::new(table_id, 1));
        assert_eq!(region_routes[1].region.id, RegionId::new(table_id, 2));
        assert_eq!(region_routes[2].region.id, RegionId::new(table_id, 4));
    }
1266
    /// Starting from metadata where the failed group is still staged and the
    /// succeeded group has fully exited staging, rollback from `Collect` must
    /// restore only the failed group's original routes while leaving the
    /// succeeded group's post-repartition routes untouched.
    #[tokio::test]
    async fn test_repartition_rollback_from_collect_restores_failed_group_metadata_only() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
        let ddl_ctx = env.ddl_context(node_manager);
        let original_region_routes = vec![
            test_region_route(
                RegionId::new(table_id, 1),
                &range_expr("x", 0, 100).as_json_str().unwrap(),
            ),
            test_region_route(
                RegionId::new(table_id, 2),
                &range_expr("x", 100, 200).as_json_str().unwrap(),
            ),
            test_region_route(RegionId::new(table_id, 3), ""),
            test_region_route(RegionId::new(table_id, 4), ""),
        ];

        // Failed plan: splits region 1; rollback metadata is the original
        // routes of regions 1 and 3.
        let failed_plan = with_rollback_metadata(
            test_plan(table_id),
            vec![
                original_region_routes[0].clone(),
                original_region_routes[2].clone(),
            ],
        );
        // Succeeded plan: splits region 2 into regions 2 and 4.
        let succeeded_plan = RepartitionPlanEntry {
            group_id: Uuid::new_v4(),
            source_regions: vec![RegionDescriptor {
                region_id: RegionId::new(table_id, 2),
                partition_expr: range_expr("x", 100, 200),
            }],
            target_regions: vec![
                RegionDescriptor {
                    region_id: RegionId::new(table_id, 2),
                    partition_expr: range_expr("x", 100, 150),
                },
                RegionDescriptor {
                    region_id: RegionId::new(table_id, 4),
                    partition_expr: range_expr("x", 150, 200),
                },
            ],
            allocated_region_ids: vec![RegionId::new(table_id, 4)],
            pending_deallocate_region_ids: vec![],
            transition_map: vec![vec![0, 1]],
            original_target_routes: vec![
                original_region_routes[1].clone(),
                original_region_routes[3].clone(),
            ],
        };
        // Stage both groups, then complete (exit staging) only the succeeded
        // one — the failed group remains staged.
        let current_region_routes = apply_group_staging(&failed_plan, &original_region_routes);
        let current_region_routes = apply_group_staging(&succeeded_plan, &current_region_routes);
        let current_region_routes = exit_group_staging(&succeeded_plan, &current_region_routes);
        env.create_physical_table_metadata_with_wal_options(
            table_id,
            current_region_routes,
            test_region_wal_options(&[1, 2, 3, 4]),
        )
        .await;

        let mut persistent_ctx = PersistentContext::new(
            TableName::new("test_catalog", "test_schema", "test_table"),
            table_id,
            None,
        );
        persistent_ctx.plans = vec![failed_plan, succeeded_plan.clone()];
        // Only plan 0 is recorded as failed.
        persistent_ctx.failed_procedures = vec![ProcedureMeta {
            plan_index: 0,
            group_id: persistent_ctx.plans[0].group_id,
            procedure_id: ProcedureId::random(),
        }];

        let context = Context::new(
            &ddl_ctx,
            env.mailbox_ctx.mailbox().clone(),
            env.server_addr.clone(),
            persistent_ctx,
        );
        let mut procedure = RepartitionProcedure {
            state: Box::new(Collect::new(vec![])),
            context,
        };

        procedure
            .rollback(&TestingEnv::procedure_context())
            .await
            .unwrap();

        // Region 1 is back to its original [0, 100) route; regions 2 and 4
        // keep the succeeded plan's post-split expressions.
        assert_eq!(
            current_parent_region_routes(&procedure.context).await,
            vec![
                test_region_route(
                    RegionId::new(table_id, 1),
                    &range_expr("x", 0, 100).as_json_str().unwrap(),
                ),
                RegionRoute {
                    region: Region {
                        id: RegionId::new(table_id, 2),
                        partition_expr: range_expr("x", 100, 150).as_json_str().unwrap(),
                        ..Default::default()
                    },
                    leader_peer: Some(Peer::empty(1)),
                    ..Default::default()
                },
                RegionRoute {
                    region: Region {
                        id: RegionId::new(table_id, 4),
                        partition_expr: range_expr("x", 150, 200).as_json_str().unwrap(),
                        ..Default::default()
                    },
                    leader_peer: Some(Peer::empty(1)),
                    ..Default::default()
                },
            ]
        );
    }
1383
    /// A group whose subprocedure outcome is unknown (listed in
    /// `unknown_procedures`) must also be rolled back: its staged routes are
    /// replaced by the plan's original routes.
    #[tokio::test]
    async fn test_repartition_rollback_from_collect_restores_unknown_group_metadata() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
        let ddl_ctx = env.ddl_context(node_manager);
        let original_region_routes = vec![
            test_region_route(
                RegionId::new(table_id, 1),
                &range_expr("x", 0, 100).as_json_str().unwrap(),
            ),
            test_region_route(
                RegionId::new(table_id, 2),
                &range_expr("x", 100, 200).as_json_str().unwrap(),
            ),
            test_region_route(RegionId::new(table_id, 3), ""),
        ];
        let plan = with_rollback_metadata(
            test_plan(table_id),
            vec![
                original_region_routes[0].clone(),
                original_region_routes[2].clone(),
            ],
        );
        // Sanity-check the staged metadata: regions 1 and 3 carry the split
        // expressions and are marked as staging leaders.
        let staged_region_routes = apply_group_staging(&plan, &original_region_routes);
        assert_eq!(
            region_route_by_id(&staged_region_routes, RegionId::new(table_id, 1))
                .region
                .partition_expr(),
            range_expr("x", 0, 50).as_json_str().unwrap()
        );
        assert!(
            region_route_by_id(&staged_region_routes, RegionId::new(table_id, 1))
                .is_leader_staging()
        );
        assert_eq!(
            region_route_by_id(&staged_region_routes, RegionId::new(table_id, 3))
                .region
                .partition_expr(),
            range_expr("x", 50, 100).as_json_str().unwrap()
        );
        assert!(
            region_route_by_id(&staged_region_routes, RegionId::new(table_id, 3))
                .is_leader_staging()
        );
        env.create_physical_table_metadata_with_wal_options(
            table_id,
            staged_region_routes,
            test_region_wal_options(&[1, 2, 3]),
        )
        .await;

        let mut persistent_ctx = PersistentContext::new(
            TableName::new("test_catalog", "test_schema", "test_table"),
            table_id,
            None,
        );
        persistent_ctx.plans = vec![plan.clone()];
        // The plan's subprocedure outcome is unknown rather than failed.
        persistent_ctx.unknown_procedures = vec![ProcedureMeta {
            plan_index: 0,
            group_id: plan.group_id,
            procedure_id: ProcedureId::random(),
        }];

        let context = Context::new(
            &ddl_ctx,
            env.mailbox_ctx.mailbox().clone(),
            env.server_addr.clone(),
            persistent_ctx,
        );
        let mut procedure = RepartitionProcedure {
            state: Box::new(Collect::new(vec![])),
            context,
        };

        procedure
            .rollback(&TestingEnv::procedure_context())
            .await
            .unwrap();

        // Regions 1 and 2 are restored to their original routes; the
        // allocated region 3 is removed.
        assert_eq!(
            current_parent_region_routes(&procedure.context).await,
            vec![
                original_region_routes[0].clone(),
                original_region_routes[1].clone()
            ]
        );
    }
1472
    /// Running `rollback` twice must yield the same metadata as running it
    /// once — rollback is idempotent.
    #[tokio::test]
    async fn test_repartition_rollback_is_idempotent() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
        let ddl_ctx = env.ddl_context(node_manager);
        let original_region_routes = vec![
            test_region_route(
                RegionId::new(table_id, 1),
                &range_expr("x", 0, 100).as_json_str().unwrap(),
            ),
            test_region_route(
                RegionId::new(table_id, 2),
                &range_expr("x", 50, 100).as_json_str().unwrap(),
            ),
            test_region_route(RegionId::new(table_id, 3), ""),
        ];
        env.create_physical_table_metadata_with_wal_options(
            table_id,
            original_region_routes,
            test_region_wal_options(&[1, 2]),
        )
        .await;

        let mut persistent_ctx = PersistentContext::new(
            TableName::new("test_catalog", "test_schema", "test_table"),
            table_id,
            None,
        );
        // Same setup as the Dispatch rollback test: one failed plan whose
        // rollback metadata holds the routes of regions 1 and 3.
        persistent_ctx.plans = vec![with_rollback_metadata(
            test_plan(table_id),
            vec![
                test_region_route(
                    RegionId::new(table_id, 1),
                    &range_expr("x", 0, 100).as_json_str().unwrap(),
                ),
                test_region_route(RegionId::new(table_id, 3), ""),
            ],
        )];
        persistent_ctx.failed_procedures = vec![ProcedureMeta {
            plan_index: 0,
            group_id: Uuid::new_v4(),
            procedure_id: ProcedureId::random(),
        }];
        let context = Context::new(
            &ddl_ctx,
            env.mailbox_ctx.mailbox().clone(),
            env.server_addr.clone(),
            persistent_ctx,
        );
        let mut procedure = RepartitionProcedure {
            state: Box::new(Dispatch),
            context,
        };

        // First rollback.
        procedure
            .rollback(&TestingEnv::procedure_context())
            .await
            .unwrap();
        let once = current_parent_region_routes(&procedure.context).await;

        // Second rollback on the already-rolled-back metadata.
        procedure
            .rollback(&TestingEnv::procedure_context())
            .await
            .unwrap();
        let twice = current_parent_region_routes(&procedure.context).await;

        // Both passes converge on the same two original regions.
        assert_eq!(once, twice);
        assert_eq!(once.len(), 2);
        assert_eq!(once[0].region.id, RegionId::new(table_id, 1));
        assert_eq!(once[1].region.id, RegionId::new(table_id, 2));
    }
1545
1546    #[tokio::test]
1547    async fn test_repartition_rollback_from_collect_restores_failed_merge_group_metadata_only() {
1548        let env = TestingEnv::new();
1549        let table_id = 1024;
1550        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
1551        let ddl_ctx = env.ddl_context(node_manager);
1552        let original_region_routes = vec![
1553            test_region_route(
1554                RegionId::new(table_id, 1),
1555                &range_expr("x", 0, 100).as_json_str().unwrap(),
1556            ),
1557            test_region_route(
1558                RegionId::new(table_id, 2),
1559                &range_expr("x", 100, 200).as_json_str().unwrap(),
1560            ),
1561            test_region_route(
1562                RegionId::new(table_id, 3),
1563                &range_expr("x", 200, 300).as_json_str().unwrap(),
1564            ),
1565            test_region_route(RegionId::new(table_id, 4), ""),
1566        ];
1567        let failed_merge_plan = RepartitionPlanEntry {
1568            group_id: Uuid::new_v4(),
1569            source_regions: vec![
1570                RegionDescriptor {
1571                    region_id: RegionId::new(table_id, 1),
1572                    partition_expr: range_expr("x", 0, 100),
1573                },
1574                RegionDescriptor {
1575                    region_id: RegionId::new(table_id, 2),
1576                    partition_expr: range_expr("x", 100, 200),
1577                },
1578            ],
1579            target_regions: vec![RegionDescriptor {
1580                region_id: RegionId::new(table_id, 1),
1581                partition_expr: range_expr("x", 0, 200),
1582            }],
1583            allocated_region_ids: vec![],
1584            pending_deallocate_region_ids: vec![RegionId::new(table_id, 2)],
1585            transition_map: vec![vec![0], vec![0]],
1586            original_target_routes: vec![original_region_routes[0].clone()],
1587        };
1588        let succeeded_split_plan = RepartitionPlanEntry {
1589            group_id: Uuid::new_v4(),
1590            source_regions: vec![RegionDescriptor {
1591                region_id: RegionId::new(table_id, 3),
1592                partition_expr: range_expr("x", 200, 300),
1593            }],
1594            target_regions: vec![
1595                RegionDescriptor {
1596                    region_id: RegionId::new(table_id, 3),
1597                    partition_expr: range_expr("x", 200, 250),
1598                },
1599                RegionDescriptor {
1600                    region_id: RegionId::new(table_id, 4),
1601                    partition_expr: range_expr("x", 250, 300),
1602                },
1603            ],
1604            allocated_region_ids: vec![RegionId::new(table_id, 4)],
1605            pending_deallocate_region_ids: vec![],
1606            transition_map: vec![vec![0, 1]],
1607            original_target_routes: vec![
1608                original_region_routes[2].clone(),
1609                original_region_routes[3].clone(),
1610            ],
1611        };
1612        let current_region_routes =
1613            apply_group_staging(&failed_merge_plan, &original_region_routes);
1614        let current_region_routes =
1615            apply_group_staging(&succeeded_split_plan, &current_region_routes);
1616        let staged_region_routes =
1617            exit_group_staging(&succeeded_split_plan, &current_region_routes);
1618        env.create_physical_table_metadata_with_wal_options(
1619            table_id,
1620            staged_region_routes,
1621            test_region_wal_options(&[1, 2, 3, 4]),
1622        )
1623        .await;
1624
1625        let mut persistent_ctx = PersistentContext::new(
1626            TableName::new("test_catalog", "test_schema", "test_table"),
1627            table_id,
1628            None,
1629        );
1630        persistent_ctx.plans = vec![failed_merge_plan, succeeded_split_plan.clone()];
1631        persistent_ctx.failed_procedures = vec![ProcedureMeta {
1632            plan_index: 0,
1633            group_id: persistent_ctx.plans[0].group_id,
1634            procedure_id: ProcedureId::random(),
1635        }];
1636
1637        let context = Context::new(
1638            &ddl_ctx,
1639            env.mailbox_ctx.mailbox().clone(),
1640            env.server_addr.clone(),
1641            persistent_ctx,
1642        );
1643        let mut procedure = RepartitionProcedure {
1644            state: Box::new(Collect::new(vec![])),
1645            context,
1646        };
1647
1648        procedure
1649            .rollback(&TestingEnv::procedure_context())
1650            .await
1651            .unwrap();
1652
1653        let region_routes = current_parent_region_routes(&procedure.context).await;
1654        assert_eq!(
1655            region_routes,
1656            vec![
1657                test_region_route(
1658                    RegionId::new(table_id, 1),
1659                    &range_expr("x", 0, 100).as_json_str().unwrap(),
1660                ),
1661                test_region_route(
1662                    RegionId::new(table_id, 2),
1663                    &range_expr("x", 100, 200).as_json_str().unwrap(),
1664                ),
1665                RegionRoute {
1666                    region: Region {
1667                        id: RegionId::new(table_id, 3),
1668                        partition_expr: range_expr("x", 200, 250).as_json_str().unwrap(),
1669                        ..Default::default()
1670                    },
1671                    leader_peer: Some(Peer::empty(1)),
1672                    ..Default::default()
1673                },
1674                RegionRoute {
1675                    region: Region {
1676                        id: RegionId::new(table_id, 4),
1677                        partition_expr: range_expr("x", 250, 300).as_json_str().unwrap(),
1678                        ..Default::default()
1679                    },
1680                    leader_peer: Some(Peer::empty(1)),
1681                    ..Default::default()
1682                },
1683            ]
1684        );
1685    }
1686
1687    #[tokio::test]
1688    async fn test_repartition_procedure_flow_split_failed_and_full_rollback() {
1689        let env = TestingEnv::new();
1690        let table_id = 1024;
1691        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
1692
1693        env.create_physical_table_metadata_for_repartition(
1694            table_id,
1695            vec![
1696                test_region_route(
1697                    RegionId::new(table_id, 1),
1698                    &range_expr("x", 0, 100).as_json_str().unwrap(),
1699                ),
1700                test_region_route(
1701                    RegionId::new(table_id, 2),
1702                    &range_expr("x", 100, 200).as_json_str().unwrap(),
1703                ),
1704            ],
1705            test_region_wal_options(&[1, 2]),
1706        )
1707        .await;
1708
1709        let context = new_parent_context(&env, node_manager, table_id);
1710        let mut procedure = RepartitionProcedure::new(
1711            vec![range_expr("x", 0, 100)],
1712            vec![range_expr("x", 0, 50), range_expr("x", 50, 100)],
1713            context,
1714        );
1715
1716        let start_status = procedure
1717            .execute(&TestingEnv::procedure_context())
1718            .await
1719            .unwrap();
1720        assert!(!start_status.need_persist());
1721        let start_status = procedure
1722            .execute(&TestingEnv::procedure_context())
1723            .await
1724            .unwrap();
1725        assert!(start_status.need_persist());
1726        assert_parent_state::<AllocateRegion>(&procedure);
1727
1728        let allocate_status = procedure
1729            .execute(&TestingEnv::procedure_context())
1730            .await
1731            .unwrap();
1732        assert!(allocate_status.need_persist());
1733        assert_parent_state::<Dispatch>(&procedure);
1734        assert_eq!(procedure.context.persistent_ctx.plans.len(), 1);
1735        let plan = &procedure.context.persistent_ctx.plans[0];
1736        let expected_plan = test_plan(table_id);
1737        assert_eq!(plan.source_regions, expected_plan.source_regions);
1738        assert_eq!(plan.target_regions, expected_plan.target_regions);
1739        assert_eq!(
1740            plan.allocated_region_ids,
1741            expected_plan.allocated_region_ids
1742        );
1743        assert_eq!(
1744            plan.pending_deallocate_region_ids,
1745            expected_plan.pending_deallocate_region_ids
1746        );
1747        assert_eq!(plan.transition_map, expected_plan.transition_map);
1748        assert_eq!(
1749            current_parent_region_routes(&procedure.context).await,
1750            vec![
1751                test_region_route(
1752                    RegionId::new(table_id, 1),
1753                    &range_expr("x", 0, 100).as_json_str().unwrap(),
1754                ),
1755                test_region_route(
1756                    RegionId::new(table_id, 2),
1757                    &range_expr("x", 100, 200).as_json_str().unwrap(),
1758                ),
1759                RegionRoute {
1760                    region: Region {
1761                        id: RegionId::new(table_id, 3),
1762                        partition_expr: range_expr("x", 50, 100).as_json_str().unwrap(),
1763                        ..Default::default()
1764                    },
1765                    leader_peer: Some(Peer::empty(0)),
1766                    ..Default::default()
1767                },
1768            ]
1769        );
1770
1771        let dispatch_status = procedure
1772            .execute(&TestingEnv::procedure_context())
1773            .await
1774            .unwrap();
1775        assert!(!dispatch_status.need_persist());
1776        let subprocedure_ids = extract_subprocedure_ids(dispatch_status);
1777        assert_eq!(subprocedure_ids.len(), 1);
1778        assert_parent_state::<Collect>(&procedure);
1779
1780        let failed_state = ProcedureState::failed(Arc::new(ProcedureError::external(
1781            MockError::new(StatusCode::Internal),
1782        )));
1783        let collect_ctx = procedure_context_with_receivers(HashMap::from([(
1784            subprocedure_ids[0],
1785            procedure_state_receiver(failed_state),
1786        )]));
1787
1788        let err = procedure.execute(&collect_ctx).await.unwrap_err();
1789        assert!(!err.is_retry_later());
1790        assert_parent_state::<Collect>(&procedure);
1791
1792        procedure
1793            .rollback(&TestingEnv::procedure_context())
1794            .await
1795            .unwrap();
1796
1797        let region_routes = current_parent_region_routes(&procedure.context).await;
1798        assert_eq!(
1799            region_routes,
1800            vec![
1801                test_region_route(
1802                    RegionId::new(table_id, 1),
1803                    &range_expr("x", 0, 100).as_json_str().unwrap(),
1804                ),
1805                test_region_route(
1806                    RegionId::new(table_id, 2),
1807                    &range_expr("x", 100, 200).as_json_str().unwrap(),
1808                ),
1809            ]
1810        );
1811    }
1812
1813    #[tokio::test]
1814    async fn test_repartition_procedure_flow_split_allocate_retryable_then_resume() {
1815        common_telemetry::init_default_ut_logging();
1816        let env = TestingEnv::new();
1817        let table_id = 1024;
1818        let (tx, _rx) = mpsc::channel(8);
1819        let should_retry = Arc::new(AtomicBool::new(true));
1820        let datanode_handler = DatanodeWatcher::new(tx).with_handler(move |_, _| {
1821            if should_retry.swap(false, Ordering::SeqCst) {
1822                return Err(error::Error::RetryLater {
1823                    source: BoxedError::new(
1824                        error::UnexpectedSnafu {
1825                            err_msg: "retry later",
1826                        }
1827                        .build(),
1828                    ),
1829                    clean_poisons: false,
1830                });
1831            }
1832
1833            Ok(api::region::RegionResponse::new(0))
1834        });
1835        let node_manager = Arc::new(MockDatanodeManager::new(datanode_handler));
1836
1837        env.create_physical_table_metadata_for_repartition(
1838            table_id,
1839            vec![
1840                test_region_route(
1841                    RegionId::new(table_id, 1),
1842                    &range_expr("x", 0, 100).as_json_str().unwrap(),
1843                ),
1844                test_region_route(
1845                    RegionId::new(table_id, 2),
1846                    &range_expr("x", 100, 200).as_json_str().unwrap(),
1847                ),
1848            ],
1849            test_region_wal_options(&[1, 2]),
1850        )
1851        .await;
1852
1853        let context = new_parent_context(&env, node_manager, table_id);
1854        let mut procedure = RepartitionProcedure::new(
1855            vec![range_expr("x", 0, 100)],
1856            vec![range_expr("x", 0, 50), range_expr("x", 50, 100)],
1857            context,
1858        );
1859
1860        let start_status = procedure
1861            .execute(&TestingEnv::procedure_context())
1862            .await
1863            .unwrap();
1864        assert!(!start_status.need_persist());
1865        let start_status = procedure
1866            .execute(&TestingEnv::procedure_context())
1867            .await
1868            .unwrap();
1869        assert!(start_status.need_persist());
1870        assert_parent_state::<AllocateRegion>(&procedure);
1871
1872        let err = procedure
1873            .execute(&TestingEnv::procedure_context())
1874            .await
1875            .unwrap_err();
1876        assert!(err.is_retry_later());
1877        assert_parent_state::<AllocateRegion>(&procedure);
1878        assert!(!procedure.context.persistent_ctx.plans.is_empty());
1879        assert_eq!(
1880            current_parent_region_routes(&procedure.context).await,
1881            vec![
1882                test_region_route(
1883                    RegionId::new(table_id, 1),
1884                    &range_expr("x", 0, 100).as_json_str().unwrap(),
1885                ),
1886                test_region_route(
1887                    RegionId::new(table_id, 2),
1888                    &range_expr("x", 100, 200).as_json_str().unwrap(),
1889                ),
1890            ]
1891        );
1892
1893        let allocate_status = procedure
1894            .execute(&TestingEnv::procedure_context())
1895            .await
1896            .unwrap();
1897        assert!(allocate_status.need_persist());
1898        assert_parent_state::<Dispatch>(&procedure);
1899
1900        assert_eq!(procedure.context.persistent_ctx.plans.len(), 1);
1901        let plan = &procedure.context.persistent_ctx.plans[0];
1902        let expected_plan = test_plan(table_id);
1903        assert_eq!(plan.source_regions, expected_plan.source_regions);
1904        assert_eq!(plan.target_regions, expected_plan.target_regions);
1905        assert_eq!(
1906            plan.allocated_region_ids,
1907            expected_plan.allocated_region_ids
1908        );
1909        assert_eq!(plan.transition_map, expected_plan.transition_map);
1910        assert_eq!(
1911            current_parent_region_routes(&procedure.context).await,
1912            vec![
1913                test_region_route(
1914                    RegionId::new(table_id, 1),
1915                    &range_expr("x", 0, 100).as_json_str().unwrap(),
1916                ),
1917                test_region_route(
1918                    RegionId::new(table_id, 2),
1919                    &range_expr("x", 100, 200).as_json_str().unwrap(),
1920                ),
1921                RegionRoute {
1922                    region: Region {
1923                        id: RegionId::new(table_id, 3),
1924                        partition_expr: range_expr("x", 50, 100).as_json_str().unwrap(),
1925                        ..Default::default()
1926                    },
1927                    leader_peer: Some(Peer::empty(0)),
1928                    ..Default::default()
1929                },
1930            ]
1931        );
1932
1933        let dispatch_status = procedure
1934            .execute(&TestingEnv::procedure_context())
1935            .await
1936            .unwrap();
1937        assert!(!dispatch_status.need_persist());
1938        let subprocedure_ids = extract_subprocedure_ids(dispatch_status);
1939        assert_eq!(subprocedure_ids.len(), 1);
1940        assert_parent_state::<Collect>(&procedure);
1941    }
1942}