1pub mod allocate_region;
16pub mod collect;
17pub mod deallocate_region;
18pub mod dispatch;
19pub mod group;
20pub mod plan;
21pub mod repartition_end;
22pub mod repartition_start;
23pub mod utils;
24
25use std::any::Any;
26use std::collections::{HashMap, HashSet};
27use std::fmt::{Debug, Display};
28use std::time::{Duration, Instant};
29
30use common_error::ext::BoxedError;
31use common_meta::cache_invalidator::CacheInvalidatorRef;
32use common_meta::ddl::DdlContext;
33use common_meta::ddl::allocator::region_routes::RegionRoutesAllocatorRef;
34use common_meta::ddl::allocator::wal_options::WalOptionsAllocatorRef;
35use common_meta::ddl_manager::RepartitionProcedureFactory;
36use common_meta::instruction::CacheIdent;
37use common_meta::key::datanode_table::RegionInfo;
38use common_meta::key::table_info::TableInfoValue;
39use common_meta::key::table_route::TableRouteValue;
40use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
41use common_meta::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
42use common_meta::node_manager::NodeManagerRef;
43use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
44use common_meta::region_registry::LeaderRegionRegistryRef;
45use common_meta::rpc::router::{RegionRoute, operating_leader_regions};
46use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
47use common_procedure::{
48 BoxedProcedure, Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
49 ProcedureManagerRef, Result as ProcedureResult, Status, StringKey, UserMetadata,
50};
51use common_telemetry::{error, info, warn};
52use partition::expr::PartitionExpr;
53use serde::{Deserialize, Serialize};
54use snafu::{OptionExt, ResultExt};
55use store_api::storage::{RegionNumber, TableId};
56use table::table_name::TableName;
57
58use crate::error::{self, Result};
59use crate::procedure::repartition::collect::ProcedureMeta;
60use crate::procedure::repartition::deallocate_region::DeallocateRegion;
61use crate::procedure::repartition::group::{
62 Context as RepartitionGroupContext, RepartitionGroupProcedure,
63};
64use crate::procedure::repartition::plan::RepartitionPlanEntry;
65use crate::procedure::repartition::repartition_start::RepartitionStart;
66use crate::procedure::repartition::utils::get_datanode_table_value;
67use crate::service::mailbox::MailboxRef;
68
69#[cfg(test)]
70pub mod test_util;
71
/// State of a repartition procedure that survives metasrv restarts.
///
/// Serialized into the procedure store on every step and restored on reload,
/// so every field here must round-trip through serde.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
    // Fully-qualified name of the table being repartitioned.
    pub catalog_name: String,
    pub schema_name: String,
    pub table_name: String,
    pub table_id: TableId,
    // Per-group repartition plans computed during the planning phase.
    pub plans: Vec<RepartitionPlanEntry>,
    // Group sub-procedures that reported failure; their allocated regions are
    // rolled back. `default` keeps old persisted payloads deserializable.
    #[serde(default)]
    pub failed_procedures: Vec<ProcedureMeta>,
    // Group sub-procedures whose outcome could not be determined; treated
    // like failures for rollback purposes.
    #[serde(default)]
    pub unknown_procedures: Vec<ProcedureMeta>,
    // End-to-end time budget for the whole procedure (humantime format on
    // the wire, e.g. "2m"); falls back to `default_timeout` when absent.
    #[serde(with = "humantime_serde", default = "default_timeout")]
    pub timeout: Duration,
}
89
/// Default end-to-end timeout for a repartition procedure: two minutes.
fn default_timeout() -> Duration {
    Duration::from_secs(2 * 60)
}
93
94impl PersistentContext {
95 pub fn new(
99 TableName {
100 catalog_name,
101 schema_name,
102 table_name,
103 }: TableName,
104 table_id: TableId,
105 timeout: Option<Duration>,
106 ) -> Self {
107 Self {
108 catalog_name,
109 schema_name,
110 table_name,
111 table_id,
112 plans: vec![],
113 failed_procedures: vec![],
114 unknown_procedures: vec![],
115 timeout: timeout.unwrap_or_else(default_timeout),
116 }
117 }
118
119 pub fn lock_key(&self) -> Vec<StringKey> {
120 vec![
121 CatalogLock::Read(&self.catalog_name).into(),
122 SchemaLock::read(&self.catalog_name, &self.schema_name).into(),
123 TableLock::Write(self.table_id).into(),
124 TableNameLock::new(&self.catalog_name, &self.schema_name, &self.table_name).into(),
125 ]
126 }
127}
128
/// Runtime context shared by every state of the repartition procedure.
///
/// Combines the persisted state with handles cloned from the DDL context
/// (metadata managers, node manager, allocators) plus metasrv-local handles
/// (mailbox, server address). Not persisted; rebuilt on procedure reload.
#[derive(Clone)]
pub struct Context {
    pub persistent_ctx: PersistentContext,
    // In-memory-only state (metrics, timers); reset on reload.
    pub volatile_ctx: VolatileContext,
    pub table_metadata_manager: TableMetadataManagerRef,
    pub memory_region_keeper: MemoryRegionKeeperRef,
    pub node_manager: NodeManagerRef,
    pub leader_region_registry: LeaderRegionRegistryRef,
    pub mailbox: MailboxRef,
    pub server_addr: String,
    pub cache_invalidator: CacheInvalidatorRef,
    pub region_routes_allocator: RegionRoutesAllocatorRef,
    pub wal_options_allocator: WalOptionsAllocatorRef,
    // Set when the context is built; used to enforce the procedure timeout.
    pub start_time: Instant,
}
144
/// In-memory-only state of the procedure; lost (and re-defaulted) on reload.
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {
    // Accumulated per-phase timings, reported for observability.
    pub metrics: Metrics,
    // Set when the dispatch phase starts; used to measure group execution.
    pub dispatch_start_time: Option<Instant>,
}
150
/// Cumulative wall-clock time spent in each phase of the repartition
/// procedure; rendered via [`Display`] for log lines.
#[derive(Debug, Clone, Default)]
pub struct Metrics {
    // Time spent computing the repartition plans.
    build_plan_elapsed: Duration,
    // Time spent allocating new target regions.
    allocate_region_elapsed: Duration,
    // Time spent waiting for group sub-procedures to finish.
    finish_groups_elapsed: Duration,
    // Time spent deallocating obsolete source regions.
    deallocate_region_elapsed: Duration,
}
163
164impl Display for Metrics {
165 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
166 let total = self.build_plan_elapsed
167 + self.allocate_region_elapsed
168 + self.finish_groups_elapsed
169 + self.deallocate_region_elapsed;
170 write!(f, "total: {:?}", total)?;
171 let mut parts = Vec::with_capacity(4);
172 if self.build_plan_elapsed > Duration::ZERO {
173 parts.push(format!("build_plan_elapsed: {:?}", self.build_plan_elapsed));
174 }
175 if self.allocate_region_elapsed > Duration::ZERO {
176 parts.push(format!(
177 "allocate_region_elapsed: {:?}",
178 self.allocate_region_elapsed
179 ));
180 }
181 if self.finish_groups_elapsed > Duration::ZERO {
182 parts.push(format!(
183 "finish_groups_elapsed: {:?}",
184 self.finish_groups_elapsed
185 ));
186 }
187 if self.deallocate_region_elapsed > Duration::ZERO {
188 parts.push(format!(
189 "deallocate_region_elapsed: {:?}",
190 self.deallocate_region_elapsed
191 ));
192 }
193
194 if !parts.is_empty() {
195 write!(f, ", {}", parts.join(", "))?;
196 }
197 Ok(())
198 }
199}
200
impl Metrics {
    /// Adds `elapsed` to the plan-building phase counter.
    pub fn update_build_plan_elapsed(&mut self, elapsed: Duration) {
        self.build_plan_elapsed += elapsed;
    }

    /// Adds `elapsed` to the region-allocation phase counter.
    pub fn update_allocate_region_elapsed(&mut self, elapsed: Duration) {
        self.allocate_region_elapsed += elapsed;
    }

    /// Adds `elapsed` to the group-completion phase counter.
    pub fn update_finish_groups_elapsed(&mut self, elapsed: Duration) {
        self.finish_groups_elapsed += elapsed;
    }

    /// Adds `elapsed` to the region-deallocation phase counter.
    pub fn update_deallocate_region_elapsed(&mut self, elapsed: Duration) {
        self.deallocate_region_elapsed += elapsed;
    }
}
222
223impl Context {
224 pub fn new(
225 ddl_ctx: &DdlContext,
226 mailbox: MailboxRef,
227 server_addr: String,
228 persistent_ctx: PersistentContext,
229 ) -> Self {
230 Self {
231 persistent_ctx,
232 table_metadata_manager: ddl_ctx.table_metadata_manager.clone(),
233 memory_region_keeper: ddl_ctx.memory_region_keeper.clone(),
234 node_manager: ddl_ctx.node_manager.clone(),
235 leader_region_registry: ddl_ctx.leader_region_registry.clone(),
236 mailbox,
237 server_addr,
238 cache_invalidator: ddl_ctx.cache_invalidator.clone(),
239 region_routes_allocator: ddl_ctx.table_metadata_allocator.region_routes_allocator(),
240 wal_options_allocator: ddl_ctx.table_metadata_allocator.wal_options_allocator(),
241 start_time: Instant::now(),
242 volatile_ctx: VolatileContext::default(),
243 }
244 }
245
246 pub fn next_operation_timeout(&self) -> Option<Duration> {
248 self.persistent_ctx
249 .timeout
250 .checked_sub(self.start_time.elapsed())
251 }
252
253 pub fn update_build_plan_elapsed(&mut self, elapsed: Duration) {
255 self.volatile_ctx.metrics.update_build_plan_elapsed(elapsed);
256 }
257
258 pub fn update_allocate_region_elapsed(&mut self, elapsed: Duration) {
260 self.volatile_ctx
261 .metrics
262 .update_allocate_region_elapsed(elapsed);
263 }
264
265 pub fn update_finish_groups_elapsed(&mut self, elapsed: Duration) {
267 self.volatile_ctx
268 .metrics
269 .update_finish_groups_elapsed(elapsed);
270 }
271
272 pub fn update_deallocate_region_elapsed(&mut self, elapsed: Duration) {
274 self.volatile_ctx
275 .metrics
276 .update_deallocate_region_elapsed(elapsed);
277 }
278
279 pub async fn get_table_route_value(
287 &self,
288 ) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
289 let table_id = self.persistent_ctx.table_id;
290 let table_route_value = self
291 .table_metadata_manager
292 .table_route_manager()
293 .table_route_storage()
294 .get_with_raw_bytes(table_id)
295 .await
296 .map_err(BoxedError::new)
297 .with_context(|_| error::RetryLaterWithSourceSnafu {
298 reason: format!("Failed to get table route for table: {}", table_id),
299 })?
300 .context(error::TableRouteNotFoundSnafu { table_id })?;
301
302 Ok(table_route_value)
303 }
304
305 pub async fn get_table_info_value(&self) -> Result<TableInfoValue> {
313 let table_id = self.persistent_ctx.table_id;
314 let table_info_value = self
315 .table_metadata_manager
316 .table_info_manager()
317 .get(table_id)
318 .await
319 .map_err(BoxedError::new)
320 .with_context(|_| error::RetryLaterWithSourceSnafu {
321 reason: format!("Failed to get table info for table: {}", table_id),
322 })?
323 .context(error::TableInfoNotFoundSnafu { table_id })?
324 .into_inner();
325 Ok(table_info_value)
326 }
327
328 pub async fn update_table_route(
337 &self,
338 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
339 new_region_routes: Vec<RegionRoute>,
340 new_region_wal_options: HashMap<RegionNumber, String>,
341 ) -> Result<()> {
342 let table_id = self.persistent_ctx.table_id;
343 if new_region_routes.is_empty() {
344 return error::UnexpectedSnafu {
345 violated: format!("new_region_routes is empty for table: {}", table_id),
346 }
347 .fail();
348 }
349 let datanode_id = new_region_routes
350 .first()
351 .unwrap()
352 .leader_peer
353 .as_ref()
354 .context(error::NoLeaderSnafu)?
355 .id;
356 let datanode_table_value =
357 get_datanode_table_value(&self.table_metadata_manager, table_id, datanode_id).await?;
358
359 let RegionInfo {
360 region_options,
361 region_wal_options,
362 ..
363 } = &datanode_table_value.region_info;
364
365 let validated_region_wal_options =
367 crate::procedure::repartition::utils::merge_and_validate_region_wal_options(
368 region_wal_options,
369 new_region_wal_options,
370 &new_region_routes,
371 table_id,
372 )?;
373 info!(
374 "Updating table route for table: {}, new region routes: {:?}",
375 table_id, new_region_routes
376 );
377 self.table_metadata_manager
378 .update_table_route(
379 table_id,
380 datanode_table_value.region_info.clone(),
381 current_table_route_value,
382 new_region_routes,
383 region_options,
384 &validated_region_wal_options,
385 )
386 .await
387 .context(error::TableMetadataManagerSnafu)
388 }
389
390 pub async fn invalidate_table_cache(&self) -> Result<()> {
392 let table_id = self.persistent_ctx.table_id;
393 let subject = format!(
394 "Invalidate table cache for repartition table, table: {}",
395 table_id,
396 );
397 let ctx = common_meta::cache_invalidator::Context {
398 subject: Some(subject),
399 };
400 let _ = self
401 .cache_invalidator
402 .invalidate(&ctx, &[CacheIdent::TableId(table_id)])
403 .await;
404 Ok(())
405 }
406
407 pub fn register_operating_regions(
408 memory_region_keeper: &MemoryRegionKeeperRef,
409 region_routes: &[RegionRoute],
410 ) -> Result<Vec<OperatingRegionGuard>> {
411 let mut operating_guards = Vec::with_capacity(region_routes.len());
412 for (region_id, datanode_id) in operating_leader_regions(region_routes) {
413 let guard = memory_region_keeper
414 .register(datanode_id, region_id)
415 .context(error::RegionOperatingRaceSnafu {
416 peer_id: datanode_id,
417 region_id,
418 })?;
419 operating_guards.push(guard);
420 }
421 Ok(operating_guards)
422 }
423}
424
425#[async_trait::async_trait]
426#[typetag::serde(tag = "repartition_state")]
427pub(crate) trait State: Sync + Send + Debug {
428 fn name(&self) -> &'static str {
429 let type_name = std::any::type_name::<Self>();
430 type_name.split("::").last().unwrap_or(type_name)
432 }
433
434 async fn next(
436 &mut self,
437 ctx: &mut Context,
438 procedure_ctx: &ProcedureContext,
439 ) -> Result<(Box<dyn State>, Status)>;
440
441 fn as_any(&self) -> &dyn Any;
442}
443
/// Top-level procedure that repartitions a table: plans the region changes,
/// allocates new regions, dispatches per-group sub-procedures, then cleans
/// up obsolete regions.
pub struct RepartitionProcedure {
    // Current state-machine state; swapped on every `execute` step.
    state: Box<dyn State>,
    context: Context,
}
448
/// Borrowed view serialized by `dump` — avoids cloning the state and the
/// persistent context just to write them out.
#[derive(Debug, Serialize)]
struct RepartitionData<'a> {
    state: &'a dyn State,
    persistent_ctx: &'a PersistentContext,
}
454
/// Owned counterpart of [`RepartitionData`] used when deserializing a
/// persisted procedure in `from_json`.
#[derive(Debug, Deserialize)]
struct RepartitionDataOwned {
    state: Box<dyn State>,
    persistent_ctx: PersistentContext,
}
460
461impl RepartitionProcedure {
462 const TYPE_NAME: &'static str = "metasrv-procedure::Repartition";
463
464 pub fn new(
465 from_exprs: Vec<PartitionExpr>,
466 to_exprs: Vec<PartitionExpr>,
467 context: Context,
468 ) -> Self {
469 let state = Box::new(RepartitionStart::new(from_exprs, to_exprs));
470
471 Self { state, context }
472 }
473
474 pub fn from_json<F>(json: &str, ctx_factory: F) -> ProcedureResult<Self>
475 where
476 F: FnOnce(PersistentContext) -> Context,
477 {
478 let RepartitionDataOwned {
479 state,
480 persistent_ctx,
481 } = serde_json::from_str(json).context(FromJsonSnafu)?;
482 let context = ctx_factory(persistent_ctx);
483
484 Ok(Self { state, context })
485 }
486
487 fn should_rollback_allocated_regions(&self) -> bool {
504 self.state.as_any().is::<allocate_region::AllocateRegion>()
505 || self.state.as_any().is::<dispatch::Dispatch>()
506 || self.state.as_any().is::<collect::Collect>()
507 }
508
509 fn rollback_allocated_region_ids(&self) -> HashSet<store_api::storage::RegionId> {
510 if self.state.as_any().is::<allocate_region::AllocateRegion>()
511 || self.state.as_any().is::<dispatch::Dispatch>()
512 {
513 return self
514 .context
515 .persistent_ctx
516 .plans
517 .iter()
518 .flat_map(|plan| plan.allocated_region_ids.iter().copied())
519 .collect();
520 }
521
522 self.context
523 .persistent_ctx
524 .failed_procedures
525 .iter()
526 .chain(self.context.persistent_ctx.unknown_procedures.iter())
527 .flat_map(|procedure_meta| {
528 let plan_index = procedure_meta.plan_index;
529 self.context.persistent_ctx.plans[plan_index]
530 .allocated_region_ids
531 .iter()
532 .copied()
533 })
534 .collect()
535 }
536
537 fn filter_allocated_region_routes(
538 region_routes: &[RegionRoute],
539 allocated_region_ids: &HashSet<store_api::storage::RegionId>,
540 ) -> Vec<RegionRoute> {
541 region_routes
542 .iter()
543 .filter(|route| !allocated_region_ids.contains(&route.region.id))
544 .cloned()
545 .collect()
546 }
547
548 async fn rollback_inner(&mut self, procedure_ctx: &ProcedureContext) -> Result<()> {
549 if !self.should_rollback_allocated_regions() {
550 return Ok(());
551 }
552
553 let table_id = self.context.persistent_ctx.table_id;
554 let allocated_region_ids = self.rollback_allocated_region_ids();
555 if allocated_region_ids.is_empty() {
556 return Ok(());
557 }
558
559 let table_lock = TableLock::Write(table_id).into();
560 let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
561 let table_route_value = self.context.get_table_route_value().await?;
562 let current_region_routes = table_route_value.region_routes().unwrap();
563 let allocated_region_routes = DeallocateRegion::filter_deallocatable_region_routes(
564 table_id,
565 current_region_routes,
566 &allocated_region_ids,
567 );
568 if !allocated_region_routes.is_empty() {
569 let table = TableName {
570 catalog_name: self.context.persistent_ctx.catalog_name.clone(),
571 schema_name: self.context.persistent_ctx.schema_name.clone(),
572 table_name: self.context.persistent_ctx.table_name.clone(),
573 };
574 if let Err(err) = DeallocateRegion::deallocate_regions(
577 &self.context.node_manager,
578 &self.context.leader_region_registry,
579 table,
580 table_id,
581 &allocated_region_routes,
582 )
583 .await
584 {
585 warn!(err; "Failed to drop allocated regions during repartition rollback, table_id: {}, regions: {:?}", table_id, allocated_region_ids);
586 }
587 }
588
589 let new_region_routes =
590 Self::filter_allocated_region_routes(current_region_routes, &allocated_region_ids);
591
592 if new_region_routes.len() != current_region_routes.len() {
593 self.context
594 .update_table_route(&table_route_value, new_region_routes, HashMap::new())
595 .await
596 .map_err(BoxedError::new)
597 .with_context(|_| error::RetryLaterWithSourceSnafu {
598 reason: format!(
599 "Failed to rollback allocated region routes for repartition table: {}",
600 table_id
601 ),
602 })?;
603 }
604
605 if let Err(err) = self.context.invalidate_table_cache().await {
606 warn!(err; "Failed to invalidate table cache during repartition rollback, table_id: {}", table_id);
607 }
608
609 Ok(())
610 }
611}
612
613#[async_trait::async_trait]
614impl Procedure for RepartitionProcedure {
615 fn type_name(&self) -> &str {
616 Self::TYPE_NAME
617 }
618
619 #[tracing::instrument(skip_all, fields(
620 state = %self.state.name(),
621 table_id = %self.context.persistent_ctx.table_id
622 ))]
623 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
624 let state = &mut self.state;
625 let state_name = state.name();
626 common_telemetry::info!(
628 "Repartition procedure executing state: {}, table_id: {}",
629 state_name,
630 self.context.persistent_ctx.table_id
631 );
632 match state.next(&mut self.context, _ctx).await {
633 Ok((next, status)) => {
634 *state = next;
635 Ok(status)
636 }
637 Err(e) => {
638 if e.is_retryable() {
639 Err(ProcedureError::retry_later(e))
640 } else {
641 error!(
642 e;
643 "Repartition procedure failed, table id: {}",
644 self.context.persistent_ctx.table_id,
645 );
646 Err(ProcedureError::external(e))
647 }
648 }
649 }
650 }
651
652 async fn rollback(&mut self, ctx: &ProcedureContext) -> ProcedureResult<()> {
653 self.rollback_inner(ctx)
654 .await
655 .map_err(ProcedureError::external)
656 }
657
658 fn rollback_supported(&self) -> bool {
659 true
660 }
661
662 fn dump(&self) -> ProcedureResult<String> {
663 let data = RepartitionData {
664 state: self.state.as_ref(),
665 persistent_ctx: &self.context.persistent_ctx,
666 };
667 serde_json::to_string(&data).context(ToJsonSnafu)
668 }
669
670 fn lock_key(&self) -> LockKey {
671 LockKey::new(self.context.persistent_ctx.lock_key())
672 }
673
674 fn user_metadata(&self) -> Option<UserMetadata> {
675 None
677 }
678}
679
/// Factory registered with the DDL manager; creates repartition procedures
/// bound to this metasrv's mailbox and server address.
pub struct DefaultRepartitionProcedureFactory {
    mailbox: MailboxRef,
    server_addr: String,
}
684
impl DefaultRepartitionProcedureFactory {
    /// Creates a factory from the metasrv mailbox and its server address.
    pub fn new(mailbox: MailboxRef, server_addr: String) -> Self {
        Self {
            mailbox,
            server_addr,
        }
    }
}
693
694impl RepartitionProcedureFactory for DefaultRepartitionProcedureFactory {
695 fn create(
696 &self,
697 ddl_ctx: &DdlContext,
698 table_name: TableName,
699 table_id: TableId,
700 from_exprs: Vec<String>,
701 to_exprs: Vec<String>,
702 timeout: Option<Duration>,
703 ) -> std::result::Result<BoxedProcedure, BoxedError> {
704 let persistent_ctx = PersistentContext::new(table_name, table_id, timeout);
705 let from_exprs = from_exprs
706 .iter()
707 .map(|e| {
708 PartitionExpr::from_json_str(e)
709 .context(error::DeserializePartitionExprSnafu)?
710 .context(error::EmptyPartitionExprSnafu)
711 })
712 .collect::<Result<Vec<_>>>()
713 .map_err(BoxedError::new)?;
714 let to_exprs = to_exprs
715 .iter()
716 .map(|e| {
717 PartitionExpr::from_json_str(e)
718 .context(error::DeserializePartitionExprSnafu)?
719 .context(error::EmptyPartitionExprSnafu)
720 })
721 .collect::<Result<Vec<_>>>()
722 .map_err(BoxedError::new)?;
723
724 let procedure = RepartitionProcedure::new(
725 from_exprs,
726 to_exprs,
727 Context::new(
728 ddl_ctx,
729 self.mailbox.clone(),
730 self.server_addr.clone(),
731 persistent_ctx,
732 ),
733 );
734
735 Ok(Box::new(procedure))
736 }
737
738 fn register_loaders(
739 &self,
740 ddl_ctx: &DdlContext,
741 procedure_manager: &ProcedureManagerRef,
742 ) -> std::result::Result<(), BoxedError> {
743 let mailbox = self.mailbox.clone();
745 let server_addr = self.server_addr.clone();
746 let moved_ddl_ctx = ddl_ctx.clone();
747 procedure_manager
748 .register_loader(
749 RepartitionProcedure::TYPE_NAME,
750 Box::new(move |json| {
751 let mailbox = mailbox.clone();
752 let server_addr = server_addr.clone();
753 let ddl_ctx = moved_ddl_ctx.clone();
754 let factory = move |persistent_ctx| {
755 Context::new(&ddl_ctx, mailbox, server_addr, persistent_ctx)
756 };
757 RepartitionProcedure::from_json(json, factory).map(|p| Box::new(p) as _)
758 }),
759 )
760 .map_err(BoxedError::new)?;
761
762 let mailbox = self.mailbox.clone();
764 let server_addr = self.server_addr.clone();
765 let moved_ddl_ctx = ddl_ctx.clone();
766 procedure_manager
767 .register_loader(
768 RepartitionGroupProcedure::TYPE_NAME,
769 Box::new(move |json| {
770 let mailbox = mailbox.clone();
771 let server_addr = server_addr.clone();
772 let ddl_ctx = moved_ddl_ctx.clone();
773 let factory = move |persistent_ctx| {
774 RepartitionGroupContext::new(&ddl_ctx, mailbox, server_addr, persistent_ctx)
775 };
776 RepartitionGroupProcedure::from_json(json, factory).map(|p| Box::new(p) as _)
777 }),
778 )
779 .map_err(BoxedError::new)?;
780
781 Ok(())
782 }
783}
784
785#[cfg(test)]
786mod tests {
787 use std::collections::HashMap;
788 use std::sync::Arc;
789 use std::sync::atomic::{AtomicBool, Ordering};
790
791 use common_error::ext::BoxedError;
792 use common_error::mock::MockError;
793 use common_error::status_code::StatusCode;
794 use common_meta::ddl::test_util::datanode_handler::{
795 DatanodeWatcher, NaiveDatanodeHandler, UnexpectedErrorDatanodeHandler,
796 };
797 use common_meta::error;
798 use common_meta::peer::Peer;
799 use common_meta::rpc::router::{Region, RegionRoute};
800 use common_meta::test_util::MockDatanodeManager;
801 use common_procedure::{Error as ProcedureError, Procedure, ProcedureId, ProcedureState};
802 use store_api::storage::RegionId;
803 use table::table_name::TableName;
804 use tokio::sync::mpsc;
805 use uuid::Uuid;
806
807 use super::*;
808 use crate::procedure::repartition::allocate_region::AllocateRegion;
809 use crate::procedure::repartition::collect::Collect;
810 use crate::procedure::repartition::deallocate_region::DeallocateRegion;
811 use crate::procedure::repartition::dispatch::Dispatch;
812 use crate::procedure::repartition::plan::RegionDescriptor;
813 use crate::procedure::repartition::repartition_end::RepartitionEnd;
814 use crate::procedure::repartition::test_util::{
815 TestingEnv, assert_parent_state, current_parent_region_routes, extract_subprocedure_ids,
816 new_parent_context, procedure_context_with_receivers, procedure_state_receiver, range_expr,
817 test_region_route, test_region_wal_options,
818 };
819
    // Fixture: a plan splitting region 1 into regions 1 and 3, where region 3
    // is the freshly allocated target of the split.
    fn test_plan(table_id: TableId) -> RepartitionPlanEntry {
        RepartitionPlanEntry {
            group_id: uuid::Uuid::new_v4(),
            source_regions: vec![RegionDescriptor {
                region_id: RegionId::new(table_id, 1),
                partition_expr: range_expr("x", 0, 100),
            }],
            target_regions: vec![
                RegionDescriptor {
                    region_id: RegionId::new(table_id, 1),
                    partition_expr: range_expr("x", 0, 50),
                },
                RegionDescriptor {
                    region_id: RegionId::new(table_id, 3),
                    partition_expr: range_expr("x", 50, 100),
                },
            ],
            // Region 3 did not exist before this plan — rollback should drop it.
            allocated_region_ids: vec![RegionId::new(table_id, 3)],
            pending_deallocate_region_ids: vec![],
            // Source region 0 maps to both targets.
            transition_map: vec![vec![0, 1]],
        }
    }
842
    // Builds a procedure directly in the given state, bypassing `new`.
    fn test_procedure(state: Box<dyn State>, context: Context) -> RepartitionProcedure {
        RepartitionProcedure { state, context }
    }
846
    // Builds a `Context` over the testing env with a datanode handler that
    // errors on any request (these tests never expect datanode traffic to
    // succeed).
    fn test_context(env: &TestingEnv, table_id: TableId) -> Context {
        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
        let ddl_ctx = env.ddl_context(node_manager);
        let persistent_ctx = PersistentContext::new(
            TableName::new("test_catalog", "test_schema", "test_table"),
            table_id,
            None,
        );

        Context::new(
            &ddl_ctx,
            env.mailbox_ctx.mailbox().clone(),
            env.server_addr.clone(),
            persistent_ctx,
        )
    }
863
864 #[test]
865 fn test_filter_allocated_region_routes() {
866 let table_id = 1024;
867 let region_routes = vec![
868 test_region_route(RegionId::new(table_id, 1), "a"),
869 test_region_route(RegionId::new(table_id, 2), "b"),
870 ];
871 let allocated_region_ids = HashSet::from([RegionId::new(table_id, 2)]);
872
873 let new_region_routes = RepartitionProcedure::filter_allocated_region_routes(
874 ®ion_routes,
875 &allocated_region_ids,
876 );
877
878 assert_eq!(new_region_routes.len(), 1);
879 assert_eq!(new_region_routes[0].region.id, RegionId::new(table_id, 1));
880 }
881
882 #[test]
883 fn test_should_rollback_allocated_regions() {
884 let env = TestingEnv::new();
885 let table_id = 1024;
886
887 let procedure = test_procedure(
888 Box::new(RepartitionStart::new(vec![], vec![])),
889 test_context(&env, table_id),
890 );
891 assert!(!procedure.should_rollback_allocated_regions());
892
893 let procedure = test_procedure(
894 Box::new(AllocateRegion::new(vec![])),
895 test_context(&env, table_id),
896 );
897 assert!(procedure.should_rollback_allocated_regions());
898
899 let procedure = test_procedure(Box::new(Dispatch), test_context(&env, table_id));
900 assert!(procedure.should_rollback_allocated_regions());
901
902 let procedure =
903 test_procedure(Box::new(Collect::new(vec![])), test_context(&env, table_id));
904 assert!(procedure.should_rollback_allocated_regions());
905
906 let procedure = test_procedure(Box::new(DeallocateRegion), test_context(&env, table_id));
907 assert!(!procedure.should_rollback_allocated_regions());
908
909 let procedure = test_procedure(Box::new(RepartitionEnd), test_context(&env, table_id));
910 assert!(!procedure.should_rollback_allocated_regions());
911 }
912
    // Rolling back from `Dispatch` removes every allocated region route,
    // regardless of which group procedures failed.
    #[tokio::test]
    async fn test_repartition_rollback_removes_allocated_routes_from_dispatch() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
        let ddl_ctx = env.ddl_context(node_manager);
        // Regions 1 and 2 are pre-existing; region 3 (empty partition expr)
        // is the freshly allocated one, per `test_plan`.
        let original_region_routes = vec![
            test_region_route(
                RegionId::new(table_id, 1),
                &range_expr("x", 0, 100).as_json_str().unwrap(),
            ),
            test_region_route(
                RegionId::new(table_id, 2),
                &range_expr("x", 50, 100).as_json_str().unwrap(),
            ),
            test_region_route(RegionId::new(table_id, 3), ""),
        ];
        env.create_physical_table_metadata_with_wal_options(
            table_id,
            original_region_routes,
            test_region_wal_options(&[1, 2]),
        )
        .await;

        let mut persistent_ctx = PersistentContext::new(
            TableName::new("test_catalog", "test_schema", "test_table"),
            table_id,
            None,
        );
        persistent_ctx.plans = vec![test_plan(table_id)];
        persistent_ctx.failed_procedures = vec![ProcedureMeta {
            plan_index: 0,
            group_id: Uuid::new_v4(),
            procedure_id: ProcedureId::random(),
        }];
        let context = Context::new(
            &ddl_ctx,
            env.mailbox_ctx.mailbox().clone(),
            env.server_addr.clone(),
            persistent_ctx,
        );
        let mut procedure = RepartitionProcedure {
            state: Box::new(Dispatch),
            context,
        };

        procedure
            .rollback(&TestingEnv::procedure_context())
            .await
            .unwrap();

        // The allocated region 3 is gone; the original regions remain.
        let region_routes = current_parent_region_routes(&procedure.context).await;
        assert_eq!(region_routes.len(), 2);
        assert_eq!(region_routes[0].region.id, RegionId::new(table_id, 1));
        assert_eq!(region_routes[1].region.id, RegionId::new(table_id, 2));
    }
969
    // Rolling back from `AllocateRegion` removes every allocated region route
    // even when no failed/unknown procedures were recorded yet.
    #[tokio::test]
    async fn test_repartition_rollback_removes_allocated_routes_from_allocate() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
        let ddl_ctx = env.ddl_context(node_manager);
        // Region 3 (empty partition expr) is the newly allocated region.
        let original_region_routes = vec![
            test_region_route(
                RegionId::new(table_id, 1),
                &range_expr("x", 0, 100).as_json_str().unwrap(),
            ),
            test_region_route(
                RegionId::new(table_id, 2),
                &range_expr("x", 50, 100).as_json_str().unwrap(),
            ),
            test_region_route(RegionId::new(table_id, 3), ""),
        ];
        env.create_physical_table_metadata_with_wal_options(
            table_id,
            original_region_routes,
            test_region_wal_options(&[1, 2]),
        )
        .await;

        // Note: no `failed_procedures` here — pre-Collect states roll back
        // all allocated regions from the plans.
        let mut persistent_ctx = PersistentContext::new(
            TableName::new("test_catalog", "test_schema", "test_table"),
            table_id,
            None,
        );
        persistent_ctx.plans = vec![test_plan(table_id)];
        let context = Context::new(
            &ddl_ctx,
            env.mailbox_ctx.mailbox().clone(),
            env.server_addr.clone(),
            persistent_ctx,
        );
        let mut procedure = RepartitionProcedure {
            state: Box::new(AllocateRegion::new(vec![])),
            context,
        };

        procedure
            .rollback(&TestingEnv::procedure_context())
            .await
            .unwrap();

        // The allocated region 3 is gone; the original regions remain.
        let region_routes = current_parent_region_routes(&procedure.context).await;
        assert_eq!(region_routes.len(), 2);
        assert_eq!(region_routes[0].region.id, RegionId::new(table_id, 1));
        assert_eq!(region_routes[1].region.id, RegionId::new(table_id, 2));
    }
1021
    // Rolling back from `Collect` only removes regions allocated by plans
    // whose group procedures failed — succeeded groups keep their regions.
    #[tokio::test]
    async fn test_repartition_rollback_from_collect_only_removes_failed_allocated_routes() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
        let ddl_ctx = env.ddl_context(node_manager);
        // Regions 3 and 4 (empty partition exprs) are newly allocated:
        // region 3 by the failed plan, region 4 by the succeeded plan.
        let original_region_routes = vec![
            test_region_route(
                RegionId::new(table_id, 1),
                &range_expr("x", 0, 100).as_json_str().unwrap(),
            ),
            test_region_route(
                RegionId::new(table_id, 2),
                &range_expr("x", 100, 200).as_json_str().unwrap(),
            ),
            test_region_route(RegionId::new(table_id, 3), ""),
            test_region_route(RegionId::new(table_id, 4), ""),
        ];
        env.create_physical_table_metadata_with_wal_options(
            table_id,
            original_region_routes,
            test_region_wal_options(&[1, 2, 3, 4]),
        )
        .await;

        let mut persistent_ctx = PersistentContext::new(
            TableName::new("test_catalog", "test_schema", "test_table"),
            table_id,
            None,
        );
        let failed_plan = test_plan(table_id);
        // A second plan that splits region 2 into regions 2 and 4 and whose
        // group procedure succeeded.
        let succeeded_plan = RepartitionPlanEntry {
            group_id: Uuid::new_v4(),
            source_regions: vec![RegionDescriptor {
                region_id: RegionId::new(table_id, 2),
                partition_expr: range_expr("x", 100, 200),
            }],
            target_regions: vec![
                RegionDescriptor {
                    region_id: RegionId::new(table_id, 2),
                    partition_expr: range_expr("x", 100, 150),
                },
                RegionDescriptor {
                    region_id: RegionId::new(table_id, 4),
                    partition_expr: range_expr("x", 150, 200),
                },
            ],
            allocated_region_ids: vec![RegionId::new(table_id, 4)],
            pending_deallocate_region_ids: vec![],
            transition_map: vec![vec![0]],
        };
        persistent_ctx.plans = vec![failed_plan, succeeded_plan];
        // Only plan 0 failed, so only its allocated region (3) is removed.
        persistent_ctx.failed_procedures = vec![ProcedureMeta {
            plan_index: 0,
            group_id: persistent_ctx.plans[0].group_id,
            procedure_id: ProcedureId::random(),
        }];

        let context = Context::new(
            &ddl_ctx,
            env.mailbox_ctx.mailbox().clone(),
            env.server_addr.clone(),
            persistent_ctx,
        );
        let mut procedure = RepartitionProcedure {
            state: Box::new(Collect::new(vec![])),
            context,
        };

        procedure
            .rollback(&TestingEnv::procedure_context())
            .await
            .unwrap();

        // Region 3 (failed plan) is removed; region 4 (succeeded plan) stays.
        let region_routes = current_parent_region_routes(&procedure.context).await;
        assert_eq!(region_routes.len(), 3);
        assert_eq!(region_routes[0].region.id, RegionId::new(table_id, 1));
        assert_eq!(region_routes[1].region.id, RegionId::new(table_id, 2));
        assert_eq!(region_routes[2].region.id, RegionId::new(table_id, 4));
    }
1102
    // Running rollback twice must yield the same table route as running it
    // once — the second pass finds nothing left to remove.
    #[tokio::test]
    async fn test_repartition_rollback_is_idempotent() {
        let env = TestingEnv::new();
        let table_id = 1024;
        let node_manager = Arc::new(MockDatanodeManager::new(UnexpectedErrorDatanodeHandler));
        let ddl_ctx = env.ddl_context(node_manager);
        // Region 3 (empty partition expr) is the freshly allocated region.
        let original_region_routes = vec![
            test_region_route(
                RegionId::new(table_id, 1),
                &range_expr("x", 0, 100).as_json_str().unwrap(),
            ),
            test_region_route(
                RegionId::new(table_id, 2),
                &range_expr("x", 50, 100).as_json_str().unwrap(),
            ),
            test_region_route(RegionId::new(table_id, 3), ""),
        ];
        env.create_physical_table_metadata_with_wal_options(
            table_id,
            original_region_routes,
            test_region_wal_options(&[1, 2]),
        )
        .await;

        let mut persistent_ctx = PersistentContext::new(
            TableName::new("test_catalog", "test_schema", "test_table"),
            table_id,
            None,
        );
        persistent_ctx.plans = vec![test_plan(table_id)];
        persistent_ctx.failed_procedures = vec![ProcedureMeta {
            plan_index: 0,
            group_id: Uuid::new_v4(),
            procedure_id: ProcedureId::random(),
        }];
        let context = Context::new(
            &ddl_ctx,
            env.mailbox_ctx.mailbox().clone(),
            env.server_addr.clone(),
            persistent_ctx,
        );
        let mut procedure = RepartitionProcedure {
            state: Box::new(Dispatch),
            context,
        };

        // First rollback removes the allocated route.
        procedure
            .rollback(&TestingEnv::procedure_context())
            .await
            .unwrap();
        let once = current_parent_region_routes(&procedure.context).await;

        // Second rollback must be a no-op on the table route.
        procedure
            .rollback(&TestingEnv::procedure_context())
            .await
            .unwrap();
        let twice = current_parent_region_routes(&procedure.context).await;

        assert_eq!(once, twice);
        assert_eq!(once.len(), 2);
        assert_eq!(once[0].region.id, RegionId::new(table_id, 1));
        assert_eq!(once[1].region.id, RegionId::new(table_id, 2));
    }
1166
    #[tokio::test]
    async fn test_repartition_procedure_flow_split_failed_and_full_rollback() {
        // Drives a split of region 1 ([0, 100) -> [0, 50) + [50, 100)) through
        // Start, AllocateRegion and Dispatch, injects a failed subprocedure during
        // Collect, then verifies that rollback restores the original route table.
        let env = TestingEnv::new();
        let table_id = 1024;
        let node_manager = Arc::new(MockDatanodeManager::new(NaiveDatanodeHandler));

        // Two source regions covering [0, 100) and [100, 200).
        env.create_physical_table_metadata_for_repartition(
            table_id,
            vec![
                test_region_route(
                    RegionId::new(table_id, 1),
                    &range_expr("x", 0, 100).as_json_str().unwrap(),
                ),
                test_region_route(
                    RegionId::new(table_id, 2),
                    &range_expr("x", 100, 200).as_json_str().unwrap(),
                ),
            ],
            test_region_wal_options(&[1, 2]),
        )
        .await;

        // Repartition request: split the [0, 100) expr into [0, 50) and [50, 100).
        let context = new_parent_context(&env, node_manager, table_id);
        let mut procedure = RepartitionProcedure::new(
            vec![range_expr("x", 0, 100)],
            vec![range_expr("x", 0, 50), range_expr("x", 50, 100)],
            context,
        );

        // Start state: the first step doesn't persist; the second does and
        // transitions to AllocateRegion.
        let start_status = procedure
            .execute(&TestingEnv::procedure_context())
            .await
            .unwrap();
        assert!(!start_status.need_persist());
        let start_status = procedure
            .execute(&TestingEnv::procedure_context())
            .await
            .unwrap();
        assert!(start_status.need_persist());
        assert_parent_state::<AllocateRegion>(&procedure);

        // AllocateRegion: persists the repartition plan and moves to Dispatch.
        let allocate_status = procedure
            .execute(&TestingEnv::procedure_context())
            .await
            .unwrap();
        assert!(allocate_status.need_persist());
        assert_parent_state::<Dispatch>(&procedure);
        // The single persisted plan must match the expected split plan field by field.
        assert_eq!(procedure.context.persistent_ctx.plans.len(), 1);
        let plan = &procedure.context.persistent_ctx.plans[0];
        let expected_plan = test_plan(table_id);
        assert_eq!(plan.source_regions, expected_plan.source_regions);
        assert_eq!(plan.target_regions, expected_plan.target_regions);
        assert_eq!(
            plan.allocated_region_ids,
            expected_plan.allocated_region_ids
        );
        assert_eq!(
            plan.pending_deallocate_region_ids,
            expected_plan.pending_deallocate_region_ids
        );
        assert_eq!(plan.transition_map, expected_plan.transition_map);
        // The route table now carries the newly allocated region 3 ([50, 100))
        // alongside the untouched originals.
        assert_eq!(
            current_parent_region_routes(&procedure.context).await,
            vec![
                test_region_route(
                    RegionId::new(table_id, 1),
                    &range_expr("x", 0, 100).as_json_str().unwrap(),
                ),
                test_region_route(
                    RegionId::new(table_id, 2),
                    &range_expr("x", 100, 200).as_json_str().unwrap(),
                ),
                RegionRoute {
                    region: Region {
                        id: RegionId::new(table_id, 3),
                        partition_expr: range_expr("x", 50, 100).as_json_str().unwrap(),
                        ..Default::default()
                    },
                    leader_peer: Some(Peer::empty(0)),
                    ..Default::default()
                },
            ]
        );

        // Dispatch: spawns exactly one subprocedure for the single plan; no
        // persistence needed at this step.
        let dispatch_status = procedure
            .execute(&TestingEnv::procedure_context())
            .await
            .unwrap();
        assert!(!dispatch_status.need_persist());
        let subprocedure_ids = extract_subprocedure_ids(dispatch_status);
        assert_eq!(subprocedure_ids.len(), 1);
        assert_parent_state::<Collect>(&procedure);

        // Report the subprocedure as failed with an internal (non-retryable) error.
        let failed_state = ProcedureState::failed(Arc::new(ProcedureError::external(
            MockError::new(StatusCode::Internal),
        )));
        let collect_ctx = procedure_context_with_receivers(HashMap::from([(
            subprocedure_ids[0],
            procedure_state_receiver(failed_state),
        )]));

        // Collect surfaces the failure as a non-retryable error and stays in Collect.
        let err = procedure.execute(&collect_ctx).await.unwrap_err();
        assert!(!err.is_retry_later());
        assert_parent_state::<Collect>(&procedure);

        // Rollback must remove the allocated region 3 and restore the originals.
        procedure
            .rollback(&TestingEnv::procedure_context())
            .await
            .unwrap();

        let region_routes = current_parent_region_routes(&procedure.context).await;
        assert_eq!(
            region_routes,
            vec![
                test_region_route(
                    RegionId::new(table_id, 1),
                    &range_expr("x", 0, 100).as_json_str().unwrap(),
                ),
                test_region_route(
                    RegionId::new(table_id, 2),
                    &range_expr("x", 100, 200).as_json_str().unwrap(),
                ),
            ]
        );
    }
1292
1293 #[tokio::test]
1294 async fn test_repartition_procedure_flow_split_allocate_retryable_then_resume() {
1295 common_telemetry::init_default_ut_logging();
1296 let env = TestingEnv::new();
1297 let table_id = 1024;
1298 let (tx, _rx) = mpsc::channel(8);
1299 let should_retry = Arc::new(AtomicBool::new(true));
1300 let datanode_handler = DatanodeWatcher::new(tx).with_handler(move |_, _| {
1301 if should_retry.swap(false, Ordering::SeqCst) {
1302 return Err(error::Error::RetryLater {
1303 source: BoxedError::new(
1304 error::UnexpectedSnafu {
1305 err_msg: "retry later",
1306 }
1307 .build(),
1308 ),
1309 clean_poisons: false,
1310 });
1311 }
1312
1313 Ok(api::region::RegionResponse::new(0))
1314 });
1315 let node_manager = Arc::new(MockDatanodeManager::new(datanode_handler));
1316
1317 env.create_physical_table_metadata_for_repartition(
1318 table_id,
1319 vec![
1320 test_region_route(
1321 RegionId::new(table_id, 1),
1322 &range_expr("x", 0, 100).as_json_str().unwrap(),
1323 ),
1324 test_region_route(
1325 RegionId::new(table_id, 2),
1326 &range_expr("x", 100, 200).as_json_str().unwrap(),
1327 ),
1328 ],
1329 test_region_wal_options(&[1, 2]),
1330 )
1331 .await;
1332
1333 let context = new_parent_context(&env, node_manager, table_id);
1334 let mut procedure = RepartitionProcedure::new(
1335 vec![range_expr("x", 0, 100)],
1336 vec![range_expr("x", 0, 50), range_expr("x", 50, 100)],
1337 context,
1338 );
1339
1340 let start_status = procedure
1341 .execute(&TestingEnv::procedure_context())
1342 .await
1343 .unwrap();
1344 assert!(!start_status.need_persist());
1345 let start_status = procedure
1346 .execute(&TestingEnv::procedure_context())
1347 .await
1348 .unwrap();
1349 assert!(start_status.need_persist());
1350 assert_parent_state::<AllocateRegion>(&procedure);
1351
1352 let err = procedure
1353 .execute(&TestingEnv::procedure_context())
1354 .await
1355 .unwrap_err();
1356 assert!(err.is_retry_later());
1357 assert_parent_state::<AllocateRegion>(&procedure);
1358 assert!(!procedure.context.persistent_ctx.plans.is_empty());
1359 assert_eq!(
1360 current_parent_region_routes(&procedure.context).await,
1361 vec![
1362 test_region_route(
1363 RegionId::new(table_id, 1),
1364 &range_expr("x", 0, 100).as_json_str().unwrap(),
1365 ),
1366 test_region_route(
1367 RegionId::new(table_id, 2),
1368 &range_expr("x", 100, 200).as_json_str().unwrap(),
1369 ),
1370 ]
1371 );
1372
1373 let allocate_status = procedure
1374 .execute(&TestingEnv::procedure_context())
1375 .await
1376 .unwrap();
1377 assert!(allocate_status.need_persist());
1378 assert_parent_state::<Dispatch>(&procedure);
1379
1380 assert_eq!(procedure.context.persistent_ctx.plans.len(), 1);
1381 let plan = &procedure.context.persistent_ctx.plans[0];
1382 let expected_plan = test_plan(table_id);
1383 assert_eq!(plan.source_regions, expected_plan.source_regions);
1384 assert_eq!(plan.target_regions, expected_plan.target_regions);
1385 assert_eq!(
1386 plan.allocated_region_ids,
1387 expected_plan.allocated_region_ids
1388 );
1389 assert_eq!(plan.transition_map, expected_plan.transition_map);
1390 assert_eq!(
1391 current_parent_region_routes(&procedure.context).await,
1392 vec![
1393 test_region_route(
1394 RegionId::new(table_id, 1),
1395 &range_expr("x", 0, 100).as_json_str().unwrap(),
1396 ),
1397 test_region_route(
1398 RegionId::new(table_id, 2),
1399 &range_expr("x", 100, 200).as_json_str().unwrap(),
1400 ),
1401 RegionRoute {
1402 region: Region {
1403 id: RegionId::new(table_id, 3),
1404 partition_expr: range_expr("x", 50, 100).as_json_str().unwrap(),
1405 ..Default::default()
1406 },
1407 leader_peer: Some(Peer::empty(0)),
1408 ..Default::default()
1409 },
1410 ]
1411 );
1412
1413 let dispatch_status = procedure
1414 .execute(&TestingEnv::procedure_context())
1415 .await
1416 .unwrap();
1417 assert!(!dispatch_status.need_persist());
1418 let subprocedure_ids = extract_subprocedure_ids(dispatch_status);
1419 assert_eq!(subprocedure_ids.len(), 1);
1420 assert_parent_state::<Collect>(&procedure);
1421 }
1422}