1pub mod allocate_region;
16pub mod collect;
17pub mod deallocate_region;
18pub mod dispatch;
19pub mod group;
20pub mod plan;
21pub mod repartition_end;
22pub mod repartition_start;
23pub mod update_partition_metadata;
24pub mod utils;
25
26use std::any::Any;
27use std::collections::{HashMap, HashSet};
28use std::fmt::{Debug, Display};
29use std::time::{Duration, Instant};
30
31use common_error::ext::BoxedError;
32use common_meta::cache_invalidator::CacheInvalidatorRef;
33use common_meta::ddl::DdlContext;
34use common_meta::ddl::allocator::region_routes::RegionRoutesAllocatorRef;
35use common_meta::ddl::allocator::wal_options::WalOptionsAllocatorRef;
36use common_meta::ddl_manager::{RepartitionProcedureFactory, RepartitionSource};
37use common_meta::instruction::CacheIdent;
38use common_meta::key::datanode_table::RegionInfo;
39use common_meta::key::table_info::TableInfoValue;
40use common_meta::key::table_route::TableRouteValue;
41use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
42use common_meta::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
43use common_meta::node_manager::NodeManagerRef;
44use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
45use common_meta::region_registry::LeaderRegionRegistryRef;
46use common_meta::rpc::router::{RegionRoute, operating_leader_region_roles};
47use common_meta::wal_provider::RegionWalOptions;
48use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
49use common_procedure::{
50 BoxedProcedure, Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
51 ProcedureManagerRef, Result as ProcedureResult, Status, StringKey, UserMetadata,
52};
53use common_telemetry::{error, info, warn};
54use partition::expr::PartitionExpr;
55use serde::{Deserialize, Serialize};
56use snafu::{OptionExt, ResultExt};
57use store_api::storage::TableId;
58use table::table_name::TableName;
59
60use crate::error::{self, Result};
61use crate::procedure::repartition::collect::ProcedureMeta;
62use crate::procedure::repartition::deallocate_region::DeallocateRegion;
63use crate::procedure::repartition::group::{
64 Context as RepartitionGroupContext, RepartitionGroupProcedure, region_routes,
65};
66use crate::procedure::repartition::plan::RepartitionPlanEntry;
67use crate::procedure::repartition::repartition_start::{RepartitionFrom, RepartitionStart};
68use crate::procedure::repartition::update_partition_metadata::PartitionMetadataUpdate;
69use crate::procedure::repartition::utils::{
70 get_datanode_table_value, rollback_group_metadata_routes,
71};
72use crate::service::mailbox::MailboxRef;
73
74#[cfg(test)]
75pub mod test_util;
76
/// Serializable state of a repartition procedure.
///
/// This is the part of [`Context`] that `RepartitionProcedure::dump` writes out and
/// `RepartitionProcedure::from_json` reads back. Fields marked `#[serde(default)]` may be
/// missing from the serialized form and then take their default value.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
    /// Catalog name of the table being repartitioned.
    pub catalog_name: String,
    /// Schema name of the table being repartitioned.
    pub schema_name: String,
    /// Name of the table being repartitioned.
    pub table_name: String,
    /// Id of the table being repartitioned.
    pub table_id: TableId,
    /// Repartition plan entries; rollback looks plans up by index in this list.
    pub plans: Vec<RepartitionPlanEntry>,
    /// Procedure metadata recorded as failed; each `plan_index` selects a plan to roll back.
    #[serde(default)]
    pub failed_procedures: Vec<ProcedureMeta>,
    /// Procedure metadata recorded with an unknown outcome; rollback treats these the same
    /// way as `failed_procedures`.
    #[serde(default)]
    pub unknown_procedures: Vec<ProcedureMeta>,
    /// Overall time budget, (de)serialized through `humantime_serde`; falls back to
    /// `default_timeout()` when absent.
    #[serde(with = "humantime_serde", default = "default_timeout")]
    pub timeout: Duration,
    /// Partition metadata change consulted by rollback, if one has been recorded.
    #[serde(default)]
    pub partition_metadata_update: Option<PartitionMetadataUpdate>,
}
103
/// Fallback for [`PersistentContext::timeout`]: two minutes.
fn default_timeout() -> Duration {
    Duration::from_secs(2 * 60)
}
107
108impl PersistentContext {
109 pub fn new(
113 TableName {
114 catalog_name,
115 schema_name,
116 table_name,
117 }: TableName,
118 table_id: TableId,
119 timeout: Option<Duration>,
120 ) -> Self {
121 Self {
122 catalog_name,
123 schema_name,
124 table_name,
125 table_id,
126 plans: vec![],
127 failed_procedures: vec![],
128 unknown_procedures: vec![],
129 timeout: timeout.unwrap_or_else(default_timeout),
130 partition_metadata_update: None,
131 }
132 }
133
134 pub fn lock_key(&self) -> Vec<StringKey> {
135 vec![
136 CatalogLock::Read(&self.catalog_name).into(),
137 SchemaLock::read(&self.catalog_name, &self.schema_name).into(),
138 TableLock::Write(self.table_id).into(),
139 TableNameLock::new(&self.catalog_name, &self.schema_name, &self.table_name).into(),
140 ]
141 }
142}
143
/// Everything a repartition state needs while running: the serializable state plus
/// handles to shared services.
#[derive(Clone)]
pub struct Context {
    /// State that is serialized with the procedure.
    pub persistent_ctx: PersistentContext,
    /// In-memory state; starts from `VolatileContext::default()` in [`Context::new`].
    pub volatile_ctx: VolatileContext,
    /// Cloned from the `DdlContext`; used to read and update table info and table routes.
    pub table_metadata_manager: TableMetadataManagerRef,
    /// Cloned from the `DdlContext`.
    pub memory_region_keeper: MemoryRegionKeeperRef,
    /// Cloned from the `DdlContext`; handed to region deallocation during rollback.
    pub node_manager: NodeManagerRef,
    /// Cloned from the `DdlContext`; handed to region deallocation during rollback.
    pub leader_region_registry: LeaderRegionRegistryRef,
    /// Mailbox handle supplied by the caller of [`Context::new`].
    pub mailbox: MailboxRef,
    /// Server address supplied by the caller of [`Context::new`].
    pub server_addr: String,
    /// Cloned from the `DdlContext`; used by [`Context::invalidate_table_cache`].
    pub cache_invalidator: CacheInvalidatorRef,
    /// Obtained from the `DdlContext`'s table metadata allocator.
    pub region_routes_allocator: RegionRoutesAllocatorRef,
    /// Obtained from the `DdlContext`'s table metadata allocator.
    pub wal_options_allocator: WalOptionsAllocatorRef,
    /// Instant captured in [`Context::new`]; the reference point for
    /// [`Context::next_operation_timeout`].
    pub start_time: Instant,
}
159
/// In-memory procedure state; it is not part of the data written by `dump`.
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {
    /// Per-stage elapsed-time counters.
    pub metrics: Metrics,
    /// NOTE(review): presumably the instant at which dispatching started; it is not
    /// assigned anywhere in this section — confirm against the dispatch state.
    pub dispatch_start_time: Option<Instant>,
}
165
/// Accumulated elapsed durations, one counter per repartition stage.
#[derive(Debug, Clone, Default)]
pub struct Metrics {
    /// Sum of the durations passed to [`Metrics::update_build_plan_elapsed`].
    build_plan_elapsed: Duration,
    /// Sum of the durations passed to [`Metrics::update_allocate_region_elapsed`].
    allocate_region_elapsed: Duration,
    /// Sum of the durations passed to [`Metrics::update_finish_groups_elapsed`].
    finish_groups_elapsed: Duration,
    /// Sum of the durations passed to [`Metrics::update_deallocate_region_elapsed`].
    deallocate_region_elapsed: Duration,
}

impl Display for Metrics {
    /// Writes `total: <sum of all counters>` followed by `, <name>: <value>` for every
    /// counter that is non-zero, in declaration order.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let stages = [
            ("build_plan_elapsed", self.build_plan_elapsed),
            ("allocate_region_elapsed", self.allocate_region_elapsed),
            ("finish_groups_elapsed", self.finish_groups_elapsed),
            ("deallocate_region_elapsed", self.deallocate_region_elapsed),
        ];

        let total: Duration = stages.iter().map(|(_, elapsed)| *elapsed).sum();
        write!(f, "total: {:?}", total)?;

        for (label, elapsed) in stages.iter() {
            if !elapsed.is_zero() {
                write!(f, ", {}: {:?}", label, elapsed)?;
            }
        }
        Ok(())
    }
}

impl Metrics {
    /// Adds `elapsed` to the plan-building counter.
    pub fn update_build_plan_elapsed(&mut self, elapsed: Duration) {
        self.build_plan_elapsed += elapsed;
    }

    /// Adds `elapsed` to the region-allocation counter.
    pub fn update_allocate_region_elapsed(&mut self, elapsed: Duration) {
        self.allocate_region_elapsed += elapsed;
    }

    /// Adds `elapsed` to the group-finishing counter.
    pub fn update_finish_groups_elapsed(&mut self, elapsed: Duration) {
        self.finish_groups_elapsed += elapsed;
    }

    /// Adds `elapsed` to the region-deallocation counter.
    pub fn update_deallocate_region_elapsed(&mut self, elapsed: Duration) {
        self.deallocate_region_elapsed += elapsed;
    }
}
237
238impl Context {
239 pub fn new(
240 ddl_ctx: &DdlContext,
241 mailbox: MailboxRef,
242 server_addr: String,
243 persistent_ctx: PersistentContext,
244 ) -> Self {
245 Self {
246 persistent_ctx,
247 table_metadata_manager: ddl_ctx.table_metadata_manager.clone(),
248 memory_region_keeper: ddl_ctx.memory_region_keeper.clone(),
249 node_manager: ddl_ctx.node_manager.clone(),
250 leader_region_registry: ddl_ctx.leader_region_registry.clone(),
251 mailbox,
252 server_addr,
253 cache_invalidator: ddl_ctx.cache_invalidator.clone(),
254 region_routes_allocator: ddl_ctx.table_metadata_allocator.region_routes_allocator(),
255 wal_options_allocator: ddl_ctx.table_metadata_allocator.wal_options_allocator(),
256 start_time: Instant::now(),
257 volatile_ctx: VolatileContext::default(),
258 }
259 }
260
261 pub fn next_operation_timeout(&self) -> Option<Duration> {
263 self.persistent_ctx
264 .timeout
265 .checked_sub(self.start_time.elapsed())
266 }
267
268 pub fn update_build_plan_elapsed(&mut self, elapsed: Duration) {
270 self.volatile_ctx.metrics.update_build_plan_elapsed(elapsed);
271 }
272
273 pub fn update_allocate_region_elapsed(&mut self, elapsed: Duration) {
275 self.volatile_ctx
276 .metrics
277 .update_allocate_region_elapsed(elapsed);
278 }
279
280 pub fn update_finish_groups_elapsed(&mut self, elapsed: Duration) {
282 self.volatile_ctx
283 .metrics
284 .update_finish_groups_elapsed(elapsed);
285 }
286
287 pub fn update_deallocate_region_elapsed(&mut self, elapsed: Duration) {
289 self.volatile_ctx
290 .metrics
291 .update_deallocate_region_elapsed(elapsed);
292 }
293
294 pub async fn get_table_route_value(
302 &self,
303 ) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
304 let table_id = self.persistent_ctx.table_id;
305 let table_route_value = self
306 .table_metadata_manager
307 .table_route_manager()
308 .table_route_storage()
309 .get_with_raw_bytes(table_id)
310 .await
311 .map_err(BoxedError::new)
312 .with_context(|_| error::RetryLaterWithSourceSnafu {
313 reason: format!("Failed to get table route for table: {}", table_id),
314 })?
315 .context(error::TableRouteNotFoundSnafu { table_id })?;
316
317 Ok(table_route_value)
318 }
319
320 pub async fn get_raw_table_info_value(
328 &self,
329 ) -> Result<DeserializedValueWithBytes<TableInfoValue>> {
330 let table_id = self.persistent_ctx.table_id;
331 let table_info_value = self
332 .table_metadata_manager
333 .table_info_manager()
334 .get(table_id)
335 .await
336 .map_err(BoxedError::new)
337 .with_context(|_| error::RetryLaterWithSourceSnafu {
338 reason: format!("Failed to get table info for table: {}", table_id),
339 })?
340 .context(error::TableInfoNotFoundSnafu { table_id })?;
341
342 Ok(table_info_value)
343 }
344
345 pub async fn get_table_info_value(&self) -> Result<TableInfoValue> {
346 let table_info_value = self.get_raw_table_info_value().await?.into_inner();
347 Ok(table_info_value)
348 }
349
350 pub async fn update_table_info(
352 &self,
353 current_table_info_value: &DeserializedValueWithBytes<TableInfoValue>,
354 new_table_info_value: TableInfoValue,
355 ) -> Result<()> {
356 let table_id = self.persistent_ctx.table_id;
357 self.table_metadata_manager
358 .update_table_info(
359 current_table_info_value,
360 None,
361 new_table_info_value.table_info,
362 )
363 .await
364 .map_err(BoxedError::new)
365 .with_context(|_| error::RetryLaterWithSourceSnafu {
366 reason: format!("Failed to update table info for table: {}", table_id),
367 })
368 }
369
370 pub async fn update_table_route(
379 &self,
380 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
381 new_region_routes: Vec<RegionRoute>,
382 new_region_wal_options: RegionWalOptions,
383 ) -> Result<()> {
384 let table_id = self.persistent_ctx.table_id;
385 if new_region_routes.is_empty() {
386 return error::UnexpectedSnafu {
387 violated: format!("new_region_routes is empty for table: {}", table_id),
388 }
389 .fail();
390 }
391 let datanode_id = new_region_routes
392 .first()
393 .unwrap()
394 .leader_peer
395 .as_ref()
396 .context(error::NoLeaderSnafu)?
397 .id;
398 let datanode_table_value =
399 get_datanode_table_value(&self.table_metadata_manager, table_id, datanode_id).await?;
400
401 let RegionInfo {
402 region_options,
403 region_wal_options,
404 ..
405 } = &datanode_table_value.region_info;
406
407 let validated_region_wal_options =
409 crate::procedure::repartition::utils::merge_and_validate_region_wal_options(
410 region_wal_options,
411 new_region_wal_options,
412 &new_region_routes,
413 table_id,
414 )?;
415 info!(
416 "Updating table route for table: {}, new region routes: {:?}",
417 table_id, new_region_routes
418 );
419 self.table_metadata_manager
420 .update_table_route(
421 table_id,
422 datanode_table_value.region_info.clone(),
423 current_table_route_value,
424 new_region_routes,
425 region_options,
426 &validated_region_wal_options,
427 )
428 .await
429 .context(error::TableMetadataManagerSnafu)
430 }
431
432 pub async fn invalidate_table_cache(&self) -> Result<()> {
434 let table_id = self.persistent_ctx.table_id;
435 let subject = format!(
436 "Invalidate table cache for repartition table, table: {}",
437 table_id,
438 );
439 let ctx = common_meta::cache_invalidator::Context {
440 subject: Some(subject),
441 };
442 let _ = self
443 .cache_invalidator
444 .invalidate(
445 &ctx,
446 &[
447 CacheIdent::TableId(table_id),
448 CacheIdent::TableName(TableName {
449 catalog_name: self.persistent_ctx.catalog_name.clone(),
450 schema_name: self.persistent_ctx.schema_name.clone(),
451 table_name: self.persistent_ctx.table_name.clone(),
452 }),
453 ],
454 )
455 .await;
456 Ok(())
457 }
458
459 pub fn register_operating_regions(
460 memory_region_keeper: &MemoryRegionKeeperRef,
461 region_routes: &[RegionRoute],
462 ) -> Result<Vec<OperatingRegionGuard>> {
463 let mut operating_guards = Vec::with_capacity(region_routes.len());
464 for (region_id, datanode_id, role) in operating_leader_region_roles(region_routes) {
465 let guard = memory_region_keeper
466 .register_with_role(datanode_id, region_id, role)
467 .context(error::RegionOperatingRaceSnafu {
468 peer_id: datanode_id,
469 region_id,
470 })?;
471 operating_guards.push(guard);
472 }
473 Ok(operating_guards)
474 }
475}
476
477#[async_trait::async_trait]
478#[typetag::serde(tag = "repartition_state")]
479pub(crate) trait State: Sync + Send + Debug {
480 fn name(&self) -> &'static str {
481 let type_name = std::any::type_name::<Self>();
482 type_name.split("::").last().unwrap_or(type_name)
484 }
485
486 async fn next(
488 &mut self,
489 ctx: &mut Context,
490 procedure_ctx: &ProcedureContext,
491 ) -> Result<(Box<dyn State>, Status)>;
492
493 fn as_any(&self) -> &dyn Any;
494}
495
/// Procedure that drives a table repartition through its [`State`] machine.
pub struct RepartitionProcedure {
    /// State executed by the next call to `execute`.
    state: Box<dyn State>,
    /// Persistent and volatile context shared by all states.
    context: Context,
}
500
/// Borrowed view of a [`RepartitionProcedure`] that `dump` serializes to JSON.
#[derive(Debug, Serialize)]
struct RepartitionData<'a> {
    /// Current state of the procedure.
    state: &'a dyn State,
    /// Persistent part of the procedure context.
    persistent_ctx: &'a PersistentContext,
}
506
/// Owned counterpart of [`RepartitionData`] that `from_json` deserializes.
#[derive(Debug, Deserialize)]
struct RepartitionDataOwned {
    /// State to resume from.
    state: Box<dyn State>,
    /// Persistent part of the procedure context.
    persistent_ctx: PersistentContext,
}
512
513impl RepartitionProcedure {
    /// Type name reported by `Procedure::type_name` and used as the loader key in
    /// `register_loaders`.
    const TYPE_NAME: &'static str = "metasrv-procedure::Repartition";
515
516 pub fn new(from: RepartitionFrom, to_exprs: Vec<PartitionExpr>, context: Context) -> Self {
517 let state = Box::new(RepartitionStart::new(from, to_exprs));
518
519 Self { state, context }
520 }
521
522 pub fn from_json<F>(json: &str, ctx_factory: F) -> ProcedureResult<Self>
523 where
524 F: FnOnce(PersistentContext) -> Context,
525 {
526 let RepartitionDataOwned {
527 state,
528 persistent_ctx,
529 } = serde_json::from_str(json).context(FromJsonSnafu)?;
530 let context = ctx_factory(persistent_ctx);
531
532 Ok(Self { state, context })
533 }
534
535 fn should_rollback(&self) -> bool {
549 self.state
550 .as_any()
551 .is::<update_partition_metadata::UpdatePartitionMetadata>()
552 || self.state.as_any().is::<allocate_region::AllocateRegion>()
553 || self.state.as_any().is::<dispatch::Dispatch>()
554 || self.state.as_any().is::<collect::Collect>()
555 }
556
557 fn rollback_plan_indices(&self) -> HashSet<usize> {
558 self.context
559 .persistent_ctx
560 .failed_procedures
561 .iter()
562 .chain(self.context.persistent_ctx.unknown_procedures.iter())
563 .map(|procedure_meta| procedure_meta.plan_index)
564 .collect()
565 }
566
567 fn rollback_allocated_region_ids(&self) -> HashSet<store_api::storage::RegionId> {
575 if self.state.as_any().is::<allocate_region::AllocateRegion>()
576 || self.state.as_any().is::<dispatch::Dispatch>()
577 {
578 return self
579 .context
580 .persistent_ctx
581 .plans
582 .iter()
583 .flat_map(|plan| plan.allocated_region_ids.iter().copied())
584 .collect();
585 }
586
587 self.rollback_plan_indices()
588 .into_iter()
589 .flat_map(|plan_index| {
590 self.context.persistent_ctx.plans[plan_index]
591 .allocated_region_ids
592 .iter()
593 .copied()
594 })
595 .collect()
596 }
597
598 async fn rollback_group_metadata_for_selected_plans(
602 &mut self,
603 region_routes: &mut [RegionRoute],
604 ) -> Result<()> {
605 let rollback_plan_indices = self.rollback_plan_indices();
606 if rollback_plan_indices.is_empty() {
607 return Ok(());
608 }
609
610 let mut region_routes_map = region_routes
611 .iter_mut()
612 .map(|route| (route.region.id, route))
613 .collect::<HashMap<_, _>>();
614 for plan_index in rollback_plan_indices {
615 let plan = &self.context.persistent_ctx.plans[plan_index];
616 rollback_group_metadata_routes(
617 plan.group_id,
618 &plan.source_regions,
619 &plan.original_target_routes,
620 &plan.allocated_region_ids,
621 &plan.pending_deallocate_region_ids,
622 &mut region_routes_map,
623 )?;
624 }
625
626 Ok(())
627 }
628
629 async fn rollback_partition_metadata(&mut self) -> Result<()> {
630 let Some(update) = self
631 .context
632 .persistent_ctx
633 .partition_metadata_update
634 .as_ref()
635 else {
636 return Ok(());
637 };
638 let table_info_value = self.context.get_raw_table_info_value().await?;
639 let current_partition_key_indices = &table_info_value.table_info.meta.partition_key_indices;
640 let Some(new_partition_key_indices) = update.rollback_partition_key_indices(
641 self.context.persistent_ctx.table_id,
642 current_partition_key_indices,
643 )?
644 else {
645 return Ok(());
646 };
647
648 let mut new_table_info = table_info_value.table_info.clone();
649 new_table_info.meta.partition_key_indices = new_partition_key_indices;
650 self.context
651 .update_table_info(&table_info_value, table_info_value.update(new_table_info))
652 .await?;
653
654 Ok(())
661 }
662
663 async fn rollback_inner(&mut self, procedure_ctx: &ProcedureContext) -> Result<()> {
664 if !self.should_rollback() {
665 return Ok(());
666 }
667
668 let table_id = self.context.persistent_ctx.table_id;
669 let allocated_region_ids = self.rollback_allocated_region_ids();
670
671 let table_lock = TableLock::Write(table_id).into();
672 let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
673
674 self.rollback_partition_metadata().await?;
675 let table_route_value = self.context.get_table_route_value().await?;
676 let original_region_routes = region_routes(table_id, table_route_value.get_inner_ref())?;
677 let mut current_region_routes = original_region_routes.clone();
678 self.rollback_group_metadata_for_selected_plans(&mut current_region_routes)
679 .await?;
680 let allocated_region_routes = DeallocateRegion::filter_deallocatable_region_routes(
681 table_id,
682 ¤t_region_routes,
683 &allocated_region_ids,
684 );
685 if !allocated_region_routes.is_empty() {
686 let table = TableName {
687 catalog_name: self.context.persistent_ctx.catalog_name.clone(),
688 schema_name: self.context.persistent_ctx.schema_name.clone(),
689 table_name: self.context.persistent_ctx.table_name.clone(),
690 };
691 if let Err(err) = DeallocateRegion::deallocate_regions(
694 &self.context.node_manager,
695 &self.context.leader_region_registry,
696 table,
697 table_id,
698 &allocated_region_routes,
699 )
700 .await
701 {
702 warn!(err; "Failed to drop allocated regions during repartition rollback, table_id: {}, regions: {:?}", table_id, allocated_region_ids);
703 }
704 }
705
706 let new_region_routes =
707 DeallocateRegion::generate_region_routes(¤t_region_routes, &allocated_region_ids);
708
709 if new_region_routes != *original_region_routes {
710 self.context
711 .update_table_route(&table_route_value, new_region_routes, HashMap::new())
712 .await
713 .map_err(BoxedError::new)
714 .with_context(|_| error::RetryLaterWithSourceSnafu {
715 reason: format!(
716 "Failed to rollback allocated region routes for repartition table: {}",
717 table_id
718 ),
719 })?;
720 }
721
722 if let Err(err) = self.context.invalidate_table_cache().await {
723 warn!(err; "Failed to invalidate table cache during repartition rollback, table_id: {}", table_id);
724 }
725
726 Ok(())
727 }
728}
729
730#[async_trait::async_trait]
731impl Procedure for RepartitionProcedure {
732 fn type_name(&self) -> &str {
733 Self::TYPE_NAME
734 }
735
736 #[tracing::instrument(skip_all, fields(
737 state = %self.state.name(),
738 table_id = %self.context.persistent_ctx.table_id
739 ))]
740 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
741 let state = &mut self.state;
742 let state_name = state.name();
743 common_telemetry::info!(
745 "Repartition procedure executing state: {}, table_id: {}",
746 state_name,
747 self.context.persistent_ctx.table_id
748 );
749 match state.next(&mut self.context, _ctx).await {
750 Ok((next, status)) => {
751 *state = next;
752 Ok(status)
753 }
754 Err(e) => {
755 if e.is_retryable() {
756 Err(ProcedureError::retry_later(e))
757 } else {
758 error!(
759 e;
760 "Repartition procedure failed, table id: {}",
761 self.context.persistent_ctx.table_id,
762 );
763 Err(ProcedureError::external(e))
764 }
765 }
766 }
767 }
768
769 async fn rollback(&mut self, ctx: &ProcedureContext) -> ProcedureResult<()> {
770 self.rollback_inner(ctx)
771 .await
772 .map_err(ProcedureError::external)
773 }
774
775 fn rollback_supported(&self) -> bool {
776 true
777 }
778
779 fn dump(&self) -> ProcedureResult<String> {
780 let data = RepartitionData {
781 state: self.state.as_ref(),
782 persistent_ctx: &self.context.persistent_ctx,
783 };
784 serde_json::to_string(&data).context(ToJsonSnafu)
785 }
786
787 fn lock_key(&self) -> LockKey {
788 LockKey::new(self.context.persistent_ctx.lock_key())
789 }
790
791 fn user_metadata(&self) -> Option<UserMetadata> {
792 None
794 }
795}
796
/// [`RepartitionProcedureFactory`] implementation that creates [`RepartitionProcedure`]s
/// and registers the loaders needed to restore them.
pub struct DefaultRepartitionProcedureFactory {
    /// Mailbox handle passed on to every context this factory builds.
    mailbox: MailboxRef,
    /// Server address passed on to every context this factory builds.
    server_addr: String,
}

impl DefaultRepartitionProcedureFactory {
    /// Creates a factory that hands `mailbox` and `server_addr` to the contexts it builds.
    pub fn new(mailbox: MailboxRef, server_addr: String) -> Self {
        Self {
            mailbox,
            server_addr,
        }
    }
}
810
811impl RepartitionProcedureFactory for DefaultRepartitionProcedureFactory {
812 fn create(
813 &self,
814 ddl_ctx: &DdlContext,
815 table_name: TableName,
816 table_id: TableId,
817 source: RepartitionSource,
818 to_exprs: Vec<String>,
819 timeout: Option<Duration>,
820 ) -> std::result::Result<BoxedProcedure, BoxedError> {
821 let persistent_ctx = PersistentContext::new(table_name, table_id, timeout);
822 let from = match source {
823 RepartitionSource::Partitioned {
824 exprs,
825 target_partition_columns,
826 } => {
827 let exprs = exprs
828 .iter()
829 .map(|e| {
830 PartitionExpr::from_json_str(e)
831 .context(error::DeserializePartitionExprSnafu)?
832 .context(error::EmptyPartitionExprSnafu)
833 })
834 .collect::<Result<Vec<_>>>()
835 .map_err(BoxedError::new)?;
836 RepartitionFrom::Partitioned {
837 exprs,
838 target_partition_columns,
839 }
840 }
841 RepartitionSource::Unpartitioned { partition_columns } => {
842 RepartitionFrom::Unpartitioned { partition_columns }
843 }
844 };
845 let to_exprs = to_exprs
846 .iter()
847 .map(|e| {
848 PartitionExpr::from_json_str(e)
849 .context(error::DeserializePartitionExprSnafu)?
850 .context(error::EmptyPartitionExprSnafu)
851 })
852 .collect::<Result<Vec<_>>>()
853 .map_err(BoxedError::new)?;
854
855 let procedure = RepartitionProcedure::new(
856 from,
857 to_exprs,
858 Context::new(
859 ddl_ctx,
860 self.mailbox.clone(),
861 self.server_addr.clone(),
862 persistent_ctx,
863 ),
864 );
865
866 Ok(Box::new(procedure))
867 }
868
869 fn register_loaders(
870 &self,
871 ddl_ctx: &DdlContext,
872 procedure_manager: &ProcedureManagerRef,
873 ) -> std::result::Result<(), BoxedError> {
874 let mailbox = self.mailbox.clone();
876 let server_addr = self.server_addr.clone();
877 let moved_ddl_ctx = ddl_ctx.clone();
878 procedure_manager
879 .register_loader(
880 RepartitionProcedure::TYPE_NAME,
881 Box::new(move |json| {
882 let mailbox = mailbox.clone();
883 let server_addr = server_addr.clone();
884 let ddl_ctx = moved_ddl_ctx.clone();
885 let factory = move |persistent_ctx| {
886 Context::new(&ddl_ctx, mailbox, server_addr, persistent_ctx)
887 };
888 RepartitionProcedure::from_json(json, factory).map(|p| Box::new(p) as _)
889 }),
890 )
891 .map_err(BoxedError::new)?;
892
893 let mailbox = self.mailbox.clone();
895 let server_addr = self.server_addr.clone();
896 let moved_ddl_ctx = ddl_ctx.clone();
897 procedure_manager
898 .register_loader(
899 RepartitionGroupProcedure::TYPE_NAME,
900 Box::new(move |json| {
901 let mailbox = mailbox.clone();
902 let server_addr = server_addr.clone();
903 let ddl_ctx = moved_ddl_ctx.clone();
904 let factory = move |persistent_ctx| {
905 RepartitionGroupContext::new(&ddl_ctx, mailbox, server_addr, persistent_ctx)
906 };
907 RepartitionGroupProcedure::from_json(json, factory).map(|p| Box::new(p) as _)
908 }),
909 )
910 .map_err(BoxedError::new)?;
911
912 Ok(())
913 }
914}
915
916#[cfg(test)]
917mod tests {
918 use std::collections::HashMap;
919 use std::sync::Arc;
920 use std::sync::atomic::{AtomicBool, Ordering};
921
922 use common_error::ext::BoxedError;
923 use common_error::mock::MockError;
924 use common_error::status_code::StatusCode;
925 use common_meta::ddl::test_util::datanode_handler::{
926 DatanodeWatcher, NaiveDatanodeHandler, UnexpectedErrorDatanodeHandler,
927 };
928 use common_meta::error;
929 use common_meta::peer::Peer;
930 use common_meta::region_keeper::MemoryRegionKeeper;
931 use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
932 use common_meta::test_util::MockDatanodeManager;
933 use common_procedure::{Error as ProcedureError, Procedure, ProcedureId, ProcedureState};
934 use store_api::region_engine::RegionRole;
935 use store_api::storage::RegionId;
936 use table::table_name::TableName;
937 use tokio::sync::mpsc;
938 use uuid::Uuid;
939
940 use super::*;
941 use crate::procedure::repartition::allocate_region::AllocateRegion;
942 use crate::procedure::repartition::collect::Collect;
943 use crate::procedure::repartition::deallocate_region::DeallocateRegion;
944 use crate::procedure::repartition::dispatch::Dispatch;
945 use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
946 use crate::procedure::repartition::plan::{SourceRegionDescriptor, TargetRegionDescriptor};
947 use crate::procedure::repartition::repartition_end::RepartitionEnd;
948 use crate::procedure::repartition::test_util::{
949 TestingEnv, assert_parent_state, current_parent_region_routes, extract_subprocedure_ids,
950 new_parent_context, procedure_context_with_receivers, procedure_state_receiver, range_expr,
951 test_region_route, test_region_wal_options,
952 };
953 use crate::procedure::repartition::update_partition_metadata::{
954 PartitionMetadataUpdate, UpdatePartitionMetadata,
955 };
956
957 fn test_plan(table_id: TableId) -> RepartitionPlanEntry {
958 RepartitionPlanEntry {
959 group_id: uuid::Uuid::new_v4(),
960 source_regions: vec![SourceRegionDescriptor::partitioned(
961 RegionId::new(table_id, 1),
962 range_expr("x", 0, 100),
963 )],
964 target_regions: vec![
965 TargetRegionDescriptor {
966 region_id: RegionId::new(table_id, 1),
967 partition_expr: range_expr("x", 0, 50),
968 },
969 TargetRegionDescriptor {
970 region_id: RegionId::new(table_id, 3),
971 partition_expr: range_expr("x", 50, 100),
972 },
973 ],
974 allocated_region_ids: vec![RegionId::new(table_id, 3)],
975 pending_deallocate_region_ids: vec![],
976 transition_map: vec![vec![0, 1]],
977 original_target_routes: vec![],
978 }
979 }
980
981 fn with_rollback_metadata(
982 mut plan: RepartitionPlanEntry,
983 original_target_routes: Vec<RegionRoute>,
984 ) -> RepartitionPlanEntry {
985 plan.original_target_routes = original_target_routes;
986 plan
987 }
988
989 fn apply_group_staging(
990 plan: &RepartitionPlanEntry,
991 current_region_routes: &[RegionRoute],
992 ) -> Vec<RegionRoute> {
993 UpdateMetadata::apply_staging_region_routes(
994 plan.group_id,
995 &plan.source_regions,
996 &plan.target_regions,
997 &plan.pending_deallocate_region_ids,
998 current_region_routes,
999 )
1000 .unwrap()
1001 }
1002
1003 fn exit_group_staging(
1004 plan: &RepartitionPlanEntry,
1005 current_region_routes: &[RegionRoute],
1006 ) -> Vec<RegionRoute> {
1007 UpdateMetadata::exit_staging_region_routes(
1008 plan.group_id,
1009 &plan.source_regions,
1010 &plan.target_regions,
1011 current_region_routes,
1012 )
1013 .unwrap()
1014 }
1015
1016 fn region_route_by_id(region_routes: &[RegionRoute], region_id: RegionId) -> &RegionRoute {
1017 region_routes
1018 .iter()
1019 .find(|route| route.region.id == region_id)
1020 .unwrap()
1021 }
1022
1023 async fn table_partition_key_indices(ctx: &Context) -> Vec<usize> {
1024 ctx.get_table_info_value()
1025 .await
1026 .unwrap()
1027 .table_info
1028 .meta
1029 .partition_key_indices
1030 }
1031
    /// Builds a procedure directly in `state`, bypassing `RepartitionProcedure::new`
    /// (which always starts in `RepartitionStart`).
    fn test_procedure(state: Box<dyn State>, context: Context) -> RepartitionProcedure {
        RepartitionProcedure { state, context }
    }
1035
1036 fn test_context(env: &TestingEnv, table_id: TableId) -> Context {
1037 let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
1038 let ddl_ctx = env.ddl_context(node_manager);
1039 let persistent_ctx = PersistentContext::new(
1040 TableName::new("test_catalog", "test_schema", "test_table"),
1041 table_id,
1042 None,
1043 );
1044
1045 Context::new(
1046 &ddl_ctx,
1047 env.mailbox_ctx.mailbox().clone(),
1048 env.server_addr.clone(),
1049 persistent_ctx,
1050 )
1051 }
1052
1053 #[test]
1054 fn test_filter_allocated_region_routes() {
1055 let table_id = 1024;
1056 let region_routes = vec![
1057 test_region_route(RegionId::new(table_id, 1), "a"),
1058 test_region_route(RegionId::new(table_id, 2), "b"),
1059 ];
1060 let allocated_region_ids = HashSet::from([RegionId::new(table_id, 2)]);
1061
1062 let new_region_routes =
1063 DeallocateRegion::generate_region_routes(®ion_routes, &allocated_region_ids);
1064
1065 assert_eq!(new_region_routes.len(), 1);
1066 assert_eq!(new_region_routes[0].region.id, RegionId::new(table_id, 1));
1067 }
1068
1069 #[test]
1070 fn test_should_rollback_after_metadata_update() {
1071 let env = TestingEnv::new();
1072 let table_id = 1024;
1073
1074 let procedure = test_procedure(
1075 Box::new(RepartitionStart::new(
1076 RepartitionFrom::Partitioned {
1077 exprs: vec![],
1078 target_partition_columns: None,
1079 },
1080 vec![],
1081 )),
1082 test_context(&env, table_id),
1083 );
1084 assert!(!procedure.should_rollback());
1085
1086 let procedure = test_procedure(
1087 Box::new(UpdatePartitionMetadata::new(vec![])),
1088 test_context(&env, table_id),
1089 );
1090 assert!(procedure.should_rollback());
1091
1092 let procedure = test_procedure(
1093 Box::new(AllocateRegion::new(vec![])),
1094 test_context(&env, table_id),
1095 );
1096 assert!(procedure.should_rollback());
1097
1098 let procedure = test_procedure(Box::new(Dispatch), test_context(&env, table_id));
1099 assert!(procedure.should_rollback());
1100
1101 let procedure =
1102 test_procedure(Box::new(Collect::new(vec![])), test_context(&env, table_id));
1103 assert!(procedure.should_rollback());
1104
1105 let procedure = test_procedure(Box::new(DeallocateRegion), test_context(&env, table_id));
1106 assert!(!procedure.should_rollback());
1107
1108 let procedure = test_procedure(Box::new(RepartitionEnd), test_context(&env, table_id));
1109 assert!(!procedure.should_rollback());
1110 }
1111
/// Verifies that `Context::register_operating_regions` keeps the role implied by each
/// route's `leader_state`.
///
/// Three routes of table 1024 are registered with `leader_state` set to `None`, `Staging`
/// and `Downgrading`; the keeper is then expected to report `Leader`, `StagingLeader` and
/// `DowngradingLeader` for them respectively.
#[test]
fn test_register_operating_regions_preserves_route_roles() {
    let keeper = Arc::new(MemoryRegionKeeper::new());
    let region_routes = vec![
        RegionRoute {
            region: Region::new_test(RegionId::new(1024, 1)),
            leader_peer: Some(Peer::empty(1)),
            follower_peers: vec![],
            leader_state: None,
            leader_down_since: None,
            write_route_policy: None,
        },
        RegionRoute {
            region: Region::new_test(RegionId::new(1024, 2)),
            leader_peer: Some(Peer::empty(2)),
            follower_peers: vec![],
            leader_state: Some(LeaderState::Staging),
            leader_down_since: None,
            write_route_policy: None,
        },
        RegionRoute {
            region: Region::new_test(RegionId::new(1024, 3)),
            leader_peer: Some(Peer::empty(3)),
            follower_peers: vec![],
            leader_state: Some(LeaderState::Downgrading),
            leader_down_since: None,
            write_route_policy: None,
        },
    ];

    // Bound to `_guards` (not `_`) so the returned guards live until the end of the test.
    // NOTE(review): presumably dropping a guard unregisters its region -- confirm.
    let _guards = Context::register_operating_regions(&keeper, &region_routes).unwrap();

    // Each lookup uses the same id that was passed to `Peer::empty` for that route's leader.
    let leader_roles =
        keeper.extract_operating_region_roles(1, &HashSet::from([RegionId::new(1024, 1)]));
    let staging_roles =
        keeper.extract_operating_region_roles(2, &HashSet::from([RegionId::new(1024, 2)]));
    let downgrading_roles =
        keeper.extract_operating_region_roles(3, &HashSet::from([RegionId::new(1024, 3)]));

    assert_eq!(
        leader_roles.get(&RegionId::new(1024, 1)),
        Some(&RegionRole::Leader)
    );
    assert_eq!(
        staging_roles.get(&RegionId::new(1024, 2)),
        Some(&RegionRole::StagingLeader)
    );
    assert_eq!(
        downgrading_roles.get(&RegionId::new(1024, 3)),
        Some(&RegionRole::DowngradingLeader)
    );
}
1164
/// A serialized `PersistentContext` that omits `partition_metadata_update` must still
/// deserialize, leaving that field as `None`.
#[test]
fn test_persistent_context_partition_metadata_update_serde_default() {
    // Payload deliberately has no `partition_metadata_update` key.
    let payload = r#"{
        "catalog_name":"test_catalog",
        "schema_name":"test_schema",
        "table_name":"test_table",
        "table_id":1024,
        "plans":[],
        "timeout":"120s"
    }"#;

    let decoded = serde_json::from_str::<PersistentContext>(payload).unwrap();

    assert!(decoded.partition_metadata_update.is_none());
}
1180
1181 #[tokio::test]
1182 async fn test_repartition_rollback_removes_partition_metadata_indices() {
1183 let env = TestingEnv::new();
1184 let table_id = 1024;
1185 let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
1186 env.create_physical_table_metadata_for_repartition(
1187 table_id,
1188 vec![test_region_route(RegionId::new(table_id, 1), "")],
1189 test_region_wal_options(&[1]),
1190 )
1191 .await;
1192
1193 let mut context = new_parent_context(&env, node_manager, table_id);
1194 let current = context.get_raw_table_info_value().await.unwrap();
1195 let mut table_info = current.table_info.clone();
1196 table_info.meta.partition_key_indices = vec![0, 1];
1197 context
1198 .update_table_info(¤t, current.update(table_info))
1199 .await
1200 .unwrap();
1201 context.persistent_ctx.partition_metadata_update = Some(
1202 PartitionMetadataUpdate::from_partitioned(vec![1], vec![0, 1]),
1203 );
1204 let mut procedure = RepartitionProcedure {
1205 state: Box::new(UpdatePartitionMetadata::new(vec![])),
1206 context,
1207 };
1208
1209 procedure
1210 .rollback(&TestingEnv::procedure_context())
1211 .await
1212 .unwrap();
1213
1214 assert_eq!(
1215 procedure
1216 .context
1217 .get_table_info_value()
1218 .await
1219 .unwrap()
1220 .table_info
1221 .meta
1222 .partition_key_indices,
1223 vec![1]
1224 );
1225 }
1226
1227 #[tokio::test]
1228 async fn test_repartition_rollback_removes_allocated_routes_from_dispatch() {
1229 let env = TestingEnv::new();
1230 let table_id = 1024;
1231 let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
1232 let ddl_ctx = env.ddl_context(node_manager);
1233 let original_region_routes = vec![
1234 test_region_route(
1235 RegionId::new(table_id, 1),
1236 &range_expr("x", 0, 100).as_json_str().unwrap(),
1237 ),
1238 test_region_route(
1239 RegionId::new(table_id, 2),
1240 &range_expr("x", 50, 100).as_json_str().unwrap(),
1241 ),
1242 test_region_route(RegionId::new(table_id, 3), ""),
1243 ];
1244 env.create_physical_table_metadata_with_wal_options(
1245 table_id,
1246 original_region_routes,
1247 test_region_wal_options(&[1, 2]),
1248 )
1249 .await;
1250
1251 let mut persistent_ctx = PersistentContext::new(
1252 TableName::new("test_catalog", "test_schema", "test_table"),
1253 table_id,
1254 None,
1255 );
1256 persistent_ctx.plans = vec![with_rollback_metadata(
1257 test_plan(table_id),
1258 vec![
1259 test_region_route(
1260 RegionId::new(table_id, 1),
1261 &range_expr("x", 0, 100).as_json_str().unwrap(),
1262 ),
1263 test_region_route(RegionId::new(table_id, 3), ""),
1264 ],
1265 )];
1266 persistent_ctx.failed_procedures = vec![ProcedureMeta {
1267 plan_index: 0,
1268 group_id: Uuid::new_v4(),
1269 procedure_id: ProcedureId::random(),
1270 }];
1271 let context = Context::new(
1272 &ddl_ctx,
1273 env.mailbox_ctx.mailbox().clone(),
1274 env.server_addr.clone(),
1275 persistent_ctx,
1276 );
1277 let mut procedure = RepartitionProcedure {
1278 state: Box::new(Dispatch),
1279 context,
1280 };
1281
1282 procedure
1283 .rollback(&TestingEnv::procedure_context())
1284 .await
1285 .unwrap();
1286
1287 let region_routes = current_parent_region_routes(&procedure.context).await;
1288 assert_eq!(region_routes.len(), 2);
1289 assert_eq!(region_routes[0].region.id, RegionId::new(table_id, 1));
1290 assert_eq!(region_routes[1].region.id, RegionId::new(table_id, 2));
1291 }
1292
1293 #[tokio::test]
1294 async fn test_repartition_rollback_removes_allocated_routes_from_allocate() {
1295 let env = TestingEnv::new();
1296 let table_id = 1024;
1297 let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
1298 let ddl_ctx = env.ddl_context(node_manager);
1299 let original_region_routes = vec![
1300 test_region_route(
1301 RegionId::new(table_id, 1),
1302 &range_expr("x", 0, 100).as_json_str().unwrap(),
1303 ),
1304 test_region_route(
1305 RegionId::new(table_id, 2),
1306 &range_expr("x", 50, 100).as_json_str().unwrap(),
1307 ),
1308 test_region_route(RegionId::new(table_id, 3), ""),
1309 ];
1310 env.create_physical_table_metadata_with_wal_options(
1311 table_id,
1312 original_region_routes,
1313 test_region_wal_options(&[1, 2]),
1314 )
1315 .await;
1316
1317 let mut persistent_ctx = PersistentContext::new(
1318 TableName::new("test_catalog", "test_schema", "test_table"),
1319 table_id,
1320 None,
1321 );
1322 persistent_ctx.plans = vec![test_plan(table_id)];
1323 let context = Context::new(
1324 &ddl_ctx,
1325 env.mailbox_ctx.mailbox().clone(),
1326 env.server_addr.clone(),
1327 persistent_ctx,
1328 );
1329 let mut procedure = RepartitionProcedure {
1330 state: Box::new(AllocateRegion::new(vec![])),
1331 context,
1332 };
1333
1334 procedure
1335 .rollback(&TestingEnv::procedure_context())
1336 .await
1337 .unwrap();
1338
1339 let region_routes = current_parent_region_routes(&procedure.context).await;
1340 assert_eq!(region_routes.len(), 2);
1341 assert_eq!(region_routes[0].region.id, RegionId::new(table_id, 1));
1342 assert_eq!(region_routes[1].region.id, RegionId::new(table_id, 2));
1343 }
1344
1345 #[tokio::test]
1346 async fn test_repartition_rollback_from_collect_only_removes_failed_allocated_routes() {
1347 let env = TestingEnv::new();
1348 let table_id = 1024;
1349 let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
1350 let ddl_ctx = env.ddl_context(node_manager);
1351 let original_region_routes = vec![
1352 test_region_route(
1353 RegionId::new(table_id, 1),
1354 &range_expr("x", 0, 100).as_json_str().unwrap(),
1355 ),
1356 test_region_route(
1357 RegionId::new(table_id, 2),
1358 &range_expr("x", 100, 200).as_json_str().unwrap(),
1359 ),
1360 test_region_route(RegionId::new(table_id, 3), ""),
1361 test_region_route(RegionId::new(table_id, 4), ""),
1362 ];
1363 env.create_physical_table_metadata_with_wal_options(
1364 table_id,
1365 original_region_routes,
1366 test_region_wal_options(&[1, 2, 3, 4]),
1367 )
1368 .await;
1369
1370 let mut persistent_ctx = PersistentContext::new(
1371 TableName::new("test_catalog", "test_schema", "test_table"),
1372 table_id,
1373 None,
1374 );
1375 let failed_plan = test_plan(table_id);
1376 let failed_plan = with_rollback_metadata(
1377 failed_plan,
1378 vec![
1379 test_region_route(
1380 RegionId::new(table_id, 1),
1381 &range_expr("x", 0, 100).as_json_str().unwrap(),
1382 ),
1383 test_region_route(RegionId::new(table_id, 3), ""),
1384 ],
1385 );
1386 let succeeded_plan = RepartitionPlanEntry {
1387 group_id: Uuid::new_v4(),
1388 source_regions: vec![SourceRegionDescriptor::partitioned(
1389 RegionId::new(table_id, 2),
1390 range_expr("x", 100, 200),
1391 )],
1392 target_regions: vec![
1393 TargetRegionDescriptor {
1394 region_id: RegionId::new(table_id, 2),
1395 partition_expr: range_expr("x", 100, 150),
1396 },
1397 TargetRegionDescriptor {
1398 region_id: RegionId::new(table_id, 4),
1399 partition_expr: range_expr("x", 150, 200),
1400 },
1401 ],
1402 allocated_region_ids: vec![RegionId::new(table_id, 4)],
1403 pending_deallocate_region_ids: vec![],
1404 transition_map: vec![vec![0]],
1405 original_target_routes: vec![
1406 test_region_route(
1407 RegionId::new(table_id, 2),
1408 &range_expr("x", 100, 200).as_json_str().unwrap(),
1409 ),
1410 test_region_route(RegionId::new(table_id, 4), ""),
1411 ],
1412 };
1413 persistent_ctx.plans = vec![failed_plan, succeeded_plan];
1414 persistent_ctx.failed_procedures = vec![ProcedureMeta {
1415 plan_index: 0,
1416 group_id: persistent_ctx.plans[0].group_id,
1417 procedure_id: ProcedureId::random(),
1418 }];
1419
1420 let context = Context::new(
1421 &ddl_ctx,
1422 env.mailbox_ctx.mailbox().clone(),
1423 env.server_addr.clone(),
1424 persistent_ctx,
1425 );
1426 let mut procedure = RepartitionProcedure {
1427 state: Box::new(Collect::new(vec![])),
1428 context,
1429 };
1430
1431 procedure
1432 .rollback(&TestingEnv::procedure_context())
1433 .await
1434 .unwrap();
1435
1436 let region_routes = current_parent_region_routes(&procedure.context).await;
1437 assert_eq!(region_routes.len(), 3);
1438 assert_eq!(region_routes[0].region.id, RegionId::new(table_id, 1));
1439 assert_eq!(region_routes[1].region.id, RegionId::new(table_id, 2));
1440 assert_eq!(region_routes[2].region.id, RegionId::new(table_id, 4));
1441 }
1442
1443 #[tokio::test]
1444 async fn test_repartition_rollback_from_collect_restores_failed_group_metadata_only() {
1445 let env = TestingEnv::new();
1446 let table_id = 1024;
1447 let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
1448 let ddl_ctx = env.ddl_context(node_manager);
1449 let original_region_routes = vec![
1450 test_region_route(
1451 RegionId::new(table_id, 1),
1452 &range_expr("x", 0, 100).as_json_str().unwrap(),
1453 ),
1454 test_region_route(
1455 RegionId::new(table_id, 2),
1456 &range_expr("x", 100, 200).as_json_str().unwrap(),
1457 ),
1458 test_region_route(RegionId::new(table_id, 3), ""),
1459 test_region_route(RegionId::new(table_id, 4), ""),
1460 ];
1461
1462 let failed_plan = with_rollback_metadata(
1463 test_plan(table_id),
1464 vec![
1465 original_region_routes[0].clone(),
1466 original_region_routes[2].clone(),
1467 ],
1468 );
1469 let succeeded_plan = RepartitionPlanEntry {
1470 group_id: Uuid::new_v4(),
1471 source_regions: vec![SourceRegionDescriptor::partitioned(
1472 RegionId::new(table_id, 2),
1473 range_expr("x", 100, 200),
1474 )],
1475 target_regions: vec![
1476 TargetRegionDescriptor {
1477 region_id: RegionId::new(table_id, 2),
1478 partition_expr: range_expr("x", 100, 150),
1479 },
1480 TargetRegionDescriptor {
1481 region_id: RegionId::new(table_id, 4),
1482 partition_expr: range_expr("x", 150, 200),
1483 },
1484 ],
1485 allocated_region_ids: vec![RegionId::new(table_id, 4)],
1486 pending_deallocate_region_ids: vec![],
1487 transition_map: vec![vec![0, 1]],
1488 original_target_routes: vec![
1489 original_region_routes[1].clone(),
1490 original_region_routes[3].clone(),
1491 ],
1492 };
1493 let current_region_routes = apply_group_staging(&failed_plan, &original_region_routes);
1494 let current_region_routes = apply_group_staging(&succeeded_plan, ¤t_region_routes);
1495 let current_region_routes = exit_group_staging(&succeeded_plan, ¤t_region_routes);
1496 env.create_physical_table_metadata_with_wal_options(
1497 table_id,
1498 current_region_routes,
1499 test_region_wal_options(&[1, 2, 3, 4]),
1500 )
1501 .await;
1502
1503 let mut persistent_ctx = PersistentContext::new(
1504 TableName::new("test_catalog", "test_schema", "test_table"),
1505 table_id,
1506 None,
1507 );
1508 persistent_ctx.plans = vec![failed_plan, succeeded_plan.clone()];
1509 persistent_ctx.failed_procedures = vec![ProcedureMeta {
1510 plan_index: 0,
1511 group_id: persistent_ctx.plans[0].group_id,
1512 procedure_id: ProcedureId::random(),
1513 }];
1514
1515 let context = Context::new(
1516 &ddl_ctx,
1517 env.mailbox_ctx.mailbox().clone(),
1518 env.server_addr.clone(),
1519 persistent_ctx,
1520 );
1521 let mut procedure = RepartitionProcedure {
1522 state: Box::new(Collect::new(vec![])),
1523 context,
1524 };
1525
1526 procedure
1527 .rollback(&TestingEnv::procedure_context())
1528 .await
1529 .unwrap();
1530
1531 assert_eq!(
1532 current_parent_region_routes(&procedure.context).await,
1533 vec![
1534 test_region_route(
1535 RegionId::new(table_id, 1),
1536 &range_expr("x", 0, 100).as_json_str().unwrap(),
1537 ),
1538 RegionRoute {
1539 region: Region {
1540 id: RegionId::new(table_id, 2),
1541 partition_expr: range_expr("x", 100, 150).as_json_str().unwrap(),
1542 ..Default::default()
1543 },
1544 leader_peer: Some(Peer::empty(1)),
1545 ..Default::default()
1546 },
1547 RegionRoute {
1548 region: Region {
1549 id: RegionId::new(table_id, 4),
1550 partition_expr: range_expr("x", 150, 200).as_json_str().unwrap(),
1551 ..Default::default()
1552 },
1553 leader_peer: Some(Peer::empty(1)),
1554 ..Default::default()
1555 },
1556 ]
1557 );
1558 }
1559
1560 #[tokio::test]
1561 async fn test_repartition_rollback_from_collect_restores_unknown_group_metadata() {
1562 let env = TestingEnv::new();
1563 let table_id = 1024;
1564 let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
1565 let ddl_ctx = env.ddl_context(node_manager);
1566 let original_region_routes = vec![
1567 test_region_route(
1568 RegionId::new(table_id, 1),
1569 &range_expr("x", 0, 100).as_json_str().unwrap(),
1570 ),
1571 test_region_route(
1572 RegionId::new(table_id, 2),
1573 &range_expr("x", 100, 200).as_json_str().unwrap(),
1574 ),
1575 test_region_route(RegionId::new(table_id, 3), ""),
1576 ];
1577 let plan = with_rollback_metadata(
1578 test_plan(table_id),
1579 vec![
1580 original_region_routes[0].clone(),
1581 original_region_routes[2].clone(),
1582 ],
1583 );
1584 let staged_region_routes = apply_group_staging(&plan, &original_region_routes);
1585 assert_eq!(
1586 region_route_by_id(&staged_region_routes, RegionId::new(table_id, 1))
1587 .region
1588 .partition_expr(),
1589 range_expr("x", 0, 50).as_json_str().unwrap()
1590 );
1591 assert!(
1592 region_route_by_id(&staged_region_routes, RegionId::new(table_id, 1))
1593 .is_leader_staging()
1594 );
1595 assert_eq!(
1596 region_route_by_id(&staged_region_routes, RegionId::new(table_id, 3))
1597 .region
1598 .partition_expr(),
1599 range_expr("x", 50, 100).as_json_str().unwrap()
1600 );
1601 assert!(
1602 region_route_by_id(&staged_region_routes, RegionId::new(table_id, 3))
1603 .is_leader_staging()
1604 );
1605 env.create_physical_table_metadata_with_wal_options(
1606 table_id,
1607 staged_region_routes,
1608 test_region_wal_options(&[1, 2, 3]),
1609 )
1610 .await;
1611
1612 let mut persistent_ctx = PersistentContext::new(
1613 TableName::new("test_catalog", "test_schema", "test_table"),
1614 table_id,
1615 None,
1616 );
1617 persistent_ctx.plans = vec![plan.clone()];
1618 persistent_ctx.unknown_procedures = vec![ProcedureMeta {
1619 plan_index: 0,
1620 group_id: plan.group_id,
1621 procedure_id: ProcedureId::random(),
1622 }];
1623
1624 let context = Context::new(
1625 &ddl_ctx,
1626 env.mailbox_ctx.mailbox().clone(),
1627 env.server_addr.clone(),
1628 persistent_ctx,
1629 );
1630 let mut procedure = RepartitionProcedure {
1631 state: Box::new(Collect::new(vec![])),
1632 context,
1633 };
1634
1635 procedure
1636 .rollback(&TestingEnv::procedure_context())
1637 .await
1638 .unwrap();
1639
1640 assert_eq!(
1641 current_parent_region_routes(&procedure.context).await,
1642 vec![
1643 original_region_routes[0].clone(),
1644 original_region_routes[1].clone()
1645 ]
1646 );
1647 }
1648
1649 #[tokio::test]
1650 async fn test_repartition_rollback_is_idempotent() {
1651 let env = TestingEnv::new();
1652 let table_id = 1024;
1653 let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
1654 let ddl_ctx = env.ddl_context(node_manager);
1655 let original_region_routes = vec![
1656 test_region_route(
1657 RegionId::new(table_id, 1),
1658 &range_expr("x", 0, 100).as_json_str().unwrap(),
1659 ),
1660 test_region_route(
1661 RegionId::new(table_id, 2),
1662 &range_expr("x", 50, 100).as_json_str().unwrap(),
1663 ),
1664 test_region_route(RegionId::new(table_id, 3), ""),
1665 ];
1666 env.create_physical_table_metadata_with_wal_options(
1667 table_id,
1668 original_region_routes,
1669 test_region_wal_options(&[1, 2]),
1670 )
1671 .await;
1672
1673 let mut persistent_ctx = PersistentContext::new(
1674 TableName::new("test_catalog", "test_schema", "test_table"),
1675 table_id,
1676 None,
1677 );
1678 persistent_ctx.plans = vec![with_rollback_metadata(
1679 test_plan(table_id),
1680 vec![
1681 test_region_route(
1682 RegionId::new(table_id, 1),
1683 &range_expr("x", 0, 100).as_json_str().unwrap(),
1684 ),
1685 test_region_route(RegionId::new(table_id, 3), ""),
1686 ],
1687 )];
1688 persistent_ctx.failed_procedures = vec![ProcedureMeta {
1689 plan_index: 0,
1690 group_id: Uuid::new_v4(),
1691 procedure_id: ProcedureId::random(),
1692 }];
1693 let context = Context::new(
1694 &ddl_ctx,
1695 env.mailbox_ctx.mailbox().clone(),
1696 env.server_addr.clone(),
1697 persistent_ctx,
1698 );
1699 let mut procedure = RepartitionProcedure {
1700 state: Box::new(Dispatch),
1701 context,
1702 };
1703
1704 procedure
1705 .rollback(&TestingEnv::procedure_context())
1706 .await
1707 .unwrap();
1708 let once = current_parent_region_routes(&procedure.context).await;
1709
1710 procedure
1711 .rollback(&TestingEnv::procedure_context())
1712 .await
1713 .unwrap();
1714 let twice = current_parent_region_routes(&procedure.context).await;
1715
1716 assert_eq!(once, twice);
1717 assert_eq!(once.len(), 2);
1718 assert_eq!(once[0].region.id, RegionId::new(table_id, 1));
1719 assert_eq!(once[1].region.id, RegionId::new(table_id, 2));
1720 }
1721
1722 #[tokio::test]
1723 async fn test_repartition_rollback_from_collect_restores_failed_merge_group_metadata_only() {
1724 let env = TestingEnv::new();
1725 let table_id = 1024;
1726 let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
1727 let ddl_ctx = env.ddl_context(node_manager);
1728 let original_region_routes = vec![
1729 test_region_route(
1730 RegionId::new(table_id, 1),
1731 &range_expr("x", 0, 100).as_json_str().unwrap(),
1732 ),
1733 test_region_route(
1734 RegionId::new(table_id, 2),
1735 &range_expr("x", 100, 200).as_json_str().unwrap(),
1736 ),
1737 test_region_route(
1738 RegionId::new(table_id, 3),
1739 &range_expr("x", 200, 300).as_json_str().unwrap(),
1740 ),
1741 test_region_route(RegionId::new(table_id, 4), ""),
1742 ];
1743 let failed_merge_plan = RepartitionPlanEntry {
1744 group_id: Uuid::new_v4(),
1745 source_regions: vec![
1746 SourceRegionDescriptor::partitioned(
1747 RegionId::new(table_id, 1),
1748 range_expr("x", 0, 100),
1749 ),
1750 SourceRegionDescriptor::partitioned(
1751 RegionId::new(table_id, 2),
1752 range_expr("x", 100, 200),
1753 ),
1754 ],
1755 target_regions: vec![TargetRegionDescriptor {
1756 region_id: RegionId::new(table_id, 1),
1757 partition_expr: range_expr("x", 0, 200),
1758 }],
1759 allocated_region_ids: vec![],
1760 pending_deallocate_region_ids: vec![RegionId::new(table_id, 2)],
1761 transition_map: vec![vec![0], vec![0]],
1762 original_target_routes: vec![original_region_routes[0].clone()],
1763 };
1764 let succeeded_split_plan = RepartitionPlanEntry {
1765 group_id: Uuid::new_v4(),
1766 source_regions: vec![SourceRegionDescriptor::partitioned(
1767 RegionId::new(table_id, 3),
1768 range_expr("x", 200, 300),
1769 )],
1770 target_regions: vec![
1771 TargetRegionDescriptor {
1772 region_id: RegionId::new(table_id, 3),
1773 partition_expr: range_expr("x", 200, 250),
1774 },
1775 TargetRegionDescriptor {
1776 region_id: RegionId::new(table_id, 4),
1777 partition_expr: range_expr("x", 250, 300),
1778 },
1779 ],
1780 allocated_region_ids: vec![RegionId::new(table_id, 4)],
1781 pending_deallocate_region_ids: vec![],
1782 transition_map: vec![vec![0, 1]],
1783 original_target_routes: vec![
1784 original_region_routes[2].clone(),
1785 original_region_routes[3].clone(),
1786 ],
1787 };
1788 let current_region_routes =
1789 apply_group_staging(&failed_merge_plan, &original_region_routes);
1790 let current_region_routes =
1791 apply_group_staging(&succeeded_split_plan, ¤t_region_routes);
1792 let staged_region_routes =
1793 exit_group_staging(&succeeded_split_plan, ¤t_region_routes);
1794 env.create_physical_table_metadata_with_wal_options(
1795 table_id,
1796 staged_region_routes,
1797 test_region_wal_options(&[1, 2, 3, 4]),
1798 )
1799 .await;
1800
1801 let mut persistent_ctx = PersistentContext::new(
1802 TableName::new("test_catalog", "test_schema", "test_table"),
1803 table_id,
1804 None,
1805 );
1806 persistent_ctx.plans = vec![failed_merge_plan, succeeded_split_plan.clone()];
1807 persistent_ctx.failed_procedures = vec![ProcedureMeta {
1808 plan_index: 0,
1809 group_id: persistent_ctx.plans[0].group_id,
1810 procedure_id: ProcedureId::random(),
1811 }];
1812
1813 let context = Context::new(
1814 &ddl_ctx,
1815 env.mailbox_ctx.mailbox().clone(),
1816 env.server_addr.clone(),
1817 persistent_ctx,
1818 );
1819 let mut procedure = RepartitionProcedure {
1820 state: Box::new(Collect::new(vec![])),
1821 context,
1822 };
1823
1824 procedure
1825 .rollback(&TestingEnv::procedure_context())
1826 .await
1827 .unwrap();
1828
1829 let region_routes = current_parent_region_routes(&procedure.context).await;
1830 assert_eq!(
1831 region_routes,
1832 vec![
1833 test_region_route(
1834 RegionId::new(table_id, 1),
1835 &range_expr("x", 0, 100).as_json_str().unwrap(),
1836 ),
1837 test_region_route(
1838 RegionId::new(table_id, 2),
1839 &range_expr("x", 100, 200).as_json_str().unwrap(),
1840 ),
1841 RegionRoute {
1842 region: Region {
1843 id: RegionId::new(table_id, 3),
1844 partition_expr: range_expr("x", 200, 250).as_json_str().unwrap(),
1845 ..Default::default()
1846 },
1847 leader_peer: Some(Peer::empty(1)),
1848 ..Default::default()
1849 },
1850 RegionRoute {
1851 region: Region {
1852 id: RegionId::new(table_id, 4),
1853 partition_expr: range_expr("x", 250, 300).as_json_str().unwrap(),
1854 ..Default::default()
1855 },
1856 leader_peer: Some(Peer::empty(1)),
1857 ..Default::default()
1858 },
1859 ]
1860 );
1861 }
1862
1863 #[tokio::test]
1864 async fn test_repartition_procedure_flow_split_failed_and_full_rollback() {
1865 let env = TestingEnv::new();
1866 let table_id = 1024;
1867 let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
1868
1869 env.create_physical_table_metadata_for_repartition(
1870 table_id,
1871 vec![
1872 test_region_route(
1873 RegionId::new(table_id, 1),
1874 &range_expr("x", 0, 100).as_json_str().unwrap(),
1875 ),
1876 test_region_route(
1877 RegionId::new(table_id, 2),
1878 &range_expr("x", 100, 200).as_json_str().unwrap(),
1879 ),
1880 ],
1881 test_region_wal_options(&[1, 2]),
1882 )
1883 .await;
1884
1885 let context = new_parent_context(&env, node_manager, table_id);
1886 let mut procedure = RepartitionProcedure::new(
1887 RepartitionFrom::Partitioned {
1888 exprs: vec![range_expr("x", 0, 100)],
1889 target_partition_columns: None,
1890 },
1891 vec![range_expr("x", 0, 50), range_expr("x", 50, 100)],
1892 context,
1893 );
1894
1895 let start_status = procedure
1896 .execute(&TestingEnv::procedure_context())
1897 .await
1898 .unwrap();
1899 assert!(!start_status.need_persist());
1900 let start_status = procedure
1901 .execute(&TestingEnv::procedure_context())
1902 .await
1903 .unwrap();
1904 assert!(start_status.need_persist());
1905 assert_parent_state::<AllocateRegion>(&procedure);
1906
1907 let allocate_status = procedure
1908 .execute(&TestingEnv::procedure_context())
1909 .await
1910 .unwrap();
1911 assert!(allocate_status.need_persist());
1912 assert_parent_state::<Dispatch>(&procedure);
1913 assert_eq!(procedure.context.persistent_ctx.plans.len(), 1);
1914 let plan = &procedure.context.persistent_ctx.plans[0];
1915 let expected_plan = test_plan(table_id);
1916 assert_eq!(plan.source_regions, expected_plan.source_regions);
1917 assert_eq!(plan.target_regions, expected_plan.target_regions);
1918 assert_eq!(
1919 plan.allocated_region_ids,
1920 expected_plan.allocated_region_ids
1921 );
1922 assert_eq!(
1923 plan.pending_deallocate_region_ids,
1924 expected_plan.pending_deallocate_region_ids
1925 );
1926 assert_eq!(plan.transition_map, expected_plan.transition_map);
1927 assert_eq!(
1928 current_parent_region_routes(&procedure.context).await,
1929 vec![
1930 test_region_route(
1931 RegionId::new(table_id, 1),
1932 &range_expr("x", 0, 100).as_json_str().unwrap(),
1933 ),
1934 test_region_route(
1935 RegionId::new(table_id, 2),
1936 &range_expr("x", 100, 200).as_json_str().unwrap(),
1937 ),
1938 RegionRoute {
1939 region: Region {
1940 id: RegionId::new(table_id, 3),
1941 partition_expr: range_expr("x", 50, 100).as_json_str().unwrap(),
1942 ..Default::default()
1943 },
1944 leader_peer: Some(Peer::empty(0)),
1945 ..Default::default()
1946 },
1947 ]
1948 );
1949
1950 let dispatch_status = procedure
1951 .execute(&TestingEnv::procedure_context())
1952 .await
1953 .unwrap();
1954 assert!(!dispatch_status.need_persist());
1955 let subprocedure_ids = extract_subprocedure_ids(dispatch_status);
1956 assert_eq!(subprocedure_ids.len(), 1);
1957 assert_parent_state::<Collect>(&procedure);
1958
1959 let failed_state = ProcedureState::failed(Arc::new(ProcedureError::external(
1960 MockError::new(StatusCode::Internal),
1961 )));
1962 let collect_ctx = procedure_context_with_receivers(HashMap::from([(
1963 subprocedure_ids[0],
1964 procedure_state_receiver(failed_state),
1965 )]));
1966
1967 let err = procedure.execute(&collect_ctx).await.unwrap_err();
1968 assert!(!err.is_retry_later());
1969 assert_parent_state::<Collect>(&procedure);
1970
1971 procedure
1972 .rollback(&TestingEnv::procedure_context())
1973 .await
1974 .unwrap();
1975
1976 let region_routes = current_parent_region_routes(&procedure.context).await;
1977 assert_eq!(
1978 region_routes,
1979 vec![
1980 test_region_route(
1981 RegionId::new(table_id, 1),
1982 &range_expr("x", 0, 100).as_json_str().unwrap(),
1983 ),
1984 test_region_route(
1985 RegionId::new(table_id, 2),
1986 &range_expr("x", 100, 200).as_json_str().unwrap(),
1987 ),
1988 ]
1989 );
1990 }
1991
1992 #[tokio::test]
1993 async fn test_repartition_procedure_flow_unpartitioned_failed_and_full_rollback() {
1994 let env = TestingEnv::new();
1995 let table_id = 1024;
1996 let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
1997
1998 env.create_physical_table_metadata_for_repartition(
1999 table_id,
2000 vec![test_region_route(RegionId::new(table_id, 1), "")],
2001 test_region_wal_options(&[1]),
2002 )
2003 .await;
2004
2005 let context = new_parent_context(&env, node_manager, table_id);
2006 let to_exprs = vec![range_expr("col1", 0, 50), range_expr("col1", 50, 100)];
2007 let mut procedure = RepartitionProcedure::new(
2008 RepartitionFrom::Unpartitioned {
2009 partition_columns: vec!["col1".to_string()],
2010 },
2011 to_exprs.clone(),
2012 context,
2013 );
2014
2015 let start_status = procedure
2016 .execute(&TestingEnv::procedure_context())
2017 .await
2018 .unwrap();
2019 assert!(start_status.need_persist());
2020 assert_parent_state::<UpdatePartitionMetadata>(&procedure);
2021 assert_eq!(
2022 procedure
2023 .context
2024 .persistent_ctx
2025 .partition_metadata_update
2026 .as_ref()
2027 .unwrap()
2028 .target_partition_key_indices,
2029 vec![0]
2030 );
2031
2032 let update_status = procedure
2033 .execute(&TestingEnv::procedure_context())
2034 .await
2035 .unwrap();
2036 assert!(update_status.need_persist());
2037 assert_parent_state::<AllocateRegion>(&procedure);
2038 assert_eq!(
2039 table_partition_key_indices(&procedure.context).await,
2040 vec![0]
2041 );
2042
2043 let build_allocate_status = procedure
2044 .execute(&TestingEnv::procedure_context())
2045 .await
2046 .unwrap();
2047 assert!(build_allocate_status.need_persist());
2048 assert_parent_state::<AllocateRegion>(&procedure);
2049 assert_eq!(procedure.context.persistent_ctx.plans.len(), 1);
2050 let plan = &procedure.context.persistent_ctx.plans[0];
2051 assert_eq!(
2052 plan.source_regions,
2053 vec![SourceRegionDescriptor::Default {
2054 region_id: RegionId::new(table_id, 1)
2055 }]
2056 );
2057 assert_eq!(plan.target_regions.len(), 2);
2058 assert_eq!(plan.target_regions[0].region_id, RegionId::new(table_id, 1));
2059 assert_eq!(plan.target_regions[0].partition_expr, to_exprs[0]);
2060 assert_eq!(
2061 plan.allocated_region_ids,
2062 vec![plan.target_regions[1].region_id]
2063 );
2064 assert!(plan.pending_deallocate_region_ids.is_empty());
2065 assert_eq!(plan.transition_map, vec![vec![0, 1]]);
2066 let target_regions = plan.target_regions.clone();
2067
2068 let execute_allocate_status = procedure
2069 .execute(&TestingEnv::procedure_context())
2070 .await
2071 .unwrap();
2072 assert!(execute_allocate_status.need_persist());
2073 assert_parent_state::<Dispatch>(&procedure);
2074 let region_routes = current_parent_region_routes(&procedure.context).await;
2075 assert_eq!(region_routes.len(), 2);
2076 assert_eq!(
2077 region_route_by_id(®ion_routes, target_regions[0].region_id)
2078 .region
2079 .partition_expr(),
2080 ""
2081 );
2082 assert_eq!(
2083 region_route_by_id(®ion_routes, target_regions[1].region_id)
2084 .region
2085 .partition_expr(),
2086 to_exprs[1].as_json_str().unwrap()
2087 );
2088
2089 let dispatch_status = procedure
2090 .execute(&TestingEnv::procedure_context())
2091 .await
2092 .unwrap();
2093 let subprocedure_ids = extract_subprocedure_ids(dispatch_status);
2094 assert_eq!(subprocedure_ids.len(), 1);
2095 assert_parent_state::<Collect>(&procedure);
2096
2097 let failed_state = ProcedureState::failed(Arc::new(ProcedureError::external(
2098 MockError::new(StatusCode::Internal),
2099 )));
2100 let collect_ctx = procedure_context_with_receivers(HashMap::from([(
2101 subprocedure_ids[0],
2102 procedure_state_receiver(failed_state),
2103 )]));
2104 let err = procedure.execute(&collect_ctx).await.unwrap_err();
2105 assert!(!err.is_retry_later());
2106 assert_parent_state::<Collect>(&procedure);
2107
2108 procedure
2109 .rollback(&TestingEnv::procedure_context())
2110 .await
2111 .unwrap();
2112
2113 assert!(
2114 table_partition_key_indices(&procedure.context)
2115 .await
2116 .is_empty()
2117 );
2118 assert_eq!(
2119 current_parent_region_routes(&procedure.context).await,
2120 vec![test_region_route(RegionId::new(table_id, 1), "")]
2121 );
2122 }
2123
2124 #[tokio::test]
2125 async fn test_repartition_procedure_flow_unpartitioned_rollback_is_idempotent() {
2126 let env = TestingEnv::new();
2127 let table_id = 1024;
2128 let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));
2129
2130 env.create_physical_table_metadata_for_repartition(
2131 table_id,
2132 vec![test_region_route(RegionId::new(table_id, 1), "")],
2133 test_region_wal_options(&[1]),
2134 )
2135 .await;
2136
2137 let context = new_parent_context(&env, node_manager, table_id);
2138 let mut procedure = RepartitionProcedure::new(
2139 RepartitionFrom::Unpartitioned {
2140 partition_columns: vec!["col1".to_string()],
2141 },
2142 vec![range_expr("col1", 0, 50), range_expr("col1", 50, 100)],
2143 context,
2144 );
2145
2146 procedure
2147 .execute(&TestingEnv::procedure_context())
2148 .await
2149 .unwrap();
2150 procedure
2151 .execute(&TestingEnv::procedure_context())
2152 .await
2153 .unwrap();
2154 procedure
2155 .execute(&TestingEnv::procedure_context())
2156 .await
2157 .unwrap();
2158 procedure
2159 .execute(&TestingEnv::procedure_context())
2160 .await
2161 .unwrap();
2162 assert_eq!(
2163 table_partition_key_indices(&procedure.context).await,
2164 vec![0]
2165 );
2166 assert_eq!(
2167 current_parent_region_routes(&procedure.context).await.len(),
2168 2
2169 );
2170
2171 let dispatch_status = procedure
2172 .execute(&TestingEnv::procedure_context())
2173 .await
2174 .unwrap();
2175 let subprocedure_ids = extract_subprocedure_ids(dispatch_status);
2176 assert_eq!(subprocedure_ids.len(), 1);
2177 assert_parent_state::<Collect>(&procedure);
2178
2179 let failed_state = ProcedureState::failed(Arc::new(ProcedureError::external(
2180 MockError::new(StatusCode::Internal),
2181 )));
2182 let collect_ctx = procedure_context_with_receivers(HashMap::from([(
2183 subprocedure_ids[0],
2184 procedure_state_receiver(failed_state),
2185 )]));
2186 let err = procedure.execute(&collect_ctx).await.unwrap_err();
2187 assert!(!err.is_retry_later());
2188
2189 procedure
2190 .rollback(&TestingEnv::procedure_context())
2191 .await
2192 .unwrap();
2193 let once_indices = table_partition_key_indices(&procedure.context).await;
2194 let once_routes = current_parent_region_routes(&procedure.context).await;
2195
2196 procedure
2197 .rollback(&TestingEnv::procedure_context())
2198 .await
2199 .unwrap();
2200 let twice_indices = table_partition_key_indices(&procedure.context).await;
2201 let twice_routes = current_parent_region_routes(&procedure.context).await;
2202
2203 assert_eq!(once_indices, twice_indices);
2204 assert_eq!(once_routes, twice_routes);
2205 assert!(twice_indices.is_empty());
2206 assert_eq!(
2207 twice_routes,
2208 vec![test_region_route(RegionId::new(table_id, 1), "")]
2209 );
2210 }
2211
2212 #[tokio::test]
2213 async fn test_repartition_procedure_flow_split_allocate_retryable_then_resume() {
2214 common_telemetry::init_default_ut_logging();
2215 let env = TestingEnv::new();
2216 let table_id = 1024;
2217 let (tx, _rx) = mpsc::channel(8);
2218 let should_retry = Arc::new(AtomicBool::new(true));
2219 let datanode_handler = DatanodeWatcher::new(tx).with_handler(move |_, _| {
2220 if should_retry.swap(false, Ordering::SeqCst) {
2221 return Err(error::Error::RetryLater {
2222 source: BoxedError::new(
2223 error::UnexpectedSnafu {
2224 err_msg: "retry later",
2225 }
2226 .build(),
2227 ),
2228 clean_poisons: false,
2229 });
2230 }
2231
2232 Ok(api::region::RegionResponse::new(0))
2233 });
2234 let node_manager = Arc::new(MockDatanodeManager::new(datanode_handler));
2235
2236 env.create_physical_table_metadata_for_repartition(
2237 table_id,
2238 vec![
2239 test_region_route(
2240 RegionId::new(table_id, 1),
2241 &range_expr("x", 0, 100).as_json_str().unwrap(),
2242 ),
2243 test_region_route(
2244 RegionId::new(table_id, 2),
2245 &range_expr("x", 100, 200).as_json_str().unwrap(),
2246 ),
2247 ],
2248 test_region_wal_options(&[1, 2]),
2249 )
2250 .await;
2251
2252 let context = new_parent_context(&env, node_manager, table_id);
2253 let mut procedure = RepartitionProcedure::new(
2254 RepartitionFrom::Partitioned {
2255 exprs: vec![range_expr("x", 0, 100)],
2256 target_partition_columns: None,
2257 },
2258 vec![range_expr("x", 0, 50), range_expr("x", 50, 100)],
2259 context,
2260 );
2261
2262 let start_status = procedure
2263 .execute(&TestingEnv::procedure_context())
2264 .await
2265 .unwrap();
2266 assert!(!start_status.need_persist());
2267 let start_status = procedure
2268 .execute(&TestingEnv::procedure_context())
2269 .await
2270 .unwrap();
2271 assert!(start_status.need_persist());
2272 assert_parent_state::<AllocateRegion>(&procedure);
2273
2274 let err = procedure
2275 .execute(&TestingEnv::procedure_context())
2276 .await
2277 .unwrap_err();
2278 assert!(err.is_retry_later());
2279 assert_parent_state::<AllocateRegion>(&procedure);
2280 assert!(!procedure.context.persistent_ctx.plans.is_empty());
2281 assert_eq!(
2282 current_parent_region_routes(&procedure.context).await,
2283 vec![
2284 test_region_route(
2285 RegionId::new(table_id, 1),
2286 &range_expr("x", 0, 100).as_json_str().unwrap(),
2287 ),
2288 test_region_route(
2289 RegionId::new(table_id, 2),
2290 &range_expr("x", 100, 200).as_json_str().unwrap(),
2291 ),
2292 ]
2293 );
2294
2295 let allocate_status = procedure
2296 .execute(&TestingEnv::procedure_context())
2297 .await
2298 .unwrap();
2299 assert!(allocate_status.need_persist());
2300 assert_parent_state::<Dispatch>(&procedure);
2301
2302 assert_eq!(procedure.context.persistent_ctx.plans.len(), 1);
2303 let plan = &procedure.context.persistent_ctx.plans[0];
2304 let expected_plan = test_plan(table_id);
2305 assert_eq!(plan.source_regions, expected_plan.source_regions);
2306 assert_eq!(plan.target_regions, expected_plan.target_regions);
2307 assert_eq!(
2308 plan.allocated_region_ids,
2309 expected_plan.allocated_region_ids
2310 );
2311 assert_eq!(plan.transition_map, expected_plan.transition_map);
2312 assert_eq!(
2313 current_parent_region_routes(&procedure.context).await,
2314 vec![
2315 test_region_route(
2316 RegionId::new(table_id, 1),
2317 &range_expr("x", 0, 100).as_json_str().unwrap(),
2318 ),
2319 test_region_route(
2320 RegionId::new(table_id, 2),
2321 &range_expr("x", 100, 200).as_json_str().unwrap(),
2322 ),
2323 RegionRoute {
2324 region: Region {
2325 id: RegionId::new(table_id, 3),
2326 partition_expr: range_expr("x", 50, 100).as_json_str().unwrap(),
2327 ..Default::default()
2328 },
2329 leader_peer: Some(Peer::empty(0)),
2330 ..Default::default()
2331 },
2332 ]
2333 );
2334
2335 let dispatch_status = procedure
2336 .execute(&TestingEnv::procedure_context())
2337 .await
2338 .unwrap();
2339 assert!(!dispatch_status.need_persist());
2340 let subprocedure_ids = extract_subprocedure_ids(dispatch_status);
2341 assert_eq!(subprocedure_ids.len(), 1);
2342 assert_parent_state::<Collect>(&procedure);
2343 }
2344}