Skip to main content

meta_srv/procedure/
repartition.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod allocate_region;
16pub mod collect;
17pub mod deallocate_region;
18pub mod dispatch;
19pub mod group;
20pub mod plan;
21pub mod repartition_end;
22pub mod repartition_start;
23pub mod utils;
24
25use std::any::Any;
26use std::collections::{HashMap, HashSet};
27use std::fmt::{Debug, Display};
28use std::time::{Duration, Instant};
29
30use common_error::ext::BoxedError;
31use common_meta::cache_invalidator::CacheInvalidatorRef;
32use common_meta::ddl::DdlContext;
33use common_meta::ddl::allocator::region_routes::RegionRoutesAllocatorRef;
34use common_meta::ddl::allocator::wal_options::WalOptionsAllocatorRef;
35use common_meta::ddl_manager::RepartitionProcedureFactory;
36use common_meta::instruction::CacheIdent;
37use common_meta::key::datanode_table::RegionInfo;
38use common_meta::key::table_info::TableInfoValue;
39use common_meta::key::table_route::TableRouteValue;
40use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
41use common_meta::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
42use common_meta::node_manager::NodeManagerRef;
43use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
44use common_meta::region_registry::LeaderRegionRegistryRef;
45use common_meta::rpc::router::{RegionRoute, operating_leader_regions};
46use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
47use common_procedure::{
48    BoxedProcedure, Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
49    ProcedureManagerRef, Result as ProcedureResult, Status, StringKey, UserMetadata,
50};
51use common_telemetry::{error, info, warn};
52use partition::expr::PartitionExpr;
53use serde::{Deserialize, Serialize};
54use snafu::{OptionExt, ResultExt};
55use store_api::storage::{RegionNumber, TableId};
56use table::table_name::TableName;
57
58use crate::error::{self, Result};
59use crate::procedure::repartition::collect::ProcedureMeta;
60use crate::procedure::repartition::deallocate_region::DeallocateRegion;
61use crate::procedure::repartition::group::{
62    Context as RepartitionGroupContext, RepartitionGroupProcedure,
63};
64use crate::procedure::repartition::plan::RepartitionPlanEntry;
65use crate::procedure::repartition::repartition_start::RepartitionStart;
66use crate::procedure::repartition::utils::get_datanode_table_value;
67use crate::service::mailbox::MailboxRef;
68
69#[cfg(test)]
70pub mod test_util;
71
72#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
73pub struct PersistentContext {
74    pub catalog_name: String,
75    pub schema_name: String,
76    pub table_name: String,
77    pub table_id: TableId,
78    pub plans: Vec<RepartitionPlanEntry>,
79    /// Records failed sub-procedures for metadata rollback.
80    #[serde(default)]
81    pub failed_procedures: Vec<ProcedureMeta>,
82    #[serde(default)]
83    /// Records unknown sub-procedures for metadata rollback.
84    pub unknown_procedures: Vec<ProcedureMeta>,
85    /// The timeout for repartition operations.
86    #[serde(with = "humantime_serde", default = "default_timeout")]
87    pub timeout: Duration,
88}
89
/// Default time budget for a repartition procedure: two minutes.
fn default_timeout() -> Duration {
    Duration::from_secs(2 * 60)
}
93
94impl PersistentContext {
95    /// Creates a new [PersistentContext] with the given table name, table id and timeout.
96    ///
97    /// If the timeout is not provided, the default timeout will be used.
98    pub fn new(
99        TableName {
100            catalog_name,
101            schema_name,
102            table_name,
103        }: TableName,
104        table_id: TableId,
105        timeout: Option<Duration>,
106    ) -> Self {
107        Self {
108            catalog_name,
109            schema_name,
110            table_name,
111            table_id,
112            plans: vec![],
113            failed_procedures: vec![],
114            unknown_procedures: vec![],
115            timeout: timeout.unwrap_or_else(default_timeout),
116        }
117    }
118
119    pub fn lock_key(&self) -> Vec<StringKey> {
120        vec![
121            CatalogLock::Read(&self.catalog_name).into(),
122            SchemaLock::read(&self.catalog_name, &self.schema_name).into(),
123            TableLock::Write(self.table_id).into(),
124            TableNameLock::new(&self.catalog_name, &self.schema_name, &self.table_name).into(),
125        ]
126    }
127}
128
/// Runtime context shared by all repartition states.
///
/// `persistent_ctx` is the restart-surviving part; the remaining handles are
/// rebuilt from the [DdlContext] on recovery (see [Context::new]).
#[derive(Clone)]
pub struct Context {
    /// State persisted in the procedure store.
    pub persistent_ctx: PersistentContext,
    /// In-memory-only state (metrics, dispatch timing); reset on recovery.
    pub volatile_ctx: VolatileContext,
    /// Manager for table route / info / datanode-table metadata.
    pub table_metadata_manager: TableMetadataManagerRef,
    /// In-memory keeper guarding regions currently under operation.
    pub memory_region_keeper: MemoryRegionKeeperRef,
    /// Access to datanodes for region-level requests.
    pub node_manager: NodeManagerRef,
    /// Registry of current leader regions.
    pub leader_region_registry: LeaderRegionRegistryRef,
    /// Mailbox for messaging datanodes.
    pub mailbox: MailboxRef,
    /// Address of this metasrv instance.
    pub server_addr: String,
    /// Broadcasts cache-invalidation instructions.
    pub cache_invalidator: CacheInvalidatorRef,
    /// Allocates region routes for newly created regions.
    pub region_routes_allocator: RegionRoutesAllocatorRef,
    /// Allocates WAL options for newly created regions.
    pub wal_options_allocator: WalOptionsAllocatorRef,
    /// When this context was created; used to compute the remaining
    /// time budget (see [Context::next_operation_timeout]).
    pub start_time: Instant,
}
144
/// Non-persisted, per-run state; recreated as default after a restart.
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {
    /// Per-stage elapsed-time metrics for this run.
    pub metrics: Metrics,
    /// Start time of the dispatch stage, if it has started.
    // NOTE(review): presumably set by the dispatch state — confirm in dispatch.rs.
    pub dispatch_start_time: Option<Instant>,
}
150
/// Metrics of repartition.
///
/// Accumulated per stage via the `update_*` methods and rendered by the
/// [Display] impl (zero durations are omitted from the output).
#[derive(Debug, Clone, Default)]
pub struct Metrics {
    /// Elapsed time of building plan.
    build_plan_elapsed: Duration,
    /// Elapsed time of allocating region.
    allocate_region_elapsed: Duration,
    /// Elapsed time of finishing groups.
    finish_groups_elapsed: Duration,
    /// Elapsed time of deallocating region.
    deallocate_region_elapsed: Duration,
}
163
164impl Display for Metrics {
165    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
166        let total = self.build_plan_elapsed
167            + self.allocate_region_elapsed
168            + self.finish_groups_elapsed
169            + self.deallocate_region_elapsed;
170        write!(f, "total: {:?}", total)?;
171        let mut parts = Vec::with_capacity(4);
172        if self.build_plan_elapsed > Duration::ZERO {
173            parts.push(format!("build_plan_elapsed: {:?}", self.build_plan_elapsed));
174        }
175        if self.allocate_region_elapsed > Duration::ZERO {
176            parts.push(format!(
177                "allocate_region_elapsed: {:?}",
178                self.allocate_region_elapsed
179            ));
180        }
181        if self.finish_groups_elapsed > Duration::ZERO {
182            parts.push(format!(
183                "finish_groups_elapsed: {:?}",
184                self.finish_groups_elapsed
185            ));
186        }
187        if self.deallocate_region_elapsed > Duration::ZERO {
188            parts.push(format!(
189                "deallocate_region_elapsed: {:?}",
190                self.deallocate_region_elapsed
191            ));
192        }
193
194        if !parts.is_empty() {
195            write!(f, ", {}", parts.join(", "))?;
196        }
197        Ok(())
198    }
199}
200
201impl Metrics {
202    /// Updates the elapsed time of building plan.
203    pub fn update_build_plan_elapsed(&mut self, elapsed: Duration) {
204        self.build_plan_elapsed += elapsed;
205    }
206
207    /// Updates the elapsed time of allocating region.
208    pub fn update_allocate_region_elapsed(&mut self, elapsed: Duration) {
209        self.allocate_region_elapsed += elapsed;
210    }
211
212    /// Updates the elapsed time of finishing groups.
213    pub fn update_finish_groups_elapsed(&mut self, elapsed: Duration) {
214        self.finish_groups_elapsed += elapsed;
215    }
216
217    /// Updates the elapsed time of deallocating region.
218    pub fn update_deallocate_region_elapsed(&mut self, elapsed: Duration) {
219        self.deallocate_region_elapsed += elapsed;
220    }
221}
222
impl Context {
    /// Builds a repartition [Context] from the shared DDL context plus the
    /// metasrv mailbox and server address.
    ///
    /// `start_time` is captured here; [Self::next_operation_timeout] measures
    /// the remaining time budget against it.
    pub fn new(
        ddl_ctx: &DdlContext,
        mailbox: MailboxRef,
        server_addr: String,
        persistent_ctx: PersistentContext,
    ) -> Self {
        Self {
            persistent_ctx,
            table_metadata_manager: ddl_ctx.table_metadata_manager.clone(),
            memory_region_keeper: ddl_ctx.memory_region_keeper.clone(),
            node_manager: ddl_ctx.node_manager.clone(),
            leader_region_registry: ddl_ctx.leader_region_registry.clone(),
            mailbox,
            server_addr,
            cache_invalidator: ddl_ctx.cache_invalidator.clone(),
            region_routes_allocator: ddl_ctx.table_metadata_allocator.region_routes_allocator(),
            wal_options_allocator: ddl_ctx.table_metadata_allocator.wal_options_allocator(),
            start_time: Instant::now(),
            volatile_ctx: VolatileContext::default(),
        }
    }

    /// Returns the next operation's timeout: the configured total timeout
    /// minus the time elapsed since this context was created. `None` means
    /// the budget is already exhausted.
    pub fn next_operation_timeout(&self) -> Option<Duration> {
        self.persistent_ctx
            .timeout
            .checked_sub(self.start_time.elapsed())
    }

    /// Updates the elapsed time of building plan.
    pub fn update_build_plan_elapsed(&mut self, elapsed: Duration) {
        self.volatile_ctx.metrics.update_build_plan_elapsed(elapsed);
    }

    /// Updates the elapsed time of allocating region.
    pub fn update_allocate_region_elapsed(&mut self, elapsed: Duration) {
        self.volatile_ctx
            .metrics
            .update_allocate_region_elapsed(elapsed);
    }

    /// Updates the elapsed time of finishing groups.
    pub fn update_finish_groups_elapsed(&mut self, elapsed: Duration) {
        self.volatile_ctx
            .metrics
            .update_finish_groups_elapsed(elapsed);
    }

    /// Updates the elapsed time of deallocating region.
    pub fn update_deallocate_region_elapsed(&mut self, elapsed: Duration) {
        self.volatile_ctx
            .metrics
            .update_deallocate_region_elapsed(elapsed);
    }

    /// Retrieves the table route value for the given table id.
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of table.
    ///
    /// Abort:
    /// - Table route not found.
    pub async fn get_table_route_value(
        &self,
    ) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
        let table_id = self.persistent_ctx.table_id;
        // Raw bytes are fetched alongside the value — presumably so the later
        // route update can detect concurrent modification; see
        // `update_table_route`.
        let table_route_value = self
            .table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get_with_raw_bytes(table_id)
            .await
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: format!("Failed to get table route for table: {}", table_id),
            })?
            .context(error::TableRouteNotFoundSnafu { table_id })?;

        Ok(table_route_value)
    }

    /// Retrieves the table info value for the given table id.
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of table.
    ///
    /// Abort:
    /// - Table info not found.
    pub async fn get_table_info_value(&self) -> Result<TableInfoValue> {
        let table_id = self.persistent_ctx.table_id;
        let table_info_value = self
            .table_metadata_manager
            .table_info_manager()
            .get(table_id)
            .await
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: format!("Failed to get table info for table: {}", table_id),
            })?
            .context(error::TableInfoNotFoundSnafu { table_id })?
            .into_inner();
        Ok(table_info_value)
    }

    /// Updates the table route.
    ///
    /// Merges `new_region_wal_options` with the currently stored WAL options,
    /// validates the result, then writes the new routes against
    /// `current_table_route_value`.
    ///
    /// Retry:
    /// - Failed to retrieve the metadata of datanode table.
    ///
    /// Abort:
    /// - Table route not found.
    /// - Failed to update the table route.
    pub async fn update_table_route(
        &self,
        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
        new_region_routes: Vec<RegionRoute>,
        new_region_wal_options: HashMap<RegionNumber, String>,
    ) -> Result<()> {
        let table_id = self.persistent_ctx.table_id;
        // An empty route set would wipe the table's routes; treat as a bug.
        if new_region_routes.is_empty() {
            return error::UnexpectedSnafu {
                violated: format!("new_region_routes is empty for table: {}", table_id),
            }
            .fail();
        }
        // Use the first route's leader datanode to load the datanode-table
        // value, which carries the current region options and WAL options.
        let datanode_id = new_region_routes
            .first()
            .unwrap()
            .leader_peer
            .as_ref()
            .context(error::NoLeaderSnafu)?
            .id;
        let datanode_table_value =
            get_datanode_table_value(&self.table_metadata_manager, table_id, datanode_id).await?;

        let RegionInfo {
            region_options,
            region_wal_options,
            ..
        } = &datanode_table_value.region_info;

        // Merge and validate the new region wal options.
        let validated_region_wal_options =
            crate::procedure::repartition::utils::merge_and_validate_region_wal_options(
                region_wal_options,
                new_region_wal_options,
                &new_region_routes,
                table_id,
            )?;
        info!(
            "Updating table route for table: {}, new region routes: {:?}",
            table_id, new_region_routes
        );
        self.table_metadata_manager
            .update_table_route(
                table_id,
                datanode_table_value.region_info.clone(),
                current_table_route_value,
                new_region_routes,
                region_options,
                &validated_region_wal_options,
            )
            .await
            .context(error::TableMetadataManagerSnafu)
    }

    /// Broadcasts the invalidate table cache message.
    ///
    /// Best-effort: broadcast failures are ignored (`let _ =`), so this
    /// always returns `Ok(())`.
    pub async fn invalidate_table_cache(&self) -> Result<()> {
        let table_id = self.persistent_ctx.table_id;
        let subject = format!(
            "Invalidate table cache for repartition table, table: {}",
            table_id,
        );
        let ctx = common_meta::cache_invalidator::Context {
            subject: Some(subject),
        };
        let _ = self
            .cache_invalidator
            .invalidate(&ctx, &[CacheIdent::TableId(table_id)])
            .await;
        Ok(())
    }

    /// Registers operating guards for every leader region in `region_routes`.
    ///
    /// Fails with `RegionOperatingRace` if any of those regions is already
    /// registered (i.e. being operated on by another procedure).
    pub fn register_operating_regions(
        memory_region_keeper: &MemoryRegionKeeperRef,
        region_routes: &[RegionRoute],
    ) -> Result<Vec<OperatingRegionGuard>> {
        let mut operating_guards = Vec::with_capacity(region_routes.len());
        for (region_id, datanode_id) in operating_leader_regions(region_routes) {
            let guard = memory_region_keeper
                .register(datanode_id, region_id)
                .context(error::RegionOperatingRaceSnafu {
                    peer_id: datanode_id,
                    region_id,
                })?;
            operating_guards.push(guard);
        }
        Ok(operating_guards)
    }
}
424
425#[async_trait::async_trait]
426#[typetag::serde(tag = "repartition_state")]
427pub(crate) trait State: Sync + Send + Debug {
428    fn name(&self) -> &'static str {
429        let type_name = std::any::type_name::<Self>();
430        // short name
431        type_name.split("::").last().unwrap_or(type_name)
432    }
433
434    /// Yields the next [State] and [Status].
435    async fn next(
436        &mut self,
437        ctx: &mut Context,
438        procedure_ctx: &ProcedureContext,
439    ) -> Result<(Box<dyn State>, Status)>;
440
441    fn as_any(&self) -> &dyn Any;
442}
443
/// Procedure that drives a table repartition through its state machine.
pub struct RepartitionProcedure {
    /// Current state; advanced on each `execute` call.
    state: Box<dyn State>,
    /// Shared execution context (persistent + volatile parts).
    context: Context,
}
448
/// Borrowed view serialized by [RepartitionProcedure::dump]; only the state
/// and the persistent context are persisted.
#[derive(Debug, Serialize)]
struct RepartitionData<'a> {
    state: &'a dyn State,
    persistent_ctx: &'a PersistentContext,
}
454
/// Owned counterpart of [RepartitionData], used when deserializing in
/// [RepartitionProcedure::from_json].
#[derive(Debug, Deserialize)]
struct RepartitionDataOwned {
    state: Box<dyn State>,
    persistent_ctx: PersistentContext,
}
460
impl RepartitionProcedure {
    const TYPE_NAME: &'static str = "metasrv-procedure::Repartition";

    /// Creates a procedure that starts in the [RepartitionStart] state with
    /// the given source (`from_exprs`) and target (`to_exprs`) partition
    /// expressions.
    pub fn new(
        from_exprs: Vec<PartitionExpr>,
        to_exprs: Vec<PartitionExpr>,
        context: Context,
    ) -> Self {
        let state = Box::new(RepartitionStart::new(from_exprs, to_exprs));

        Self { state, context }
    }

    /// Restores a procedure from persisted JSON; `ctx_factory` rebuilds the
    /// runtime [Context] around the deserialized [PersistentContext].
    pub fn from_json<F>(json: &str, ctx_factory: F) -> ProcedureResult<Self>
    where
        F: FnOnce(PersistentContext) -> Context,
    {
        let RepartitionDataOwned {
            state,
            persistent_ctx,
        } = serde_json::from_str(json).context(FromJsonSnafu)?;
        let context = ctx_factory(persistent_ctx);

        Ok(Self { state, context })
    }

    /// Returns whether parent rollback should remove this repartition's allocated regions.
    ///
    /// This uses an "after AllocateRegion" semantic: once execution reaches
    /// `AllocateRegion` or any later state, rollback must try to remove this round's
    /// `allocated_region_ids` from table-route metadata when they exist.
    ///
    /// State flow:
    /// `RepartitionStart -> AllocateRegion -> Dispatch -> Collect -> DeallocateRegion -> RepartitionEnd`
    ///                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    ///                     rollback allocated regions in metadata
    ///
    /// Notes:
    /// - `RepartitionStart`: no-op, because allocation has not happened yet.
    /// - `AllocateRegion` / `Dispatch` / `Collect`: rollback-active.
    /// - `DeallocateRegion`: is not rollback-active.
    /// - `RepartitionEnd`: no-op.
    fn should_rollback_allocated_regions(&self) -> bool {
        self.state.as_any().is::<allocate_region::AllocateRegion>()
            || self.state.as_any().is::<dispatch::Dispatch>()
            || self.state.as_any().is::<collect::Collect>()
    }

    /// Region ids allocated this round that rollback should remove.
    ///
    /// Before any sub-procedure result is known (`AllocateRegion`/`Dispatch`),
    /// every plan's allocations are candidates; from `Collect` onwards, only
    /// the plans of failed/unknown sub-procedures are considered.
    fn rollback_allocated_region_ids(&self) -> HashSet<store_api::storage::RegionId> {
        if self.state.as_any().is::<allocate_region::AllocateRegion>()
            || self.state.as_any().is::<dispatch::Dispatch>()
        {
            return self
                .context
                .persistent_ctx
                .plans
                .iter()
                .flat_map(|plan| plan.allocated_region_ids.iter().copied())
                .collect();
        }

        self.context
            .persistent_ctx
            .failed_procedures
            .iter()
            .chain(self.context.persistent_ctx.unknown_procedures.iter())
            .flat_map(|procedure_meta| {
                // NOTE(review): `plans[plan_index]` panics if a recorded index
                // is out of range — presumably guaranteed valid by the collect
                // state; confirm.
                let plan_index = procedure_meta.plan_index;
                self.context.persistent_ctx.plans[plan_index]
                    .allocated_region_ids
                    .iter()
                    .copied()
            })
            .collect()
    }

    /// Returns `region_routes` with the given allocated region ids removed.
    fn filter_allocated_region_routes(
        region_routes: &[RegionRoute],
        allocated_region_ids: &HashSet<store_api::storage::RegionId>,
    ) -> Vec<RegionRoute> {
        region_routes
            .iter()
            .filter(|route| !allocated_region_ids.contains(&route.region.id))
            .cloned()
            .collect()
    }

    /// Rolls back this round's allocated regions: best-effort drops them on
    /// the datanodes, prunes them from the table route, and invalidates the
    /// table cache.
    async fn rollback_inner(&mut self, procedure_ctx: &ProcedureContext) -> Result<()> {
        if !self.should_rollback_allocated_regions() {
            return Ok(());
        }

        let table_id = self.context.persistent_ctx.table_id;
        let allocated_region_ids = self.rollback_allocated_region_ids();
        if allocated_region_ids.is_empty() {
            return Ok(());
        }

        // Take the table write lock so the route update below cannot race
        // with other procedures on the same table.
        let table_lock = TableLock::Write(table_id).into();
        let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
        let table_route_value = self.context.get_table_route_value().await?;
        // NOTE(review): `region_routes()` is unwrapped — presumably the stored
        // route is always of the physical kind here; confirm.
        let current_region_routes = table_route_value.region_routes().unwrap();
        let allocated_region_routes = DeallocateRegion::filter_deallocatable_region_routes(
            table_id,
            current_region_routes,
            &allocated_region_ids,
        );
        if !allocated_region_routes.is_empty() {
            let table = TableName {
                catalog_name: self.context.persistent_ctx.catalog_name.clone(),
                schema_name: self.context.persistent_ctx.schema_name.clone(),
                table_name: self.context.persistent_ctx.table_name.clone(),
            };
            // Memory guards are not required here,
            // because the table metadata still contains routes for the deallocating regions.
            if let Err(err) = DeallocateRegion::deallocate_regions(
                &self.context.node_manager,
                &self.context.leader_region_registry,
                table,
                table_id,
                &allocated_region_routes,
            )
            .await
            {
                // Best-effort: failures to drop regions are logged, not propagated.
                warn!(err; "Failed to drop allocated regions during repartition rollback, table_id: {}, regions: {:?}", table_id, allocated_region_ids);
            }
        }

        let new_region_routes =
            Self::filter_allocated_region_routes(current_region_routes, &allocated_region_ids);

        // Persist the pruned routes only if something was actually removed.
        if new_region_routes.len() != current_region_routes.len() {
            self.context
                .update_table_route(&table_route_value, new_region_routes, HashMap::new())
                .await
                .map_err(BoxedError::new)
                .with_context(|_| error::RetryLaterWithSourceSnafu {
                    reason: format!(
                        "Failed to rollback allocated region routes for repartition table: {}",
                        table_id
                    ),
                })?;
        }

        // Best-effort cache invalidation; failure only logs a warning.
        if let Err(err) = self.context.invalidate_table_cache().await {
            warn!(err; "Failed to invalidate table cache during repartition rollback, table_id: {}", table_id);
        }

        Ok(())
    }
}
612
613#[async_trait::async_trait]
614impl Procedure for RepartitionProcedure {
615    fn type_name(&self) -> &str {
616        Self::TYPE_NAME
617    }
618
619    #[tracing::instrument(skip_all, fields(
620        state = %self.state.name(),
621        table_id = %self.context.persistent_ctx.table_id
622    ))]
623    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
624        let state = &mut self.state;
625        let state_name = state.name();
626        // Log state transition
627        common_telemetry::info!(
628            "Repartition procedure executing state: {}, table_id: {}",
629            state_name,
630            self.context.persistent_ctx.table_id
631        );
632        match state.next(&mut self.context, _ctx).await {
633            Ok((next, status)) => {
634                *state = next;
635                Ok(status)
636            }
637            Err(e) => {
638                if e.is_retryable() {
639                    Err(ProcedureError::retry_later(e))
640                } else {
641                    error!(
642                        e;
643                        "Repartition procedure failed, table id: {}",
644                        self.context.persistent_ctx.table_id,
645                    );
646                    Err(ProcedureError::external(e))
647                }
648            }
649        }
650    }
651
652    async fn rollback(&mut self, ctx: &ProcedureContext) -> ProcedureResult<()> {
653        self.rollback_inner(ctx)
654            .await
655            .map_err(ProcedureError::external)
656    }
657
658    fn rollback_supported(&self) -> bool {
659        true
660    }
661
662    fn dump(&self) -> ProcedureResult<String> {
663        let data = RepartitionData {
664            state: self.state.as_ref(),
665            persistent_ctx: &self.context.persistent_ctx,
666        };
667        serde_json::to_string(&data).context(ToJsonSnafu)
668    }
669
670    fn lock_key(&self) -> LockKey {
671        LockKey::new(self.context.persistent_ctx.lock_key())
672    }
673
674    fn user_metadata(&self) -> Option<UserMetadata> {
675        // TODO(weny): support user metadata.
676        None
677    }
678}
679
/// Default [RepartitionProcedureFactory] implementation carrying the
/// metasrv-specific handles needed to build a repartition [Context].
pub struct DefaultRepartitionProcedureFactory {
    /// Mailbox for messaging datanodes.
    mailbox: MailboxRef,
    /// Address of this metasrv instance.
    server_addr: String,
}
684
impl DefaultRepartitionProcedureFactory {
    /// Creates a factory from the metasrv mailbox and server address.
    pub fn new(mailbox: MailboxRef, server_addr: String) -> Self {
        Self {
            mailbox,
            server_addr,
        }
    }
}
693
694impl RepartitionProcedureFactory for DefaultRepartitionProcedureFactory {
695    fn create(
696        &self,
697        ddl_ctx: &DdlContext,
698        table_name: TableName,
699        table_id: TableId,
700        from_exprs: Vec<String>,
701        to_exprs: Vec<String>,
702        timeout: Option<Duration>,
703    ) -> std::result::Result<BoxedProcedure, BoxedError> {
704        let persistent_ctx = PersistentContext::new(table_name, table_id, timeout);
705        let from_exprs = from_exprs
706            .iter()
707            .map(|e| {
708                PartitionExpr::from_json_str(e)
709                    .context(error::DeserializePartitionExprSnafu)?
710                    .context(error::EmptyPartitionExprSnafu)
711            })
712            .collect::<Result<Vec<_>>>()
713            .map_err(BoxedError::new)?;
714        let to_exprs = to_exprs
715            .iter()
716            .map(|e| {
717                PartitionExpr::from_json_str(e)
718                    .context(error::DeserializePartitionExprSnafu)?
719                    .context(error::EmptyPartitionExprSnafu)
720            })
721            .collect::<Result<Vec<_>>>()
722            .map_err(BoxedError::new)?;
723
724        let procedure = RepartitionProcedure::new(
725            from_exprs,
726            to_exprs,
727            Context::new(
728                ddl_ctx,
729                self.mailbox.clone(),
730                self.server_addr.clone(),
731                persistent_ctx,
732            ),
733        );
734
735        Ok(Box::new(procedure))
736    }
737
738    fn register_loaders(
739        &self,
740        ddl_ctx: &DdlContext,
741        procedure_manager: &ProcedureManagerRef,
742    ) -> std::result::Result<(), BoxedError> {
743        // Registers the repartition procedure loader.
744        let mailbox = self.mailbox.clone();
745        let server_addr = self.server_addr.clone();
746        let moved_ddl_ctx = ddl_ctx.clone();
747        procedure_manager
748            .register_loader(
749                RepartitionProcedure::TYPE_NAME,
750                Box::new(move |json| {
751                    let mailbox = mailbox.clone();
752                    let server_addr = server_addr.clone();
753                    let ddl_ctx = moved_ddl_ctx.clone();
754                    let factory = move |persistent_ctx| {
755                        Context::new(&ddl_ctx, mailbox, server_addr, persistent_ctx)
756                    };
757                    RepartitionProcedure::from_json(json, factory).map(|p| Box::new(p) as _)
758                }),
759            )
760            .map_err(BoxedError::new)?;
761
762        // Registers the repartition group procedure loader.
763        let mailbox = self.mailbox.clone();
764        let server_addr = self.server_addr.clone();
765        let moved_ddl_ctx = ddl_ctx.clone();
766        procedure_manager
767            .register_loader(
768                RepartitionGroupProcedure::TYPE_NAME,
769                Box::new(move |json| {
770                    let mailbox = mailbox.clone();
771                    let server_addr = server_addr.clone();
772                    let ddl_ctx = moved_ddl_ctx.clone();
773                    let factory = move |persistent_ctx| {
774                        RepartitionGroupContext::new(&ddl_ctx, mailbox, server_addr, persistent_ctx)
775                    };
776                    RepartitionGroupProcedure::from_json(json, factory).map(|p| Box::new(p) as _)
777                }),
778            )
779            .map_err(BoxedError::new)?;
780
781        Ok(())
782    }
783}
784
785#[cfg(test)]
786mod tests {
787    use std::collections::HashMap;
788    use std::sync::Arc;
789    use std::sync::atomic::{AtomicBool, Ordering};
790
791    use common_error::ext::BoxedError;
792    use common_error::mock::MockError;
793    use common_error::status_code::StatusCode;
794    use common_meta::ddl::test_util::datanode_handler::{
795        DatanodeWatcher, NaiveDatanodeHandler, UnexpectedErrorDatanodeHandler,
796    };
797    use common_meta::error;
798    use common_meta::peer::Peer;
799    use common_meta::rpc::router::{Region, RegionRoute};
800    use common_meta::test_util::MockDatanodeManager;
801    use common_procedure::{Error as ProcedureError, Procedure, ProcedureId, ProcedureState};
802    use store_api::storage::RegionId;
803    use table::table_name::TableName;
804    use tokio::sync::mpsc;
805    use uuid::Uuid;
806
807    use super::*;
808    use crate::procedure::repartition::allocate_region::AllocateRegion;
809    use crate::procedure::repartition::collect::Collect;
810    use crate::procedure::repartition::deallocate_region::DeallocateRegion;
811    use crate::procedure::repartition::dispatch::Dispatch;
812    use crate::procedure::repartition::plan::RegionDescriptor;
813    use crate::procedure::repartition::repartition_end::RepartitionEnd;
814    use crate::procedure::repartition::test_util::{
815        TestingEnv, assert_parent_state, current_parent_region_routes, extract_subprocedure_ids,
816        new_parent_context, procedure_context_with_receivers, procedure_state_receiver, range_expr,
817        test_region_route, test_region_wal_options,
818    };
819
    /// Builds a single-group plan that splits region 1 of `table_id` into
    /// regions 1 and 3, with region 3 recorded as newly allocated.
    fn test_plan(table_id: TableId) -> RepartitionPlanEntry {
        RepartitionPlanEntry {
            group_id: uuid::Uuid::new_v4(),
            source_regions: vec![RegionDescriptor {
                region_id: RegionId::new(table_id, 1),
                partition_expr: range_expr("x", 0, 100),
            }],
            target_regions: vec![
                RegionDescriptor {
                    region_id: RegionId::new(table_id, 1),
                    partition_expr: range_expr("x", 0, 50),
                },
                RegionDescriptor {
                    region_id: RegionId::new(table_id, 3),
                    partition_expr: range_expr("x", 50, 100),
                },
            ],
            allocated_region_ids: vec![RegionId::new(table_id, 3)],
            pending_deallocate_region_ids: vec![],
            // Source region 0 maps to both target regions.
            transition_map: vec![vec![0, 1]],
        }
    }
842
    /// Wraps a pre-built `State` and `Context` into a `RepartitionProcedure`,
    /// bypassing `RepartitionProcedure::new`, so tests can start the state
    /// machine at an arbitrary point.
    fn test_procedure(state: Box<dyn State>, context: Context) -> RepartitionProcedure {
        RepartitionProcedure { state, context }
    }
846
847    fn test_context(env: &TestingEnv, table_id: TableId) -> Context {
848        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
849        let ddl_ctx = env.ddl_context(node_manager);
850        let persistent_ctx = PersistentContext::new(
851            TableName::new("test_catalog", "test_schema", "test_table"),
852            table_id,
853            None,
854        );
855
856        Context::new(
857            &ddl_ctx,
858            env.mailbox_ctx.mailbox().clone(),
859            env.server_addr.clone(),
860            persistent_ctx,
861        )
862    }
863
864    #[test]
865    fn test_filter_allocated_region_routes() {
866        let table_id = 1024;
867        let region_routes = vec![
868            test_region_route(RegionId::new(table_id, 1), "a"),
869            test_region_route(RegionId::new(table_id, 2), "b"),
870        ];
871        let allocated_region_ids = HashSet::from([RegionId::new(table_id, 2)]);
872
873        let new_region_routes = RepartitionProcedure::filter_allocated_region_routes(
874            &region_routes,
875            &allocated_region_ids,
876        );
877
878        assert_eq!(new_region_routes.len(), 1);
879        assert_eq!(new_region_routes[0].region.id, RegionId::new(table_id, 1));
880    }
881
882    #[test]
883    fn test_should_rollback_allocated_regions() {
884        let env = TestingEnv::new();
885        let table_id = 1024;
886
887        let procedure = test_procedure(
888            Box::new(RepartitionStart::new(vec![], vec![])),
889            test_context(&env, table_id),
890        );
891        assert!(!procedure.should_rollback_allocated_regions());
892
893        let procedure = test_procedure(
894            Box::new(AllocateRegion::new(vec![])),
895            test_context(&env, table_id),
896        );
897        assert!(procedure.should_rollback_allocated_regions());
898
899        let procedure = test_procedure(Box::new(Dispatch), test_context(&env, table_id));
900        assert!(procedure.should_rollback_allocated_regions());
901
902        let procedure =
903            test_procedure(Box::new(Collect::new(vec![])), test_context(&env, table_id));
904        assert!(procedure.should_rollback_allocated_regions());
905
906        let procedure = test_procedure(Box::new(DeallocateRegion), test_context(&env, table_id));
907        assert!(!procedure.should_rollback_allocated_regions());
908
909        let procedure = test_procedure(Box::new(RepartitionEnd), test_context(&env, table_id));
910        assert!(!procedure.should_rollback_allocated_regions());
911    }
912
913    #[tokio::test]
914    async fn test_repartition_rollback_removes_allocated_routes_from_dispatch() {
915        let env = TestingEnv::new();
916        let table_id = 1024;
917        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
918        let ddl_ctx = env.ddl_context(node_manager);
919        let original_region_routes = vec![
920            test_region_route(
921                RegionId::new(table_id, 1),
922                &range_expr("x", 0, 100).as_json_str().unwrap(),
923            ),
924            test_region_route(
925                RegionId::new(table_id, 2),
926                &range_expr("x", 50, 100).as_json_str().unwrap(),
927            ),
928            test_region_route(RegionId::new(table_id, 3), ""),
929        ];
930        env.create_physical_table_metadata_with_wal_options(
931            table_id,
932            original_region_routes,
933            test_region_wal_options(&[1, 2]),
934        )
935        .await;
936
937        let mut persistent_ctx = PersistentContext::new(
938            TableName::new("test_catalog", "test_schema", "test_table"),
939            table_id,
940            None,
941        );
942        persistent_ctx.plans = vec![test_plan(table_id)];
943        persistent_ctx.failed_procedures = vec![ProcedureMeta {
944            plan_index: 0,
945            group_id: Uuid::new_v4(),
946            procedure_id: ProcedureId::random(),
947        }];
948        let context = Context::new(
949            &ddl_ctx,
950            env.mailbox_ctx.mailbox().clone(),
951            env.server_addr.clone(),
952            persistent_ctx,
953        );
954        let mut procedure = RepartitionProcedure {
955            state: Box::new(Dispatch),
956            context,
957        };
958
959        procedure
960            .rollback(&TestingEnv::procedure_context())
961            .await
962            .unwrap();
963
964        let region_routes = current_parent_region_routes(&procedure.context).await;
965        assert_eq!(region_routes.len(), 2);
966        assert_eq!(region_routes[0].region.id, RegionId::new(table_id, 1));
967        assert_eq!(region_routes[1].region.id, RegionId::new(table_id, 2));
968    }
969
970    #[tokio::test]
971    async fn test_repartition_rollback_removes_allocated_routes_from_allocate() {
972        let env = TestingEnv::new();
973        let table_id = 1024;
974        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
975        let ddl_ctx = env.ddl_context(node_manager);
976        let original_region_routes = vec![
977            test_region_route(
978                RegionId::new(table_id, 1),
979                &range_expr("x", 0, 100).as_json_str().unwrap(),
980            ),
981            test_region_route(
982                RegionId::new(table_id, 2),
983                &range_expr("x", 50, 100).as_json_str().unwrap(),
984            ),
985            test_region_route(RegionId::new(table_id, 3), ""),
986        ];
987        env.create_physical_table_metadata_with_wal_options(
988            table_id,
989            original_region_routes,
990            test_region_wal_options(&[1, 2]),
991        )
992        .await;
993
994        let mut persistent_ctx = PersistentContext::new(
995            TableName::new("test_catalog", "test_schema", "test_table"),
996            table_id,
997            None,
998        );
999        persistent_ctx.plans = vec![test_plan(table_id)];
1000        let context = Context::new(
1001            &ddl_ctx,
1002            env.mailbox_ctx.mailbox().clone(),
1003            env.server_addr.clone(),
1004            persistent_ctx,
1005        );
1006        let mut procedure = RepartitionProcedure {
1007            state: Box::new(AllocateRegion::new(vec![])),
1008            context,
1009        };
1010
1011        procedure
1012            .rollback(&TestingEnv::procedure_context())
1013            .await
1014            .unwrap();
1015
1016        let region_routes = current_parent_region_routes(&procedure.context).await;
1017        assert_eq!(region_routes.len(), 2);
1018        assert_eq!(region_routes[0].region.id, RegionId::new(table_id, 1));
1019        assert_eq!(region_routes[1].region.id, RegionId::new(table_id, 2));
1020    }
1021
    #[tokio::test]
    async fn test_repartition_rollback_from_collect_only_removes_failed_allocated_routes() {
        // Two plans share one procedure: plan 0 (region 1 -> regions 1 & 3)
        // fails, plan 1 (region 2 -> regions 2 & 4) succeeds. Rollback must
        // remove only the route allocated by the FAILED plan (region 3) and
        // keep the route allocated by the succeeded plan (region 4).
        let env = TestingEnv::new();
        let table_id = 1024;
        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
        let ddl_ctx = env.ddl_context(node_manager);
        // Regions 1 and 2 are the original sources; regions 3 and 4 stand in
        // for routes added by the allocation step of the two plans.
        let original_region_routes = vec![
            test_region_route(
                RegionId::new(table_id, 1),
                &range_expr("x", 0, 100).as_json_str().unwrap(),
            ),
            test_region_route(
                RegionId::new(table_id, 2),
                &range_expr("x", 100, 200).as_json_str().unwrap(),
            ),
            test_region_route(RegionId::new(table_id, 3), ""),
            test_region_route(RegionId::new(table_id, 4), ""),
        ];
        env.create_physical_table_metadata_with_wal_options(
            table_id,
            original_region_routes,
            test_region_wal_options(&[1, 2, 3, 4]),
        )
        .await;

        let mut persistent_ctx = PersistentContext::new(
            TableName::new("test_catalog", "test_schema", "test_table"),
            table_id,
            None,
        );
        let failed_plan = test_plan(table_id);
        // The succeeded plan splits region 2 ([100, 200)) into region 2
        // ([100, 150)) and a newly allocated region 4 ([150, 200)).
        let succeeded_plan = RepartitionPlanEntry {
            group_id: Uuid::new_v4(),
            source_regions: vec![RegionDescriptor {
                region_id: RegionId::new(table_id, 2),
                partition_expr: range_expr("x", 100, 200),
            }],
            target_regions: vec![
                RegionDescriptor {
                    region_id: RegionId::new(table_id, 2),
                    partition_expr: range_expr("x", 100, 150),
                },
                RegionDescriptor {
                    region_id: RegionId::new(table_id, 4),
                    partition_expr: range_expr("x", 150, 200),
                },
            ],
            allocated_region_ids: vec![RegionId::new(table_id, 4)],
            pending_deallocate_region_ids: vec![],
            transition_map: vec![vec![0]],
        };
        persistent_ctx.plans = vec![failed_plan, succeeded_plan];
        // Record plan 0 as failed, referencing its group id so rollback can
        // match the failed procedure back to the plan that spawned it.
        persistent_ctx.failed_procedures = vec![ProcedureMeta {
            plan_index: 0,
            group_id: persistent_ctx.plans[0].group_id,
            procedure_id: ProcedureId::random(),
        }];

        let context = Context::new(
            &ddl_ctx,
            env.mailbox_ctx.mailbox().clone(),
            env.server_addr.clone(),
            persistent_ctx,
        );
        // Roll back from the Collect state — the point where subprocedure
        // outcomes are known.
        let mut procedure = RepartitionProcedure {
            state: Box::new(Collect::new(vec![])),
            context,
        };

        procedure
            .rollback(&TestingEnv::procedure_context())
            .await
            .unwrap();

        // Region 3 (allocated by the failed plan) is removed; region 4
        // (allocated by the succeeded plan) is preserved.
        let region_routes = current_parent_region_routes(&procedure.context).await;
        assert_eq!(region_routes.len(), 3);
        assert_eq!(region_routes[0].region.id, RegionId::new(table_id, 1));
        assert_eq!(region_routes[1].region.id, RegionId::new(table_id, 2));
        assert_eq!(region_routes[2].region.id, RegionId::new(table_id, 4));
    }
1102
1103    #[tokio::test]
1104    async fn test_repartition_rollback_is_idempotent() {
1105        let env = TestingEnv::new();
1106        let table_id = 1024;
1107        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
1108        let ddl_ctx = env.ddl_context(node_manager);
1109        let original_region_routes = vec![
1110            test_region_route(
1111                RegionId::new(table_id, 1),
1112                &range_expr("x", 0, 100).as_json_str().unwrap(),
1113            ),
1114            test_region_route(
1115                RegionId::new(table_id, 2),
1116                &range_expr("x", 50, 100).as_json_str().unwrap(),
1117            ),
1118            test_region_route(RegionId::new(table_id, 3), ""),
1119        ];
1120        env.create_physical_table_metadata_with_wal_options(
1121            table_id,
1122            original_region_routes,
1123            test_region_wal_options(&[1, 2]),
1124        )
1125        .await;
1126
1127        let mut persistent_ctx = PersistentContext::new(
1128            TableName::new("test_catalog", "test_schema", "test_table"),
1129            table_id,
1130            None,
1131        );
1132        persistent_ctx.plans = vec![test_plan(table_id)];
1133        persistent_ctx.failed_procedures = vec![ProcedureMeta {
1134            plan_index: 0,
1135            group_id: Uuid::new_v4(),
1136            procedure_id: ProcedureId::random(),
1137        }];
1138        let context = Context::new(
1139            &ddl_ctx,
1140            env.mailbox_ctx.mailbox().clone(),
1141            env.server_addr.clone(),
1142            persistent_ctx,
1143        );
1144        let mut procedure = RepartitionProcedure {
1145            state: Box::new(Dispatch),
1146            context,
1147        };
1148
1149        procedure
1150            .rollback(&TestingEnv::procedure_context())
1151            .await
1152            .unwrap();
1153        let once = current_parent_region_routes(&procedure.context).await;
1154
1155        procedure
1156            .rollback(&TestingEnv::procedure_context())
1157            .await
1158            .unwrap();
1159        let twice = current_parent_region_routes(&procedure.context).await;
1160
1161        assert_eq!(once, twice);
1162        assert_eq!(once.len(), 2);
1163        assert_eq!(once[0].region.id, RegionId::new(table_id, 1));
1164        assert_eq!(once[1].region.id, RegionId::new(table_id, 2));
1165    }
1166
    #[tokio::test]
    async fn test_repartition_procedure_flow_split_failed_and_full_rollback() {
        // End-to-end flow: drive the procedure through RepartitionStart ->
        // AllocateRegion -> Dispatch, fail the spawned subprocedure while in
        // Collect, then roll back and verify the table routes are restored to
        // the pre-repartition state.
        let env = TestingEnv::new();
        let table_id = 1024;
        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));

        env.create_physical_table_metadata_for_repartition(
            table_id,
            vec![
                test_region_route(
                    RegionId::new(table_id, 1),
                    &range_expr("x", 0, 100).as_json_str().unwrap(),
                ),
                test_region_route(
                    RegionId::new(table_id, 2),
                    &range_expr("x", 100, 200).as_json_str().unwrap(),
                ),
            ],
            test_region_wal_options(&[1, 2]),
        )
        .await;

        // Split region 1 ([0, 100)) into [0, 50) and [50, 100) — the same
        // shape as `test_plan`, which is later used as the expected plan.
        let context = new_parent_context(&env, node_manager, table_id);
        let mut procedure = RepartitionProcedure::new(
            vec![range_expr("x", 0, 100)],
            vec![range_expr("x", 0, 50), range_expr("x", 50, 100)],
            context,
        );

        // RepartitionStart takes two execute() calls: the first does not
        // persist, the second does and transitions to AllocateRegion.
        let start_status = procedure
            .execute(&TestingEnv::procedure_context())
            .await
            .unwrap();
        assert!(!start_status.need_persist());
        let start_status = procedure
            .execute(&TestingEnv::procedure_context())
            .await
            .unwrap();
        assert!(start_status.need_persist());
        assert_parent_state::<AllocateRegion>(&procedure);

        // AllocateRegion persists the computed plan and moves to Dispatch.
        let allocate_status = procedure
            .execute(&TestingEnv::procedure_context())
            .await
            .unwrap();
        assert!(allocate_status.need_persist());
        assert_parent_state::<Dispatch>(&procedure);
        assert_eq!(procedure.context.persistent_ctx.plans.len(), 1);
        let plan = &procedure.context.persistent_ctx.plans[0];
        // The generated plan must match `test_plan` field-by-field (group_id
        // excluded — it is randomly generated).
        let expected_plan = test_plan(table_id);
        assert_eq!(plan.source_regions, expected_plan.source_regions);
        assert_eq!(plan.target_regions, expected_plan.target_regions);
        assert_eq!(
            plan.allocated_region_ids,
            expected_plan.allocated_region_ids
        );
        assert_eq!(
            plan.pending_deallocate_region_ids,
            expected_plan.pending_deallocate_region_ids
        );
        assert_eq!(plan.transition_map, expected_plan.transition_map);
        // After allocation the route for the new region 3 ([50, 100)) is
        // appended alongside the original routes.
        assert_eq!(
            current_parent_region_routes(&procedure.context).await,
            vec![
                test_region_route(
                    RegionId::new(table_id, 1),
                    &range_expr("x", 0, 100).as_json_str().unwrap(),
                ),
                test_region_route(
                    RegionId::new(table_id, 2),
                    &range_expr("x", 100, 200).as_json_str().unwrap(),
                ),
                RegionRoute {
                    region: Region {
                        id: RegionId::new(table_id, 3),
                        partition_expr: range_expr("x", 50, 100).as_json_str().unwrap(),
                        ..Default::default()
                    },
                    leader_peer: Some(Peer::empty(0)),
                    ..Default::default()
                },
            ]
        );

        // Dispatch spawns one subprocedure for the single plan and moves to
        // Collect without persisting.
        let dispatch_status = procedure
            .execute(&TestingEnv::procedure_context())
            .await
            .unwrap();
        assert!(!dispatch_status.need_persist());
        let subprocedure_ids = extract_subprocedure_ids(dispatch_status);
        assert_eq!(subprocedure_ids.len(), 1);
        assert_parent_state::<Collect>(&procedure);

        // Report the subprocedure as failed via its state receiver.
        let failed_state = ProcedureState::failed(Arc::new(ProcedureError::external(
            MockError::new(StatusCode::Internal),
        )));
        let collect_ctx = procedure_context_with_receivers(HashMap::from([(
            subprocedure_ids[0],
            procedure_state_receiver(failed_state),
        )]));

        // Collect surfaces the subprocedure failure as a non-retryable error
        // and the parent stays in Collect (rollback's entry point).
        let err = procedure.execute(&collect_ctx).await.unwrap_err();
        assert!(!err.is_retry_later());
        assert_parent_state::<Collect>(&procedure);

        procedure
            .rollback(&TestingEnv::procedure_context())
            .await
            .unwrap();

        // Rollback removes the allocated region 3 route, restoring exactly
        // the original two routes.
        let region_routes = current_parent_region_routes(&procedure.context).await;
        assert_eq!(
            region_routes,
            vec![
                test_region_route(
                    RegionId::new(table_id, 1),
                    &range_expr("x", 0, 100).as_json_str().unwrap(),
                ),
                test_region_route(
                    RegionId::new(table_id, 2),
                    &range_expr("x", 100, 200).as_json_str().unwrap(),
                ),
            ]
        );
    }
1292
    #[tokio::test]
    async fn test_repartition_procedure_flow_split_allocate_retryable_then_resume() {
        // Flow with a transient failure: the datanode rejects the FIRST
        // request with a retryable error, AllocateRegion surfaces it as
        // retry-later, and re-executing the same state succeeds and resumes
        // the normal flow through Dispatch.
        common_telemetry::init_default_ut_logging();
        let env = TestingEnv::new();
        let table_id = 1024;
        let (tx, _rx) = mpsc::channel(8);
        // `should_retry` makes exactly the first datanode request fail with a
        // RetryLater error; all subsequent requests succeed.
        let should_retry = Arc::new(AtomicBool::new(true));
        let datanode_handler = DatanodeWatcher::new(tx).with_handler(move |_, _| {
            if should_retry.swap(false, Ordering::SeqCst) {
                return Err(error::Error::RetryLater {
                    source: BoxedError::new(
                        error::UnexpectedSnafu {
                            err_msg: "retry later",
                        }
                        .build(),
                    ),
                    clean_poisons: false,
                });
            }

            Ok(api::region::RegionResponse::new(0))
        });
        let node_manager = Arc::new(MockDatanodeManager::new(datanode_handler));

        env.create_physical_table_metadata_for_repartition(
            table_id,
            vec![
                test_region_route(
                    RegionId::new(table_id, 1),
                    &range_expr("x", 0, 100).as_json_str().unwrap(),
                ),
                test_region_route(
                    RegionId::new(table_id, 2),
                    &range_expr("x", 100, 200).as_json_str().unwrap(),
                ),
            ],
            test_region_wal_options(&[1, 2]),
        )
        .await;

        // Split region 1 ([0, 100)) into [0, 50) and [50, 100), matching
        // `test_plan`.
        let context = new_parent_context(&env, node_manager, table_id);
        let mut procedure = RepartitionProcedure::new(
            vec![range_expr("x", 0, 100)],
            vec![range_expr("x", 0, 50), range_expr("x", 50, 100)],
            context,
        );

        // RepartitionStart: first execute() does not persist; the second
        // persists and transitions to AllocateRegion.
        let start_status = procedure
            .execute(&TestingEnv::procedure_context())
            .await
            .unwrap();
        assert!(!start_status.need_persist());
        let start_status = procedure
            .execute(&TestingEnv::procedure_context())
            .await
            .unwrap();
        assert!(start_status.need_persist());
        assert_parent_state::<AllocateRegion>(&procedure);

        // First AllocateRegion attempt hits the injected retryable error: the
        // procedure stays in AllocateRegion, the plan is already persisted,
        // but the table routes are NOT yet updated.
        let err = procedure
            .execute(&TestingEnv::procedure_context())
            .await
            .unwrap_err();
        assert!(err.is_retry_later());
        assert_parent_state::<AllocateRegion>(&procedure);
        assert!(!procedure.context.persistent_ctx.plans.is_empty());
        assert_eq!(
            current_parent_region_routes(&procedure.context).await,
            vec![
                test_region_route(
                    RegionId::new(table_id, 1),
                    &range_expr("x", 0, 100).as_json_str().unwrap(),
                ),
                test_region_route(
                    RegionId::new(table_id, 2),
                    &range_expr("x", 100, 200).as_json_str().unwrap(),
                ),
            ]
        );

        // Retry succeeds (the handler's flag was consumed): the plan is
        // applied and the procedure moves on to Dispatch.
        let allocate_status = procedure
            .execute(&TestingEnv::procedure_context())
            .await
            .unwrap();
        assert!(allocate_status.need_persist());
        assert_parent_state::<Dispatch>(&procedure);

        // The persisted plan matches `test_plan` field-by-field (group_id
        // excluded — it is randomly generated).
        assert_eq!(procedure.context.persistent_ctx.plans.len(), 1);
        let plan = &procedure.context.persistent_ctx.plans[0];
        let expected_plan = test_plan(table_id);
        assert_eq!(plan.source_regions, expected_plan.source_regions);
        assert_eq!(plan.target_regions, expected_plan.target_regions);
        assert_eq!(
            plan.allocated_region_ids,
            expected_plan.allocated_region_ids
        );
        assert_eq!(plan.transition_map, expected_plan.transition_map);
        // After the successful allocation, the new region 3 route ([50, 100))
        // is appended alongside the originals.
        assert_eq!(
            current_parent_region_routes(&procedure.context).await,
            vec![
                test_region_route(
                    RegionId::new(table_id, 1),
                    &range_expr("x", 0, 100).as_json_str().unwrap(),
                ),
                test_region_route(
                    RegionId::new(table_id, 2),
                    &range_expr("x", 100, 200).as_json_str().unwrap(),
                ),
                RegionRoute {
                    region: Region {
                        id: RegionId::new(table_id, 3),
                        partition_expr: range_expr("x", 50, 100).as_json_str().unwrap(),
                        ..Default::default()
                    },
                    leader_peer: Some(Peer::empty(0)),
                    ..Default::default()
                },
            ]
        );

        // Dispatch spawns one subprocedure for the plan and transitions to
        // Collect without persisting.
        let dispatch_status = procedure
            .execute(&TestingEnv::procedure_context())
            .await
            .unwrap();
        assert!(!dispatch_status.need_persist());
        let subprocedure_ids = extract_subprocedure_ids(dispatch_status);
        assert_eq!(subprocedure_ids.len(), 1);
        assert_parent_state::<Collect>(&procedure);
    }
1422}