1pub mod allocate_region;
16pub mod collect;
17pub mod deallocate_region;
18pub mod dispatch;
19pub mod group;
20pub mod plan;
21pub mod repartition_end;
22pub mod repartition_start;
23pub mod utils;
24
25use std::any::Any;
26use std::collections::{HashMap, HashSet};
27use std::fmt::{Debug, Display};
28use std::time::{Duration, Instant};
29
30use common_error::ext::BoxedError;
31use common_meta::cache_invalidator::CacheInvalidatorRef;
32use common_meta::ddl::DdlContext;
33use common_meta::ddl::allocator::region_routes::RegionRoutesAllocatorRef;
34use common_meta::ddl::allocator::wal_options::WalOptionsAllocatorRef;
35use common_meta::ddl_manager::RepartitionProcedureFactory;
36use common_meta::instruction::CacheIdent;
37use common_meta::key::datanode_table::RegionInfo;
38use common_meta::key::table_info::TableInfoValue;
39use common_meta::key::table_route::TableRouteValue;
40use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
41use common_meta::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
42use common_meta::node_manager::NodeManagerRef;
43use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
44use common_meta::region_registry::LeaderRegionRegistryRef;
45use common_meta::rpc::router::{RegionRoute, operating_leader_region_roles};
46use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
47use common_procedure::{
48 BoxedProcedure, Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
49 ProcedureManagerRef, Result as ProcedureResult, Status, StringKey, UserMetadata,
50};
51use common_telemetry::{error, info, warn};
52use partition::expr::PartitionExpr;
53use serde::{Deserialize, Serialize};
54use snafu::{OptionExt, ResultExt};
55use store_api::storage::{RegionNumber, TableId};
56use table::table_name::TableName;
57
58use crate::error::{self, Result};
59use crate::procedure::repartition::collect::ProcedureMeta;
60use crate::procedure::repartition::deallocate_region::DeallocateRegion;
61use crate::procedure::repartition::group::{
62 Context as RepartitionGroupContext, RepartitionGroupProcedure, region_routes,
63};
64use crate::procedure::repartition::plan::RepartitionPlanEntry;
65use crate::procedure::repartition::repartition_start::RepartitionStart;
66use crate::procedure::repartition::utils::{
67 get_datanode_table_value, rollback_group_metadata_routes,
68};
69use crate::service::mailbox::MailboxRef;
70
71#[cfg(test)]
72pub mod test_util;
73
/// Serializable state of a repartition procedure.
///
/// This is what `dump()` persists and `from_json()` restores, so the procedure
/// can resume or roll back after a metasrv restart/failover.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
    // Fully qualified name of the table being repartitioned.
    pub catalog_name: String,
    pub schema_name: String,
    pub table_name: String,
    pub table_id: TableId,
    /// Repartition plan entries, one per region group.
    pub plans: Vec<RepartitionPlanEntry>,
    /// Sub-procedures that reported failure; their plans are rolled back.
    #[serde(default)]
    pub failed_procedures: Vec<ProcedureMeta>,
    /// Sub-procedures whose outcome is unknown; treated like failures during rollback.
    #[serde(default)]
    pub unknown_procedures: Vec<ProcedureMeta>,
    /// Overall time budget of the procedure (serialized human-readable).
    #[serde(with = "humantime_serde", default = "default_timeout")]
    pub timeout: Duration,
}
97
/// Fallback timeout used when the caller does not supply one: two minutes.
fn default_timeout() -> Duration {
    Duration::from_secs(2 * 60)
}
101
102impl PersistentContext {
103 pub fn new(
107 TableName {
108 catalog_name,
109 schema_name,
110 table_name,
111 }: TableName,
112 table_id: TableId,
113 timeout: Option<Duration>,
114 ) -> Self {
115 Self {
116 catalog_name,
117 schema_name,
118 table_name,
119 table_id,
120 plans: vec![],
121 failed_procedures: vec![],
122 unknown_procedures: vec![],
123 timeout: timeout.unwrap_or_else(default_timeout),
124 }
125 }
126
127 pub fn lock_key(&self) -> Vec<StringKey> {
128 vec![
129 CatalogLock::Read(&self.catalog_name).into(),
130 SchemaLock::read(&self.catalog_name, &self.schema_name).into(),
131 TableLock::Write(self.table_id).into(),
132 TableNameLock::new(&self.catalog_name, &self.schema_name, &self.table_name).into(),
133 ]
134 }
135}
136
/// Runtime context shared by all states of the repartition procedure.
///
/// Combines the persisted state with handles to metasrv services. It is
/// rebuilt from the `DdlContext` on every (re)start, so only
/// `persistent_ctx` survives serialization.
#[derive(Clone)]
pub struct Context {
    pub persistent_ctx: PersistentContext,
    /// In-memory-only state (metrics, timers); reset on recovery.
    pub volatile_ctx: VolatileContext,
    pub table_metadata_manager: TableMetadataManagerRef,
    /// Tracks regions currently being operated on to prevent races.
    pub memory_region_keeper: MemoryRegionKeeperRef,
    pub node_manager: NodeManagerRef,
    pub leader_region_registry: LeaderRegionRegistryRef,
    pub mailbox: MailboxRef,
    /// Address of this metasrv instance.
    pub server_addr: String,
    pub cache_invalidator: CacheInvalidatorRef,
    pub region_routes_allocator: RegionRoutesAllocatorRef,
    pub wal_options_allocator: WalOptionsAllocatorRef,
    /// Captured at context creation; used to compute the remaining timeout.
    pub start_time: Instant,
}
152
/// Non-persistent, per-run state; discarded and rebuilt on procedure recovery.
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {
    /// Elapsed-time metrics for the individual repartition phases.
    pub metrics: Metrics,
    // Presumably set when dispatching sub-procedures starts; not assigned in
    // this file — confirm in the dispatch state implementation.
    pub dispatch_start_time: Option<Instant>,
}
158
/// Accumulated wall-clock durations for the phases of a repartition run.
#[derive(Debug, Clone, Default)]
pub struct Metrics {
    // Elapsed time recorded for the plan-building phase.
    build_plan_elapsed: Duration,
    // Elapsed time recorded for the region-allocation phase.
    allocate_region_elapsed: Duration,
    // Elapsed time recorded for finishing the group sub-procedures.
    finish_groups_elapsed: Duration,
    // Elapsed time recorded for the region-deallocation phase.
    deallocate_region_elapsed: Duration,
}
171
172impl Display for Metrics {
173 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
174 let total = self.build_plan_elapsed
175 + self.allocate_region_elapsed
176 + self.finish_groups_elapsed
177 + self.deallocate_region_elapsed;
178 write!(f, "total: {:?}", total)?;
179 let mut parts = Vec::with_capacity(4);
180 if self.build_plan_elapsed > Duration::ZERO {
181 parts.push(format!("build_plan_elapsed: {:?}", self.build_plan_elapsed));
182 }
183 if self.allocate_region_elapsed > Duration::ZERO {
184 parts.push(format!(
185 "allocate_region_elapsed: {:?}",
186 self.allocate_region_elapsed
187 ));
188 }
189 if self.finish_groups_elapsed > Duration::ZERO {
190 parts.push(format!(
191 "finish_groups_elapsed: {:?}",
192 self.finish_groups_elapsed
193 ));
194 }
195 if self.deallocate_region_elapsed > Duration::ZERO {
196 parts.push(format!(
197 "deallocate_region_elapsed: {:?}",
198 self.deallocate_region_elapsed
199 ));
200 }
201
202 if !parts.is_empty() {
203 write!(f, ", {}", parts.join(", "))?;
204 }
205 Ok(())
206 }
207}
208
209impl Metrics {
210 pub fn update_build_plan_elapsed(&mut self, elapsed: Duration) {
212 self.build_plan_elapsed += elapsed;
213 }
214
215 pub fn update_allocate_region_elapsed(&mut self, elapsed: Duration) {
217 self.allocate_region_elapsed += elapsed;
218 }
219
220 pub fn update_finish_groups_elapsed(&mut self, elapsed: Duration) {
222 self.finish_groups_elapsed += elapsed;
223 }
224
225 pub fn update_deallocate_region_elapsed(&mut self, elapsed: Duration) {
227 self.deallocate_region_elapsed += elapsed;
228 }
229}
230
impl Context {
    /// Builds the runtime context from the DDL context, cloning the service
    /// handles; `start_time` is captured now for timeout accounting.
    pub fn new(
        ddl_ctx: &DdlContext,
        mailbox: MailboxRef,
        server_addr: String,
        persistent_ctx: PersistentContext,
    ) -> Self {
        Self {
            persistent_ctx,
            table_metadata_manager: ddl_ctx.table_metadata_manager.clone(),
            memory_region_keeper: ddl_ctx.memory_region_keeper.clone(),
            node_manager: ddl_ctx.node_manager.clone(),
            leader_region_registry: ddl_ctx.leader_region_registry.clone(),
            mailbox,
            server_addr,
            cache_invalidator: ddl_ctx.cache_invalidator.clone(),
            region_routes_allocator: ddl_ctx.table_metadata_allocator.region_routes_allocator(),
            wal_options_allocator: ddl_ctx.table_metadata_allocator.wal_options_allocator(),
            start_time: Instant::now(),
            volatile_ctx: VolatileContext::default(),
        }
    }

    /// Remaining time budget for the next operation, or `None` once the
    /// overall timeout has already elapsed.
    pub fn next_operation_timeout(&self) -> Option<Duration> {
        self.persistent_ctx
            .timeout
            .checked_sub(self.start_time.elapsed())
    }

    /// Adds `elapsed` to the plan-building phase metric.
    pub fn update_build_plan_elapsed(&mut self, elapsed: Duration) {
        self.volatile_ctx.metrics.update_build_plan_elapsed(elapsed);
    }

    /// Adds `elapsed` to the region-allocation phase metric.
    pub fn update_allocate_region_elapsed(&mut self, elapsed: Duration) {
        self.volatile_ctx
            .metrics
            .update_allocate_region_elapsed(elapsed);
    }

    /// Adds `elapsed` to the group-finishing phase metric.
    pub fn update_finish_groups_elapsed(&mut self, elapsed: Duration) {
        self.volatile_ctx
            .metrics
            .update_finish_groups_elapsed(elapsed);
    }

    /// Adds `elapsed` to the region-deallocation phase metric.
    pub fn update_deallocate_region_elapsed(&mut self, elapsed: Duration) {
        self.volatile_ctx
            .metrics
            .update_deallocate_region_elapsed(elapsed);
    }

    /// Fetches the table route value, keeping the raw bytes (needed for the
    /// compare-and-swap style update in [`Self::update_table_route`]).
    ///
    /// # Errors
    /// - Retry-later error when the metadata store read fails.
    /// - `TableRouteNotFound` when no route exists for the table.
    pub async fn get_table_route_value(
        &self,
    ) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
        let table_id = self.persistent_ctx.table_id;
        let table_route_value = self
            .table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get_with_raw_bytes(table_id)
            .await
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: format!("Failed to get table route for table: {}", table_id),
            })?
            .context(error::TableRouteNotFoundSnafu { table_id })?;

        Ok(table_route_value)
    }

    /// Fetches the table info value for the repartitioned table.
    ///
    /// # Errors
    /// - Retry-later error when the metadata store read fails.
    /// - `TableInfoNotFound` when the table info is missing.
    pub async fn get_table_info_value(&self) -> Result<TableInfoValue> {
        let table_id = self.persistent_ctx.table_id;
        let table_info_value = self
            .table_metadata_manager
            .table_info_manager()
            .get(table_id)
            .await
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: format!("Failed to get table info for table: {}", table_id),
            })?
            .context(error::TableInfoNotFoundSnafu { table_id })?
            .into_inner();
        Ok(table_info_value)
    }

    /// Replaces the table route with `new_region_routes`, merging and
    /// validating the WAL options for the new regions.
    ///
    /// The datanode-table value is looked up via the leader of the *first*
    /// new route; region options are carried over from it unchanged.
    ///
    /// # Errors
    /// Fails when `new_region_routes` is empty, when the first route has no
    /// leader peer, or when the metadata update itself fails.
    pub async fn update_table_route(
        &self,
        current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
        new_region_routes: Vec<RegionRoute>,
        new_region_wal_options: HashMap<RegionNumber, String>,
    ) -> Result<()> {
        let table_id = self.persistent_ctx.table_id;
        if new_region_routes.is_empty() {
            return error::UnexpectedSnafu {
                violated: format!("new_region_routes is empty for table: {}", table_id),
            }
            .fail();
        }
        // Safe: emptiness was checked above.
        let datanode_id = new_region_routes
            .first()
            .unwrap()
            .leader_peer
            .as_ref()
            .context(error::NoLeaderSnafu)?
            .id;
        let datanode_table_value =
            get_datanode_table_value(&self.table_metadata_manager, table_id, datanode_id).await?;

        let RegionInfo {
            region_options,
            region_wal_options,
            ..
        } = &datanode_table_value.region_info;

        // Merge the existing WAL options with the freshly allocated ones and
        // validate them against the new routes before persisting anything.
        let validated_region_wal_options =
            crate::procedure::repartition::utils::merge_and_validate_region_wal_options(
                region_wal_options,
                new_region_wal_options,
                &new_region_routes,
                table_id,
            )?;
        info!(
            "Updating table route for table: {}, new region routes: {:?}",
            table_id, new_region_routes
        );
        self.table_metadata_manager
            .update_table_route(
                table_id,
                datanode_table_value.region_info.clone(),
                current_table_route_value,
                new_region_routes,
                region_options,
                &validated_region_wal_options,
            )
            .await
            .context(error::TableMetadataManagerSnafu)
    }

    /// Broadcasts an invalidation of this table's cached metadata.
    /// The broadcast result is deliberately ignored (best effort).
    pub async fn invalidate_table_cache(&self) -> Result<()> {
        let table_id = self.persistent_ctx.table_id;
        let subject = format!(
            "Invalidate table cache for repartition table, table: {}",
            table_id,
        );
        let ctx = common_meta::cache_invalidator::Context {
            subject: Some(subject),
        };
        let _ = self
            .cache_invalidator
            .invalidate(&ctx, &[CacheIdent::TableId(table_id)])
            .await;
        Ok(())
    }

    /// Registers every operating leader region of `region_routes` in the
    /// keeper and returns the guards keeping those registrations alive.
    ///
    /// # Errors
    /// `RegionOperatingRace` if a region is already being operated on.
    pub fn register_operating_regions(
        memory_region_keeper: &MemoryRegionKeeperRef,
        region_routes: &[RegionRoute],
    ) -> Result<Vec<OperatingRegionGuard>> {
        let mut operating_guards = Vec::with_capacity(region_routes.len());
        for (region_id, datanode_id, role) in operating_leader_region_roles(region_routes) {
            let guard = memory_region_keeper
                .register_with_role(datanode_id, region_id, role)
                .context(error::RegionOperatingRaceSnafu {
                    peer_id: datanode_id,
                    region_id,
                })?;
            operating_guards.push(guard);
        }
        Ok(operating_guards)
    }
}
432
/// A state of the repartition state machine.
///
/// Implementations are (de)serialized via `typetag` using the
/// `repartition_state` tag, so the current state can be persisted and
/// restored across restarts.
#[async_trait::async_trait]
#[typetag::serde(tag = "repartition_state")]
pub(crate) trait State: Sync + Send + Debug {
    /// Short state name: the last segment of the fully-qualified type path.
    fn name(&self) -> &'static str {
        let type_name = std::any::type_name::<Self>();
        // Keep only the final `::`-separated segment; fall back to the full
        // path when there is none.
        type_name.split("::").last().unwrap_or(type_name)
    }

    /// Executes this state and returns the next state together with the
    /// procedure status to report.
    async fn next(
        &mut self,
        ctx: &mut Context,
        procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)>;

    /// Downcast support; used to inspect the concrete state during rollback.
    fn as_any(&self) -> &dyn Any;
}
451
/// Procedure that repartitions a table: builds plans, allocates regions,
/// dispatches per-group sub-procedures, and deallocates leftover regions.
pub struct RepartitionProcedure {
    // Current state of the state machine.
    state: Box<dyn State>,
    // Runtime context shared by all states.
    context: Context,
}
456
/// Borrowed view serialized by [`Procedure::dump`] for [`RepartitionProcedure`].
#[derive(Debug, Serialize)]
struct RepartitionData<'a> {
    state: &'a dyn State,
    persistent_ctx: &'a PersistentContext,
}
462
/// Owned counterpart of [`RepartitionData`], used when deserializing a
/// persisted procedure.
#[derive(Debug, Deserialize)]
struct RepartitionDataOwned {
    state: Box<dyn State>,
    persistent_ctx: PersistentContext,
}
468
impl RepartitionProcedure {
    /// Type name under which this procedure is registered with the manager.
    const TYPE_NAME: &'static str = "metasrv-procedure::Repartition";

    /// Creates a procedure starting at the [`RepartitionStart`] state.
    pub fn new(
        from_exprs: Vec<PartitionExpr>,
        to_exprs: Vec<PartitionExpr>,
        context: Context,
    ) -> Self {
        let state = Box::new(RepartitionStart::new(from_exprs, to_exprs));

        Self { state, context }
    }

    /// Restores a procedure from its persisted JSON; `ctx_factory` rebuilds
    /// the non-serializable runtime context from the persisted part.
    pub fn from_json<F>(json: &str, ctx_factory: F) -> ProcedureResult<Self>
    where
        F: FnOnce(PersistentContext) -> Context,
    {
        let RepartitionDataOwned {
            state,
            persistent_ctx,
        } = serde_json::from_str(json).context(FromJsonSnafu)?;
        let context = ctx_factory(persistent_ctx);

        Ok(Self { state, context })
    }

    /// Whether rollback must drop regions allocated by this procedure: only
    /// in the AllocateRegion, Dispatch, and Collect states.
    fn should_rollback_allocated_regions(&self) -> bool {
        self.state.as_any().is::<allocate_region::AllocateRegion>()
            || self.state.as_any().is::<dispatch::Dispatch>()
            || self.state.as_any().is::<collect::Collect>()
    }

    /// Indices of plans whose sub-procedures failed or ended in an unknown
    /// state; both sets are rolled back.
    fn rollback_plan_indices(&self) -> HashSet<usize> {
        self.context
            .persistent_ctx
            .failed_procedures
            .iter()
            .chain(self.context.persistent_ctx.unknown_procedures.iter())
            .map(|procedure_meta| procedure_meta.plan_index)
            .collect()
    }

    /// Region ids allocated by this procedure that rollback should remove.
    ///
    /// Before any sub-procedure outcome is known (AllocateRegion/Dispatch),
    /// every allocated region of every plan is a candidate; afterwards only
    /// the regions of failed/unknown plans are.
    // NOTE(review): indexes `plans[plan_index]` directly — assumes every
    // recorded `plan_index` is in range; confirm against how ProcedureMeta
    // entries are produced.
    fn rollback_allocated_region_ids(&self) -> HashSet<store_api::storage::RegionId> {
        if self.state.as_any().is::<allocate_region::AllocateRegion>()
            || self.state.as_any().is::<dispatch::Dispatch>()
        {
            return self
                .context
                .persistent_ctx
                .plans
                .iter()
                .flat_map(|plan| plan.allocated_region_ids.iter().copied())
                .collect();
        }

        self.rollback_plan_indices()
            .into_iter()
            .flat_map(|plan_index| {
                self.context.persistent_ctx.plans[plan_index]
                    .allocated_region_ids
                    .iter()
                    .copied()
            })
            .collect()
    }

    /// Restores, in place, the region routes of every plan selected for
    /// rollback to their pre-repartition metadata.
    async fn rollback_group_metadata_for_selected_plans(
        &mut self,
        region_routes: &mut [RegionRoute],
    ) -> Result<()> {
        let rollback_plan_indices = self.rollback_plan_indices();
        if rollback_plan_indices.is_empty() {
            return Ok(());
        }

        // Index the mutable routes by region id for per-plan patching.
        let mut region_routes_map = region_routes
            .iter_mut()
            .map(|route| (route.region.id, route))
            .collect::<HashMap<_, _>>();
        for plan_index in rollback_plan_indices {
            let plan = &self.context.persistent_ctx.plans[plan_index];
            rollback_group_metadata_routes(
                plan.group_id,
                &plan.source_regions,
                &plan.original_target_routes,
                &plan.allocated_region_ids,
                &plan.pending_deallocate_region_ids,
                &mut region_routes_map,
            )?;
        }

        Ok(())
    }

    /// Rollback body: restores route metadata for failed/unknown plans,
    /// best-effort drops the regions this procedure allocated, persists the
    /// cleaned-up routes, and invalidates the table cache.
    async fn rollback_inner(&mut self, procedure_ctx: &ProcedureContext) -> Result<()> {
        if !self.should_rollback_allocated_regions() {
            return Ok(());
        }

        let table_id = self.context.persistent_ctx.table_id;
        let allocated_region_ids = self.rollback_allocated_region_ids();

        // Re-acquire the table write lock: rollback may run outside the
        // procedure's normal lock scope.
        let table_lock = TableLock::Write(table_id).into();
        let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
        let table_route_value = self.context.get_table_route_value().await?;
        let original_region_routes = region_routes(table_id, table_route_value.get_inner_ref())?;
        let mut current_region_routes = original_region_routes.clone();
        self.rollback_group_metadata_for_selected_plans(&mut current_region_routes)
            .await?;
        let allocated_region_routes = DeallocateRegion::filter_deallocatable_region_routes(
            table_id,
            &current_region_routes,
            &allocated_region_ids,
        );
        if !allocated_region_routes.is_empty() {
            let table = TableName {
                catalog_name: self.context.persistent_ctx.catalog_name.clone(),
                schema_name: self.context.persistent_ctx.schema_name.clone(),
                table_name: self.context.persistent_ctx.table_name.clone(),
            };
            // Best effort: failure to physically drop the regions only warns;
            // the route metadata below is still cleaned up.
            if let Err(err) = DeallocateRegion::deallocate_regions(
                &self.context.node_manager,
                &self.context.leader_region_registry,
                table,
                table_id,
                &allocated_region_routes,
            )
            .await
            {
                warn!(err; "Failed to drop allocated regions during repartition rollback, table_id: {}, regions: {:?}", table_id, allocated_region_ids);
            }
        }

        let new_region_routes =
            DeallocateRegion::generate_region_routes(&current_region_routes, &allocated_region_ids);

        // Only write back when something actually changed.
        if new_region_routes != *original_region_routes {
            self.context
                .update_table_route(&table_route_value, new_region_routes, HashMap::new())
                .await
                .map_err(BoxedError::new)
                .with_context(|_| error::RetryLaterWithSourceSnafu {
                    reason: format!(
                        "Failed to rollback allocated region routes for repartition table: {}",
                        table_id
                    ),
                })?;
        }

        if let Err(err) = self.context.invalidate_table_cache().await {
            warn!(err; "Failed to invalidate table cache during repartition rollback, table_id: {}", table_id);
        }

        Ok(())
    }
}
653
654#[async_trait::async_trait]
655impl Procedure for RepartitionProcedure {
656 fn type_name(&self) -> &str {
657 Self::TYPE_NAME
658 }
659
660 #[tracing::instrument(skip_all, fields(
661 state = %self.state.name(),
662 table_id = %self.context.persistent_ctx.table_id
663 ))]
664 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
665 let state = &mut self.state;
666 let state_name = state.name();
667 common_telemetry::info!(
669 "Repartition procedure executing state: {}, table_id: {}",
670 state_name,
671 self.context.persistent_ctx.table_id
672 );
673 match state.next(&mut self.context, _ctx).await {
674 Ok((next, status)) => {
675 *state = next;
676 Ok(status)
677 }
678 Err(e) => {
679 if e.is_retryable() {
680 Err(ProcedureError::retry_later(e))
681 } else {
682 error!(
683 e;
684 "Repartition procedure failed, table id: {}",
685 self.context.persistent_ctx.table_id,
686 );
687 Err(ProcedureError::external(e))
688 }
689 }
690 }
691 }
692
693 async fn rollback(&mut self, ctx: &ProcedureContext) -> ProcedureResult<()> {
694 self.rollback_inner(ctx)
695 .await
696 .map_err(ProcedureError::external)
697 }
698
699 fn rollback_supported(&self) -> bool {
700 true
701 }
702
703 fn dump(&self) -> ProcedureResult<String> {
704 let data = RepartitionData {
705 state: self.state.as_ref(),
706 persistent_ctx: &self.context.persistent_ctx,
707 };
708 serde_json::to_string(&data).context(ToJsonSnafu)
709 }
710
711 fn lock_key(&self) -> LockKey {
712 LockKey::new(self.context.persistent_ctx.lock_key())
713 }
714
715 fn user_metadata(&self) -> Option<UserMetadata> {
716 None
718 }
719}
720
/// Default [`RepartitionProcedureFactory`]; carries the mailbox and server
/// address needed to build repartition procedure contexts.
pub struct DefaultRepartitionProcedureFactory {
    mailbox: MailboxRef,
    server_addr: String,
}
725
726impl DefaultRepartitionProcedureFactory {
727 pub fn new(mailbox: MailboxRef, server_addr: String) -> Self {
728 Self {
729 mailbox,
730 server_addr,
731 }
732 }
733}
734
735impl RepartitionProcedureFactory for DefaultRepartitionProcedureFactory {
736 fn create(
737 &self,
738 ddl_ctx: &DdlContext,
739 table_name: TableName,
740 table_id: TableId,
741 from_exprs: Vec<String>,
742 to_exprs: Vec<String>,
743 timeout: Option<Duration>,
744 ) -> std::result::Result<BoxedProcedure, BoxedError> {
745 let persistent_ctx = PersistentContext::new(table_name, table_id, timeout);
746 let from_exprs = from_exprs
747 .iter()
748 .map(|e| {
749 PartitionExpr::from_json_str(e)
750 .context(error::DeserializePartitionExprSnafu)?
751 .context(error::EmptyPartitionExprSnafu)
752 })
753 .collect::<Result<Vec<_>>>()
754 .map_err(BoxedError::new)?;
755 let to_exprs = to_exprs
756 .iter()
757 .map(|e| {
758 PartitionExpr::from_json_str(e)
759 .context(error::DeserializePartitionExprSnafu)?
760 .context(error::EmptyPartitionExprSnafu)
761 })
762 .collect::<Result<Vec<_>>>()
763 .map_err(BoxedError::new)?;
764
765 let procedure = RepartitionProcedure::new(
766 from_exprs,
767 to_exprs,
768 Context::new(
769 ddl_ctx,
770 self.mailbox.clone(),
771 self.server_addr.clone(),
772 persistent_ctx,
773 ),
774 );
775
776 Ok(Box::new(procedure))
777 }
778
779 fn register_loaders(
780 &self,
781 ddl_ctx: &DdlContext,
782 procedure_manager: &ProcedureManagerRef,
783 ) -> std::result::Result<(), BoxedError> {
784 let mailbox = self.mailbox.clone();
786 let server_addr = self.server_addr.clone();
787 let moved_ddl_ctx = ddl_ctx.clone();
788 procedure_manager
789 .register_loader(
790 RepartitionProcedure::TYPE_NAME,
791 Box::new(move |json| {
792 let mailbox = mailbox.clone();
793 let server_addr = server_addr.clone();
794 let ddl_ctx = moved_ddl_ctx.clone();
795 let factory = move |persistent_ctx| {
796 Context::new(&ddl_ctx, mailbox, server_addr, persistent_ctx)
797 };
798 RepartitionProcedure::from_json(json, factory).map(|p| Box::new(p) as _)
799 }),
800 )
801 .map_err(BoxedError::new)?;
802
803 let mailbox = self.mailbox.clone();
805 let server_addr = self.server_addr.clone();
806 let moved_ddl_ctx = ddl_ctx.clone();
807 procedure_manager
808 .register_loader(
809 RepartitionGroupProcedure::TYPE_NAME,
810 Box::new(move |json| {
811 let mailbox = mailbox.clone();
812 let server_addr = server_addr.clone();
813 let ddl_ctx = moved_ddl_ctx.clone();
814 let factory = move |persistent_ctx| {
815 RepartitionGroupContext::new(&ddl_ctx, mailbox, server_addr, persistent_ctx)
816 };
817 RepartitionGroupProcedure::from_json(json, factory).map(|p| Box::new(p) as _)
818 }),
819 )
820 .map_err(BoxedError::new)?;
821
822 Ok(())
823 }
824}
825
826#[cfg(test)]
827mod tests {
828 use std::collections::HashMap;
829 use std::sync::Arc;
830 use std::sync::atomic::{AtomicBool, Ordering};
831
832 use common_error::ext::BoxedError;
833 use common_error::mock::MockError;
834 use common_error::status_code::StatusCode;
835 use common_meta::ddl::test_util::datanode_handler::{
836 DatanodeWatcher, NaiveDatanodeHandler, UnexpectedErrorDatanodeHandler,
837 };
838 use common_meta::error;
839 use common_meta::peer::Peer;
840 use common_meta::region_keeper::MemoryRegionKeeper;
841 use common_meta::rpc::router::{LeaderState, Region, RegionRoute};
842 use common_meta::test_util::MockDatanodeManager;
843 use common_procedure::{Error as ProcedureError, Procedure, ProcedureId, ProcedureState};
844 use store_api::region_engine::RegionRole;
845 use store_api::storage::RegionId;
846 use table::table_name::TableName;
847 use tokio::sync::mpsc;
848 use uuid::Uuid;
849
850 use super::*;
851 use crate::procedure::repartition::allocate_region::AllocateRegion;
852 use crate::procedure::repartition::collect::Collect;
853 use crate::procedure::repartition::deallocate_region::DeallocateRegion;
854 use crate::procedure::repartition::dispatch::Dispatch;
855 use crate::procedure::repartition::group::update_metadata::UpdateMetadata;
856 use crate::procedure::repartition::plan::RegionDescriptor;
857 use crate::procedure::repartition::repartition_end::RepartitionEnd;
858 use crate::procedure::repartition::test_util::{
859 TestingEnv, assert_parent_state, current_parent_region_routes, extract_subprocedure_ids,
860 new_parent_context, procedure_context_with_receivers, procedure_state_receiver, range_expr,
861 test_region_route, test_region_wal_options,
862 };
863
864 fn test_plan(table_id: TableId) -> RepartitionPlanEntry {
865 RepartitionPlanEntry {
866 group_id: uuid::Uuid::new_v4(),
867 source_regions: vec![RegionDescriptor {
868 region_id: RegionId::new(table_id, 1),
869 partition_expr: range_expr("x", 0, 100),
870 }],
871 target_regions: vec![
872 RegionDescriptor {
873 region_id: RegionId::new(table_id, 1),
874 partition_expr: range_expr("x", 0, 50),
875 },
876 RegionDescriptor {
877 region_id: RegionId::new(table_id, 3),
878 partition_expr: range_expr("x", 50, 100),
879 },
880 ],
881 allocated_region_ids: vec![RegionId::new(table_id, 3)],
882 pending_deallocate_region_ids: vec![],
883 transition_map: vec![vec![0, 1]],
884 original_target_routes: vec![],
885 }
886 }
887
888 fn with_rollback_metadata(
889 mut plan: RepartitionPlanEntry,
890 original_target_routes: Vec<RegionRoute>,
891 ) -> RepartitionPlanEntry {
892 plan.original_target_routes = original_target_routes;
893 plan
894 }
895
896 fn apply_group_staging(
897 plan: &RepartitionPlanEntry,
898 current_region_routes: &[RegionRoute],
899 ) -> Vec<RegionRoute> {
900 UpdateMetadata::apply_staging_region_routes(
901 plan.group_id,
902 &plan.source_regions,
903 &plan.target_regions,
904 &plan.pending_deallocate_region_ids,
905 current_region_routes,
906 )
907 .unwrap()
908 }
909
910 fn exit_group_staging(
911 plan: &RepartitionPlanEntry,
912 current_region_routes: &[RegionRoute],
913 ) -> Vec<RegionRoute> {
914 UpdateMetadata::exit_staging_region_routes(
915 plan.group_id,
916 &plan.source_regions,
917 &plan.target_regions,
918 current_region_routes,
919 )
920 .unwrap()
921 }
922
923 fn region_route_by_id(region_routes: &[RegionRoute], region_id: RegionId) -> &RegionRoute {
924 region_routes
925 .iter()
926 .find(|route| route.region.id == region_id)
927 .unwrap()
928 }
929
    /// Builds a [`RepartitionProcedure`] directly from a state and context,
    /// bypassing the public constructor.
    fn test_procedure(state: Box<dyn State>, context: Context) -> RepartitionProcedure {
        RepartitionProcedure { state, context }
    }
933
934 fn test_context(env: &TestingEnv, table_id: TableId) -> Context {
935 let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
936 let ddl_ctx = env.ddl_context(node_manager);
937 let persistent_ctx = PersistentContext::new(
938 TableName::new("test_catalog", "test_schema", "test_table"),
939 table_id,
940 None,
941 );
942
943 Context::new(
944 &ddl_ctx,
945 env.mailbox_ctx.mailbox().clone(),
946 env.server_addr.clone(),
947 persistent_ctx,
948 )
949 }
950
951 #[test]
952 fn test_filter_allocated_region_routes() {
953 let table_id = 1024;
954 let region_routes = vec![
955 test_region_route(RegionId::new(table_id, 1), "a"),
956 test_region_route(RegionId::new(table_id, 2), "b"),
957 ];
958 let allocated_region_ids = HashSet::from([RegionId::new(table_id, 2)]);
959
960 let new_region_routes =
961 DeallocateRegion::generate_region_routes(®ion_routes, &allocated_region_ids);
962
963 assert_eq!(new_region_routes.len(), 1);
964 assert_eq!(new_region_routes[0].region.id, RegionId::new(table_id, 1));
965 }
966
967 #[test]
968 fn test_should_rollback_allocated_regions() {
969 let env = TestingEnv::new();
970 let table_id = 1024;
971
972 let procedure = test_procedure(
973 Box::new(RepartitionStart::new(vec![], vec![])),
974 test_context(&env, table_id),
975 );
976 assert!(!procedure.should_rollback_allocated_regions());
977
978 let procedure = test_procedure(
979 Box::new(AllocateRegion::new(vec![])),
980 test_context(&env, table_id),
981 );
982 assert!(procedure.should_rollback_allocated_regions());
983
984 let procedure = test_procedure(Box::new(Dispatch), test_context(&env, table_id));
985 assert!(procedure.should_rollback_allocated_regions());
986
987 let procedure =
988 test_procedure(Box::new(Collect::new(vec![])), test_context(&env, table_id));
989 assert!(procedure.should_rollback_allocated_regions());
990
991 let procedure = test_procedure(Box::new(DeallocateRegion), test_context(&env, table_id));
992 assert!(!procedure.should_rollback_allocated_regions());
993
994 let procedure = test_procedure(Box::new(RepartitionEnd), test_context(&env, table_id));
995 assert!(!procedure.should_rollback_allocated_regions());
996 }
997
998 #[test]
999 fn test_register_operating_regions_preserves_route_roles() {
1000 let keeper = Arc::new(MemoryRegionKeeper::new());
1001 let region_routes = vec![
1002 RegionRoute {
1003 region: Region::new_test(RegionId::new(1024, 1)),
1004 leader_peer: Some(Peer::empty(1)),
1005 follower_peers: vec![],
1006 leader_state: None,
1007 leader_down_since: None,
1008 write_route_policy: None,
1009 },
1010 RegionRoute {
1011 region: Region::new_test(RegionId::new(1024, 2)),
1012 leader_peer: Some(Peer::empty(2)),
1013 follower_peers: vec![],
1014 leader_state: Some(LeaderState::Staging),
1015 leader_down_since: None,
1016 write_route_policy: None,
1017 },
1018 RegionRoute {
1019 region: Region::new_test(RegionId::new(1024, 3)),
1020 leader_peer: Some(Peer::empty(3)),
1021 follower_peers: vec![],
1022 leader_state: Some(LeaderState::Downgrading),
1023 leader_down_since: None,
1024 write_route_policy: None,
1025 },
1026 ];
1027
1028 let _guards = Context::register_operating_regions(&keeper, ®ion_routes).unwrap();
1029
1030 let leader_roles =
1031 keeper.extract_operating_region_roles(1, &HashSet::from([RegionId::new(1024, 1)]));
1032 let staging_roles =
1033 keeper.extract_operating_region_roles(2, &HashSet::from([RegionId::new(1024, 2)]));
1034 let downgrading_roles =
1035 keeper.extract_operating_region_roles(3, &HashSet::from([RegionId::new(1024, 3)]));
1036
1037 assert_eq!(
1038 leader_roles.get(&RegionId::new(1024, 1)),
1039 Some(&RegionRole::Leader)
1040 );
1041 assert_eq!(
1042 staging_roles.get(&RegionId::new(1024, 2)),
1043 Some(&RegionRole::StagingLeader)
1044 );
1045 assert_eq!(
1046 downgrading_roles.get(&RegionId::new(1024, 3)),
1047 Some(&RegionRole::DowngradingLeader)
1048 );
1049 }
1050
    /// Rolling back from the `Dispatch` state with a failed sub-procedure
    /// must remove the routes of the regions this procedure allocated
    /// (region 3) while keeping the pre-existing regions (1 and 2).
    #[tokio::test]
    async fn test_repartition_rollback_removes_allocated_routes_from_dispatch() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
        let ddl_ctx = env.ddl_context(node_manager);
        // Region 3 has an empty partition expr: it is the freshly allocated one.
        let original_region_routes = vec![
            test_region_route(
                RegionId::new(table_id, 1),
                &range_expr("x", 0, 100).as_json_str().unwrap(),
            ),
            test_region_route(
                RegionId::new(table_id, 2),
                &range_expr("x", 50, 100).as_json_str().unwrap(),
            ),
            test_region_route(RegionId::new(table_id, 3), ""),
        ];
        env.create_physical_table_metadata_with_wal_options(
            table_id,
            original_region_routes,
            test_region_wal_options(&[1, 2]),
        )
        .await;

        let mut persistent_ctx = PersistentContext::new(
            TableName::new("test_catalog", "test_schema", "test_table"),
            table_id,
            None,
        );
        persistent_ctx.plans = vec![with_rollback_metadata(
            test_plan(table_id),
            vec![
                test_region_route(
                    RegionId::new(table_id, 1),
                    &range_expr("x", 0, 100).as_json_str().unwrap(),
                ),
                test_region_route(RegionId::new(table_id, 3), ""),
            ],
        )];
        // Mark the only plan as failed so its allocated regions are rolled back.
        persistent_ctx.failed_procedures = vec![ProcedureMeta {
            plan_index: 0,
            group_id: Uuid::new_v4(),
            procedure_id: ProcedureId::random(),
        }];
        let context = Context::new(
            &ddl_ctx,
            env.mailbox_ctx.mailbox().clone(),
            env.server_addr.clone(),
            persistent_ctx,
        );
        let mut procedure = RepartitionProcedure {
            state: Box::new(Dispatch),
            context,
        };

        procedure
            .rollback(&TestingEnv::procedure_context())
            .await
            .unwrap();

        // Allocated region 3 must be gone; the original regions remain.
        let region_routes = current_parent_region_routes(&procedure.context).await;
        assert_eq!(region_routes.len(), 2);
        assert_eq!(region_routes[0].region.id, RegionId::new(table_id, 1));
        assert_eq!(region_routes[1].region.id, RegionId::new(table_id, 2));
    }
1116
    /// Rolling back from the `AllocateRegion` state must remove *all*
    /// allocated regions of every plan (no sub-procedure results exist yet),
    /// keeping only the pre-existing regions (1 and 2).
    #[tokio::test]
    async fn test_repartition_rollback_removes_allocated_routes_from_allocate() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
        let ddl_ctx = env.ddl_context(node_manager);
        // Region 3 has an empty partition expr: it is the freshly allocated one.
        let original_region_routes = vec![
            test_region_route(
                RegionId::new(table_id, 1),
                &range_expr("x", 0, 100).as_json_str().unwrap(),
            ),
            test_region_route(
                RegionId::new(table_id, 2),
                &range_expr("x", 50, 100).as_json_str().unwrap(),
            ),
            test_region_route(RegionId::new(table_id, 3), ""),
        ];
        env.create_physical_table_metadata_with_wal_options(
            table_id,
            original_region_routes,
            test_region_wal_options(&[1, 2]),
        )
        .await;

        let mut persistent_ctx = PersistentContext::new(
            TableName::new("test_catalog", "test_schema", "test_table"),
            table_id,
            None,
        );
        // No failed_procedures: in AllocateRegion every allocated region is rolled back.
        persistent_ctx.plans = vec![test_plan(table_id)];
        let context = Context::new(
            &ddl_ctx,
            env.mailbox_ctx.mailbox().clone(),
            env.server_addr.clone(),
            persistent_ctx,
        );
        let mut procedure = RepartitionProcedure {
            state: Box::new(AllocateRegion::new(vec![])),
            context,
        };

        procedure
            .rollback(&TestingEnv::procedure_context())
            .await
            .unwrap();

        // Allocated region 3 must be gone; the original regions remain.
        let region_routes = current_parent_region_routes(&procedure.context).await;
        assert_eq!(region_routes.len(), 2);
        assert_eq!(region_routes[0].region.id, RegionId::new(table_id, 1));
        assert_eq!(region_routes[1].region.id, RegionId::new(table_id, 2));
    }
1168
    #[tokio::test]
    async fn test_repartition_rollback_from_collect_only_removes_failed_allocated_routes() {
        // Scenario: two plans in one procedure. Plan 0 (split of region 1 into
        // regions 1 and 3) failed; plan 1 (split of region 2 into regions 2 and
        // 4) succeeded. Rollback from Collect must remove only the failed
        // plan's allocated route (region 3) and keep the succeeded plan's
        // allocated route (region 4).
        let env = TestingEnv::new();
        let table_id = 1024;
        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
        let ddl_ctx = env.ddl_context(node_manager);
        let original_region_routes = vec![
            test_region_route(
                RegionId::new(table_id, 1),
                &range_expr("x", 0, 100).as_json_str().unwrap(),
            ),
            test_region_route(
                RegionId::new(table_id, 2),
                &range_expr("x", 100, 200).as_json_str().unwrap(),
            ),
            test_region_route(RegionId::new(table_id, 3), ""),
            test_region_route(RegionId::new(table_id, 4), ""),
        ];
        env.create_physical_table_metadata_with_wal_options(
            table_id,
            original_region_routes,
            test_region_wal_options(&[1, 2, 3, 4]),
        )
        .await;

        let mut persistent_ctx = PersistentContext::new(
            TableName::new("test_catalog", "test_schema", "test_table"),
            table_id,
            None,
        );
        // The failed plan's rollback metadata captures the pre-split routes of
        // its source (region 1) and allocated target (region 3).
        let failed_plan = test_plan(table_id);
        let failed_plan = with_rollback_metadata(
            failed_plan,
            vec![
                test_region_route(
                    RegionId::new(table_id, 1),
                    &range_expr("x", 0, 100).as_json_str().unwrap(),
                ),
                test_region_route(RegionId::new(table_id, 3), ""),
            ],
        );
        // Succeeded plan: region 2 ([100, 200)) split into region 2
        // ([100, 150)) and the newly allocated region 4 ([150, 200)).
        let succeeded_plan = RepartitionPlanEntry {
            group_id: Uuid::new_v4(),
            source_regions: vec![RegionDescriptor {
                region_id: RegionId::new(table_id, 2),
                partition_expr: range_expr("x", 100, 200),
            }],
            target_regions: vec![
                RegionDescriptor {
                    region_id: RegionId::new(table_id, 2),
                    partition_expr: range_expr("x", 100, 150),
                },
                RegionDescriptor {
                    region_id: RegionId::new(table_id, 4),
                    partition_expr: range_expr("x", 150, 200),
                },
            ],
            allocated_region_ids: vec![RegionId::new(table_id, 4)],
            pending_deallocate_region_ids: vec![],
            // NOTE(review): sibling tests use `vec![vec![0, 1]]` for the same
            // 1 -> 2 split shape; confirm `vec![vec![0]]` is intentional here
            // (the rollback under test likely never reads the succeeded plan's
            // transition map).
            transition_map: vec![vec![0]],
            original_target_routes: vec![
                test_region_route(
                    RegionId::new(table_id, 2),
                    &range_expr("x", 100, 200).as_json_str().unwrap(),
                ),
                test_region_route(RegionId::new(table_id, 4), ""),
            ],
        };
        persistent_ctx.plans = vec![failed_plan, succeeded_plan];
        // Only plan 0 is recorded as failed; plan 1 has no failure record.
        persistent_ctx.failed_procedures = vec![ProcedureMeta {
            plan_index: 0,
            group_id: persistent_ctx.plans[0].group_id,
            procedure_id: ProcedureId::random(),
        }];

        let context = Context::new(
            &ddl_ctx,
            env.mailbox_ctx.mailbox().clone(),
            env.server_addr.clone(),
            persistent_ctx,
        );
        let mut procedure = RepartitionProcedure {
            state: Box::new(Collect::new(vec![])),
            context,
        };

        procedure
            .rollback(&TestingEnv::procedure_context())
            .await
            .unwrap();

        // Region 3 (failed plan's allocation) is gone; region 4 (succeeded
        // plan's allocation) survives alongside the original regions 1 and 2.
        let region_routes = current_parent_region_routes(&procedure.context).await;
        assert_eq!(region_routes.len(), 3);
        assert_eq!(region_routes[0].region.id, RegionId::new(table_id, 1));
        assert_eq!(region_routes[1].region.id, RegionId::new(table_id, 2));
        assert_eq!(region_routes[2].region.id, RegionId::new(table_id, 4));
    }
1266
    #[tokio::test]
    async fn test_repartition_rollback_from_collect_restores_failed_group_metadata_only() {
        // Scenario: group 0 (failed split of region 1 into regions 1 and 3)
        // must be rolled back, while group 1 (succeeded split of region 2 into
        // regions 2 and 4), which already exited staging, must keep its
        // committed routes.
        let env = TestingEnv::new();
        let table_id = 1024;
        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
        let ddl_ctx = env.ddl_context(node_manager);
        let original_region_routes = vec![
            test_region_route(
                RegionId::new(table_id, 1),
                &range_expr("x", 0, 100).as_json_str().unwrap(),
            ),
            test_region_route(
                RegionId::new(table_id, 2),
                &range_expr("x", 100, 200).as_json_str().unwrap(),
            ),
            test_region_route(RegionId::new(table_id, 3), ""),
            test_region_route(RegionId::new(table_id, 4), ""),
        ];

        // Rollback metadata for the failed plan: the pre-split routes of its
        // source (region 1) and its allocated target (region 3).
        let failed_plan = with_rollback_metadata(
            test_plan(table_id),
            vec![
                original_region_routes[0].clone(),
                original_region_routes[2].clone(),
            ],
        );
        // Succeeded plan: region 2 ([100, 200)) split into region 2
        // ([100, 150)) and the newly allocated region 4 ([150, 200)).
        let succeeded_plan = RepartitionPlanEntry {
            group_id: Uuid::new_v4(),
            source_regions: vec![RegionDescriptor {
                region_id: RegionId::new(table_id, 2),
                partition_expr: range_expr("x", 100, 200),
            }],
            target_regions: vec![
                RegionDescriptor {
                    region_id: RegionId::new(table_id, 2),
                    partition_expr: range_expr("x", 100, 150),
                },
                RegionDescriptor {
                    region_id: RegionId::new(table_id, 4),
                    partition_expr: range_expr("x", 150, 200),
                },
            ],
            allocated_region_ids: vec![RegionId::new(table_id, 4)],
            pending_deallocate_region_ids: vec![],
            transition_map: vec![vec![0, 1]],
            original_target_routes: vec![
                original_region_routes[1].clone(),
                original_region_routes[3].clone(),
            ],
        };
        // Build the current on-disk state: both groups applied staging, then
        // only the succeeded group exited staging (committed its split). The
        // call order here mirrors the real procedure's metadata updates.
        let current_region_routes = apply_group_staging(&failed_plan, &original_region_routes);
        let current_region_routes = apply_group_staging(&succeeded_plan, &current_region_routes);
        let current_region_routes = exit_group_staging(&succeeded_plan, &current_region_routes);
        env.create_physical_table_metadata_with_wal_options(
            table_id,
            current_region_routes,
            test_region_wal_options(&[1, 2, 3, 4]),
        )
        .await;

        let mut persistent_ctx = PersistentContext::new(
            TableName::new("test_catalog", "test_schema", "test_table"),
            table_id,
            None,
        );
        persistent_ctx.plans = vec![failed_plan, succeeded_plan.clone()];
        // Only plan 0 is recorded as failed.
        persistent_ctx.failed_procedures = vec![ProcedureMeta {
            plan_index: 0,
            group_id: persistent_ctx.plans[0].group_id,
            procedure_id: ProcedureId::random(),
        }];

        let context = Context::new(
            &ddl_ctx,
            env.mailbox_ctx.mailbox().clone(),
            env.server_addr.clone(),
            persistent_ctx,
        );
        let mut procedure = RepartitionProcedure {
            state: Box::new(Collect::new(vec![])),
            context,
        };

        procedure
            .rollback(&TestingEnv::procedure_context())
            .await
            .unwrap();

        // Region 1 is restored to its original expr (its staged target, region
        // 3, is gone), while regions 2 and 4 keep the committed split exprs
        // and their leaders.
        assert_eq!(
            current_parent_region_routes(&procedure.context).await,
            vec![
                test_region_route(
                    RegionId::new(table_id, 1),
                    &range_expr("x", 0, 100).as_json_str().unwrap(),
                ),
                RegionRoute {
                    region: Region {
                        id: RegionId::new(table_id, 2),
                        partition_expr: range_expr("x", 100, 150).as_json_str().unwrap(),
                        ..Default::default()
                    },
                    leader_peer: Some(Peer::empty(1)),
                    ..Default::default()
                },
                RegionRoute {
                    region: Region {
                        id: RegionId::new(table_id, 4),
                        partition_expr: range_expr("x", 150, 200).as_json_str().unwrap(),
                        ..Default::default()
                    },
                    leader_peer: Some(Peer::empty(1)),
                    ..Default::default()
                },
            ]
        );
    }
1383
1384 #[tokio::test]
1385 async fn test_repartition_rollback_from_collect_restores_unknown_group_metadata() {
1386 let env = TestingEnv::new();
1387 let table_id = 1024;
1388 let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
1389 let ddl_ctx = env.ddl_context(node_manager);
1390 let original_region_routes = vec![
1391 test_region_route(
1392 RegionId::new(table_id, 1),
1393 &range_expr("x", 0, 100).as_json_str().unwrap(),
1394 ),
1395 test_region_route(
1396 RegionId::new(table_id, 2),
1397 &range_expr("x", 100, 200).as_json_str().unwrap(),
1398 ),
1399 test_region_route(RegionId::new(table_id, 3), ""),
1400 ];
1401 let plan = with_rollback_metadata(
1402 test_plan(table_id),
1403 vec![
1404 original_region_routes[0].clone(),
1405 original_region_routes[2].clone(),
1406 ],
1407 );
1408 let staged_region_routes = apply_group_staging(&plan, &original_region_routes);
1409 assert_eq!(
1410 region_route_by_id(&staged_region_routes, RegionId::new(table_id, 1))
1411 .region
1412 .partition_expr(),
1413 range_expr("x", 0, 50).as_json_str().unwrap()
1414 );
1415 assert!(
1416 region_route_by_id(&staged_region_routes, RegionId::new(table_id, 1))
1417 .is_leader_staging()
1418 );
1419 assert_eq!(
1420 region_route_by_id(&staged_region_routes, RegionId::new(table_id, 3))
1421 .region
1422 .partition_expr(),
1423 range_expr("x", 50, 100).as_json_str().unwrap()
1424 );
1425 assert!(
1426 region_route_by_id(&staged_region_routes, RegionId::new(table_id, 3))
1427 .is_leader_staging()
1428 );
1429 env.create_physical_table_metadata_with_wal_options(
1430 table_id,
1431 staged_region_routes,
1432 test_region_wal_options(&[1, 2, 3]),
1433 )
1434 .await;
1435
1436 let mut persistent_ctx = PersistentContext::new(
1437 TableName::new("test_catalog", "test_schema", "test_table"),
1438 table_id,
1439 None,
1440 );
1441 persistent_ctx.plans = vec![plan.clone()];
1442 persistent_ctx.unknown_procedures = vec![ProcedureMeta {
1443 plan_index: 0,
1444 group_id: plan.group_id,
1445 procedure_id: ProcedureId::random(),
1446 }];
1447
1448 let context = Context::new(
1449 &ddl_ctx,
1450 env.mailbox_ctx.mailbox().clone(),
1451 env.server_addr.clone(),
1452 persistent_ctx,
1453 );
1454 let mut procedure = RepartitionProcedure {
1455 state: Box::new(Collect::new(vec![])),
1456 context,
1457 };
1458
1459 procedure
1460 .rollback(&TestingEnv::procedure_context())
1461 .await
1462 .unwrap();
1463
1464 assert_eq!(
1465 current_parent_region_routes(&procedure.context).await,
1466 vec![
1467 original_region_routes[0].clone(),
1468 original_region_routes[1].clone()
1469 ]
1470 );
1471 }
1472
1473 #[tokio::test]
1474 async fn test_repartition_rollback_is_idempotent() {
1475 let env = TestingEnv::new();
1476 let table_id = 1024;
1477 let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
1478 let ddl_ctx = env.ddl_context(node_manager);
1479 let original_region_routes = vec![
1480 test_region_route(
1481 RegionId::new(table_id, 1),
1482 &range_expr("x", 0, 100).as_json_str().unwrap(),
1483 ),
1484 test_region_route(
1485 RegionId::new(table_id, 2),
1486 &range_expr("x", 50, 100).as_json_str().unwrap(),
1487 ),
1488 test_region_route(RegionId::new(table_id, 3), ""),
1489 ];
1490 env.create_physical_table_metadata_with_wal_options(
1491 table_id,
1492 original_region_routes,
1493 test_region_wal_options(&[1, 2]),
1494 )
1495 .await;
1496
1497 let mut persistent_ctx = PersistentContext::new(
1498 TableName::new("test_catalog", "test_schema", "test_table"),
1499 table_id,
1500 None,
1501 );
1502 persistent_ctx.plans = vec![with_rollback_metadata(
1503 test_plan(table_id),
1504 vec![
1505 test_region_route(
1506 RegionId::new(table_id, 1),
1507 &range_expr("x", 0, 100).as_json_str().unwrap(),
1508 ),
1509 test_region_route(RegionId::new(table_id, 3), ""),
1510 ],
1511 )];
1512 persistent_ctx.failed_procedures = vec![ProcedureMeta {
1513 plan_index: 0,
1514 group_id: Uuid::new_v4(),
1515 procedure_id: ProcedureId::random(),
1516 }];
1517 let context = Context::new(
1518 &ddl_ctx,
1519 env.mailbox_ctx.mailbox().clone(),
1520 env.server_addr.clone(),
1521 persistent_ctx,
1522 );
1523 let mut procedure = RepartitionProcedure {
1524 state: Box::new(Dispatch),
1525 context,
1526 };
1527
1528 procedure
1529 .rollback(&TestingEnv::procedure_context())
1530 .await
1531 .unwrap();
1532 let once = current_parent_region_routes(&procedure.context).await;
1533
1534 procedure
1535 .rollback(&TestingEnv::procedure_context())
1536 .await
1537 .unwrap();
1538 let twice = current_parent_region_routes(&procedure.context).await;
1539
1540 assert_eq!(once, twice);
1541 assert_eq!(once.len(), 2);
1542 assert_eq!(once[0].region.id, RegionId::new(table_id, 1));
1543 assert_eq!(once[1].region.id, RegionId::new(table_id, 2));
1544 }
1545
    #[tokio::test]
    async fn test_repartition_rollback_from_collect_restores_failed_merge_group_metadata_only() {
        // Scenario: a failed merge group (regions 1 + 2 -> region 1) alongside
        // a succeeded split group (region 3 -> regions 3 and 4). Rollback must
        // restore the merge sources to their original routes while leaving the
        // succeeded split's committed routes untouched.
        let env = TestingEnv::new();
        let table_id = 1024;
        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
        let ddl_ctx = env.ddl_context(node_manager);
        let original_region_routes = vec![
            test_region_route(
                RegionId::new(table_id, 1),
                &range_expr("x", 0, 100).as_json_str().unwrap(),
            ),
            test_region_route(
                RegionId::new(table_id, 2),
                &range_expr("x", 100, 200).as_json_str().unwrap(),
            ),
            test_region_route(
                RegionId::new(table_id, 3),
                &range_expr("x", 200, 300).as_json_str().unwrap(),
            ),
            test_region_route(RegionId::new(table_id, 4), ""),
        ];
        // Merge: regions 1 ([0, 100)) and 2 ([100, 200)) collapse into region
        // 1 ([0, 200)); region 2 would be deallocated once the merge commits.
        let failed_merge_plan = RepartitionPlanEntry {
            group_id: Uuid::new_v4(),
            source_regions: vec![
                RegionDescriptor {
                    region_id: RegionId::new(table_id, 1),
                    partition_expr: range_expr("x", 0, 100),
                },
                RegionDescriptor {
                    region_id: RegionId::new(table_id, 2),
                    partition_expr: range_expr("x", 100, 200),
                },
            ],
            target_regions: vec![RegionDescriptor {
                region_id: RegionId::new(table_id, 1),
                partition_expr: range_expr("x", 0, 200),
            }],
            allocated_region_ids: vec![],
            pending_deallocate_region_ids: vec![RegionId::new(table_id, 2)],
            // Both merge sources map to the single target (index 0).
            transition_map: vec![vec![0], vec![0]],
            original_target_routes: vec![original_region_routes[0].clone()],
        };
        // Split: region 3 ([200, 300)) into region 3 ([200, 250)) and the
        // newly allocated region 4 ([250, 300)).
        let succeeded_split_plan = RepartitionPlanEntry {
            group_id: Uuid::new_v4(),
            source_regions: vec![RegionDescriptor {
                region_id: RegionId::new(table_id, 3),
                partition_expr: range_expr("x", 200, 300),
            }],
            target_regions: vec![
                RegionDescriptor {
                    region_id: RegionId::new(table_id, 3),
                    partition_expr: range_expr("x", 200, 250),
                },
                RegionDescriptor {
                    region_id: RegionId::new(table_id, 4),
                    partition_expr: range_expr("x", 250, 300),
                },
            ],
            allocated_region_ids: vec![RegionId::new(table_id, 4)],
            pending_deallocate_region_ids: vec![],
            transition_map: vec![vec![0, 1]],
            original_target_routes: vec![
                original_region_routes[2].clone(),
                original_region_routes[3].clone(),
            ],
        };
        // On-disk state: both groups applied staging; only the split group
        // exited staging (committed).
        let current_region_routes =
            apply_group_staging(&failed_merge_plan, &original_region_routes);
        let current_region_routes =
            apply_group_staging(&succeeded_split_plan, &current_region_routes);
        let staged_region_routes =
            exit_group_staging(&succeeded_split_plan, &current_region_routes);
        env.create_physical_table_metadata_with_wal_options(
            table_id,
            staged_region_routes,
            test_region_wal_options(&[1, 2, 3, 4]),
        )
        .await;

        let mut persistent_ctx = PersistentContext::new(
            TableName::new("test_catalog", "test_schema", "test_table"),
            table_id,
            None,
        );
        persistent_ctx.plans = vec![failed_merge_plan, succeeded_split_plan.clone()];
        // Only the merge group (plan 0) is recorded as failed.
        persistent_ctx.failed_procedures = vec![ProcedureMeta {
            plan_index: 0,
            group_id: persistent_ctx.plans[0].group_id,
            procedure_id: ProcedureId::random(),
        }];

        let context = Context::new(
            &ddl_ctx,
            env.mailbox_ctx.mailbox().clone(),
            env.server_addr.clone(),
            persistent_ctx,
        );
        let mut procedure = RepartitionProcedure {
            state: Box::new(Collect::new(vec![])),
            context,
        };

        procedure
            .rollback(&TestingEnv::procedure_context())
            .await
            .unwrap();

        // Merge sources (regions 1 and 2) are restored to their original
        // exprs; the committed split (regions 3 and 4) keeps its new exprs.
        let region_routes = current_parent_region_routes(&procedure.context).await;
        assert_eq!(
            region_routes,
            vec![
                test_region_route(
                    RegionId::new(table_id, 1),
                    &range_expr("x", 0, 100).as_json_str().unwrap(),
                ),
                test_region_route(
                    RegionId::new(table_id, 2),
                    &range_expr("x", 100, 200).as_json_str().unwrap(),
                ),
                RegionRoute {
                    region: Region {
                        id: RegionId::new(table_id, 3),
                        partition_expr: range_expr("x", 200, 250).as_json_str().unwrap(),
                        ..Default::default()
                    },
                    leader_peer: Some(Peer::empty(1)),
                    ..Default::default()
                },
                RegionRoute {
                    region: Region {
                        id: RegionId::new(table_id, 4),
                        partition_expr: range_expr("x", 250, 300).as_json_str().unwrap(),
                        ..Default::default()
                    },
                    leader_peer: Some(Peer::empty(1)),
                    ..Default::default()
                },
            ]
        );
    }
1686
    #[tokio::test]
    async fn test_repartition_procedure_flow_split_failed_and_full_rollback() {
        // End-to-end flow: drive a split of region 1 ([0, 100) -> [0, 50) +
        // [50, 100)) through Start -> AllocateRegion -> Dispatch -> Collect,
        // report the dispatched subprocedure as failed, and verify a full
        // rollback leaves the table metadata exactly as it started.
        let env = TestingEnv::new();
        let table_id = 1024;
        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));

        env.create_physical_table_metadata_for_repartition(
            table_id,
            vec![
                test_region_route(
                    RegionId::new(table_id, 1),
                    &range_expr("x", 0, 100).as_json_str().unwrap(),
                ),
                test_region_route(
                    RegionId::new(table_id, 2),
                    &range_expr("x", 100, 200).as_json_str().unwrap(),
                ),
            ],
            test_region_wal_options(&[1, 2]),
        )
        .await;

        let context = new_parent_context(&env, node_manager, table_id);
        let mut procedure = RepartitionProcedure::new(
            vec![range_expr("x", 0, 100)],
            vec![range_expr("x", 0, 50), range_expr("x", 50, 100)],
            context,
        );

        // The first step (start state) does not persist; the second step
        // transitions to AllocateRegion and persists.
        let start_status = procedure
            .execute(&TestingEnv::procedure_context())
            .await
            .unwrap();
        assert!(!start_status.need_persist());
        let start_status = procedure
            .execute(&TestingEnv::procedure_context())
            .await
            .unwrap();
        assert!(start_status.need_persist());
        assert_parent_state::<AllocateRegion>(&procedure);

        // Allocation persists the computed plan and moves to Dispatch.
        let allocate_status = procedure
            .execute(&TestingEnv::procedure_context())
            .await
            .unwrap();
        assert!(allocate_status.need_persist());
        assert_parent_state::<Dispatch>(&procedure);
        assert_eq!(procedure.context.persistent_ctx.plans.len(), 1);
        let plan = &procedure.context.persistent_ctx.plans[0];
        let expected_plan = test_plan(table_id);
        assert_eq!(plan.source_regions, expected_plan.source_regions);
        assert_eq!(plan.target_regions, expected_plan.target_regions);
        assert_eq!(
            plan.allocated_region_ids,
            expected_plan.allocated_region_ids
        );
        assert_eq!(
            plan.pending_deallocate_region_ids,
            expected_plan.pending_deallocate_region_ids
        );
        assert_eq!(plan.transition_map, expected_plan.transition_map);
        // After allocation the new target (region 3) appears as an extra route
        // carrying the [50, 100) expr, placed on peer 0.
        assert_eq!(
            current_parent_region_routes(&procedure.context).await,
            vec![
                test_region_route(
                    RegionId::new(table_id, 1),
                    &range_expr("x", 0, 100).as_json_str().unwrap(),
                ),
                test_region_route(
                    RegionId::new(table_id, 2),
                    &range_expr("x", 100, 200).as_json_str().unwrap(),
                ),
                RegionRoute {
                    region: Region {
                        id: RegionId::new(table_id, 3),
                        partition_expr: range_expr("x", 50, 100).as_json_str().unwrap(),
                        ..Default::default()
                    },
                    leader_peer: Some(Peer::empty(0)),
                    ..Default::default()
                },
            ]
        );

        // Dispatch spawns exactly one subprocedure for the single group.
        let dispatch_status = procedure
            .execute(&TestingEnv::procedure_context())
            .await
            .unwrap();
        assert!(!dispatch_status.need_persist());
        let subprocedure_ids = extract_subprocedure_ids(dispatch_status);
        assert_eq!(subprocedure_ids.len(), 1);
        assert_parent_state::<Collect>(&procedure);

        // Report the subprocedure as failed; Collect surfaces a non-retryable
        // error so the procedure framework would trigger rollback.
        let failed_state = ProcedureState::failed(Arc::new(ProcedureError::external(
            MockError::new(StatusCode::Internal),
        )));
        let collect_ctx = procedure_context_with_receivers(HashMap::from([(
            subprocedure_ids[0],
            procedure_state_receiver(failed_state),
        )]));

        let err = procedure.execute(&collect_ctx).await.unwrap_err();
        assert!(!err.is_retry_later());
        assert_parent_state::<Collect>(&procedure);

        procedure
            .rollback(&TestingEnv::procedure_context())
            .await
            .unwrap();

        // Rollback removes the allocated region 3 and restores the original
        // two routes untouched.
        let region_routes = current_parent_region_routes(&procedure.context).await;
        assert_eq!(
            region_routes,
            vec![
                test_region_route(
                    RegionId::new(table_id, 1),
                    &range_expr("x", 0, 100).as_json_str().unwrap(),
                ),
                test_region_route(
                    RegionId::new(table_id, 2),
                    &range_expr("x", 100, 200).as_json_str().unwrap(),
                ),
            ]
        );
    }
1812
    #[tokio::test]
    async fn test_repartition_procedure_flow_split_allocate_retryable_then_resume() {
        // Flow with a transient failure: the first datanode request during
        // allocation returns a retryable error; a subsequent execute resumes
        // allocation and completes through Dispatch.
        common_telemetry::init_default_ut_logging();
        let env = TestingEnv::new();
        let table_id = 1024;
        let (tx, _rx) = mpsc::channel(8);
        // One-shot failure flag: the first request observes `true`, fails with
        // RetryLater, and flips the flag so all later requests succeed.
        let should_retry = Arc::new(AtomicBool::new(true));
        let datanode_handler = DatanodeWatcher::new(tx).with_handler(move |_, _| {
            if should_retry.swap(false, Ordering::SeqCst) {
                return Err(error::Error::RetryLater {
                    source: BoxedError::new(
                        error::UnexpectedSnafu {
                            err_msg: "retry later",
                        }
                        .build(),
                    ),
                    clean_poisons: false,
                });
            }

            Ok(api::region::RegionResponse::new(0))
        });
        let node_manager = Arc::new(MockDatanodeManager::new(datanode_handler));

        env.create_physical_table_metadata_for_repartition(
            table_id,
            vec![
                test_region_route(
                    RegionId::new(table_id, 1),
                    &range_expr("x", 0, 100).as_json_str().unwrap(),
                ),
                test_region_route(
                    RegionId::new(table_id, 2),
                    &range_expr("x", 100, 200).as_json_str().unwrap(),
                ),
            ],
            test_region_wal_options(&[1, 2]),
        )
        .await;

        let context = new_parent_context(&env, node_manager, table_id);
        let mut procedure = RepartitionProcedure::new(
            vec![range_expr("x", 0, 100)],
            vec![range_expr("x", 0, 50), range_expr("x", 50, 100)],
            context,
        );

        // First step (start state) does not persist; the second transitions to
        // AllocateRegion and persists.
        let start_status = procedure
            .execute(&TestingEnv::procedure_context())
            .await
            .unwrap();
        assert!(!start_status.need_persist());
        let start_status = procedure
            .execute(&TestingEnv::procedure_context())
            .await
            .unwrap();
        assert!(start_status.need_persist());
        assert_parent_state::<AllocateRegion>(&procedure);

        // First allocation attempt hits the injected RetryLater. The plan is
        // already persisted, but the table's region routes are unchanged.
        let err = procedure
            .execute(&TestingEnv::procedure_context())
            .await
            .unwrap_err();
        assert!(err.is_retry_later());
        assert_parent_state::<AllocateRegion>(&procedure);
        assert!(!procedure.context.persistent_ctx.plans.is_empty());
        assert_eq!(
            current_parent_region_routes(&procedure.context).await,
            vec![
                test_region_route(
                    RegionId::new(table_id, 1),
                    &range_expr("x", 0, 100).as_json_str().unwrap(),
                ),
                test_region_route(
                    RegionId::new(table_id, 2),
                    &range_expr("x", 100, 200).as_json_str().unwrap(),
                ),
            ]
        );

        // Retried allocation succeeds and moves the procedure to Dispatch.
        let allocate_status = procedure
            .execute(&TestingEnv::procedure_context())
            .await
            .unwrap();
        assert!(allocate_status.need_persist());
        assert_parent_state::<Dispatch>(&procedure);

        assert_eq!(procedure.context.persistent_ctx.plans.len(), 1);
        let plan = &procedure.context.persistent_ctx.plans[0];
        let expected_plan = test_plan(table_id);
        assert_eq!(plan.source_regions, expected_plan.source_regions);
        assert_eq!(plan.target_regions, expected_plan.target_regions);
        assert_eq!(
            plan.allocated_region_ids,
            expected_plan.allocated_region_ids
        );
        assert_eq!(plan.transition_map, expected_plan.transition_map);
        // The resumed allocation staged the new target (region 3) with the
        // [50, 100) expr on peer 0.
        assert_eq!(
            current_parent_region_routes(&procedure.context).await,
            vec![
                test_region_route(
                    RegionId::new(table_id, 1),
                    &range_expr("x", 0, 100).as_json_str().unwrap(),
                ),
                test_region_route(
                    RegionId::new(table_id, 2),
                    &range_expr("x", 100, 200).as_json_str().unwrap(),
                ),
                RegionRoute {
                    region: Region {
                        id: RegionId::new(table_id, 3),
                        partition_expr: range_expr("x", 50, 100).as_json_str().unwrap(),
                        ..Default::default()
                    },
                    leader_peer: Some(Peer::empty(0)),
                    ..Default::default()
                },
            ]
        );

        // Dispatch spawns one subprocedure for the single repartition group.
        let dispatch_status = procedure
            .execute(&TestingEnv::procedure_context())
            .await
            .unwrap();
        assert!(!dispatch_status.need_persist());
        let subprocedure_ids = extract_subprocedure_ids(dispatch_status);
        assert_eq!(subprocedure_ids.len(), 1);
        assert_parent_state::<Collect>(&procedure);
    }
1942}