1pub mod allocate_region;
16pub mod collect;
17pub mod deallocate_region;
18pub mod dispatch;
19pub mod group;
20pub mod plan;
21pub mod repartition_end;
22pub mod repartition_start;
23pub mod utils;
24
25use std::any::Any;
26use std::collections::HashMap;
27use std::fmt::{Debug, Display};
28use std::time::{Duration, Instant};
29
30use common_error::ext::BoxedError;
31use common_meta::cache_invalidator::CacheInvalidatorRef;
32use common_meta::ddl::DdlContext;
33use common_meta::ddl::allocator::region_routes::RegionRoutesAllocatorRef;
34use common_meta::ddl::allocator::wal_options::WalOptionsAllocatorRef;
35use common_meta::ddl_manager::RepartitionProcedureFactory;
36use common_meta::instruction::CacheIdent;
37use common_meta::key::datanode_table::RegionInfo;
38use common_meta::key::table_info::TableInfoValue;
39use common_meta::key::table_route::TableRouteValue;
40use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
41use common_meta::lock_key::{CatalogLock, SchemaLock, TableLock, TableNameLock};
42use common_meta::node_manager::NodeManagerRef;
43use common_meta::region_keeper::MemoryRegionKeeperRef;
44use common_meta::region_registry::LeaderRegionRegistryRef;
45use common_meta::rpc::router::RegionRoute;
46use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
47use common_procedure::{
48 BoxedProcedure, Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
49 ProcedureManagerRef, Result as ProcedureResult, Status, StringKey, UserMetadata,
50};
51use common_telemetry::{error, info};
52use partition::expr::PartitionExpr;
53use serde::{Deserialize, Serialize};
54use snafu::{OptionExt, ResultExt};
55use store_api::storage::{RegionNumber, TableId};
56use table::table_name::TableName;
57
58use crate::error::{self, Result};
59use crate::procedure::repartition::group::{
60 Context as RepartitionGroupContext, RepartitionGroupProcedure,
61};
62use crate::procedure::repartition::plan::RepartitionPlanEntry;
63use crate::procedure::repartition::repartition_start::RepartitionStart;
64use crate::procedure::repartition::utils::get_datanode_table_value;
65use crate::service::mailbox::MailboxRef;
66
67#[cfg(test)]
68pub mod test_util;
69
/// The serializable part of the repartition procedure's context.
///
/// This is the `persistent_ctx` written by `RepartitionProcedure::dump` and
/// read back by `RepartitionProcedure::from_json`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
    /// Catalog name of the table being repartitioned.
    pub catalog_name: String,
    /// Schema name of the table being repartitioned.
    pub schema_name: String,
    /// Name of the table being repartitioned.
    pub table_name: String,
    /// Id of the table being repartitioned.
    pub table_id: TableId,
    /// Repartition plan entries; `PersistentContext::new` starts with an empty list.
    pub plans: Vec<RepartitionPlanEntry>,
    /// Time budget that `Context::next_operation_timeout` counts down from.
    ///
    /// (De)serialized through `humantime_serde`; when the field is absent from
    /// the input, `default_timeout()` supplies the value.
    #[serde(with = "humantime_serde", default = "default_timeout")]
    pub timeout: Duration,
}
81
82fn default_timeout() -> Duration {
83 Duration::from_mins(2)
84}
85
86impl PersistentContext {
87 pub fn new(
91 TableName {
92 catalog_name,
93 schema_name,
94 table_name,
95 }: TableName,
96 table_id: TableId,
97 timeout: Option<Duration>,
98 ) -> Self {
99 Self {
100 catalog_name,
101 schema_name,
102 table_name,
103 table_id,
104 plans: vec![],
105 timeout: timeout.unwrap_or_else(default_timeout),
106 }
107 }
108
109 pub fn lock_key(&self) -> Vec<StringKey> {
110 vec![
111 CatalogLock::Read(&self.catalog_name).into(),
112 SchemaLock::read(&self.catalog_name, &self.schema_name).into(),
113 TableLock::Write(self.table_id).into(),
114 TableNameLock::new(&self.catalog_name, &self.schema_name, &self.table_name).into(),
115 ]
116 }
117}
118
/// Runtime context handed to every repartition state.
///
/// Only `persistent_ctx` is part of the dumped procedure data; everything else
/// is rebuilt by `Context::new`.
#[derive(Clone)]
pub struct Context {
    /// Serializable state of the procedure.
    pub persistent_ctx: PersistentContext,
    /// In-memory-only state; `Context::new` resets it to its default.
    pub volatile_ctx: VolatileContext,
    /// Cloned from the `DdlContext`; used here to read table route/info and to
    /// update the table route.
    pub table_metadata_manager: TableMetadataManagerRef,
    /// Cloned from the `DdlContext`.
    pub memory_region_keeper: MemoryRegionKeeperRef,
    /// Cloned from the `DdlContext`.
    pub node_manager: NodeManagerRef,
    /// Cloned from the `DdlContext`.
    pub leader_region_registry: LeaderRegionRegistryRef,
    /// Mailbox handle supplied by the caller of `Context::new`.
    pub mailbox: MailboxRef,
    /// Server address supplied by the caller of `Context::new`.
    // NOTE(review): presumably this metasrv's own address — confirm against the factory's caller.
    pub server_addr: String,
    /// Cloned from the `DdlContext`; used by `invalidate_table_cache`.
    pub cache_invalidator: CacheInvalidatorRef,
    /// Obtained from the `DdlContext`'s table metadata allocator.
    pub region_routes_allocator: RegionRoutesAllocatorRef,
    /// Obtained from the `DdlContext`'s table metadata allocator.
    pub wal_options_allocator: WalOptionsAllocatorRef,
    /// Instant at which this context was constructed; the reference point for
    /// `next_operation_timeout`.
    pub start_time: Instant,
}
134
/// In-memory-only state of the repartition procedure.
///
/// It is not part of the data written by `RepartitionProcedure::dump`, and
/// `Context::new` always starts from `VolatileContext::default()`.
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {
    /// Per-stage elapsed-time metrics.
    pub metrics: Metrics,
    /// Not written anywhere in this file.
    // NOTE(review): presumably set when the dispatch stage begins — confirm in the `dispatch` module.
    pub dispatch_start_time: Option<Instant>,
}
140
/// Accumulated elapsed time of the individual repartition stages.
#[derive(Debug, Clone, Default)]
pub struct Metrics {
    /// Accumulated time reported via `update_build_plan_elapsed`.
    build_plan_elapsed: Duration,
    /// Accumulated time reported via `update_allocate_region_elapsed`.
    allocate_region_elapsed: Duration,
    /// Accumulated time reported via `update_finish_groups_elapsed`.
    finish_groups_elapsed: Duration,
    /// Accumulated time reported via `update_deallocate_region_elapsed`.
    deallocate_region_elapsed: Duration,
}

impl Display for Metrics {
    /// Writes `total: <sum>` followed by one `, <name>: <duration>` entry for
    /// every stage whose accumulated duration is non-zero.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let total = [
            self.build_plan_elapsed,
            self.allocate_region_elapsed,
            self.finish_groups_elapsed,
            self.deallocate_region_elapsed,
        ]
        .into_iter()
        .fold(Duration::ZERO, |sum, elapsed| sum + elapsed);
        write!(f, "total: {:?}", total)?;

        // Stream each non-zero stage straight into the formatter; stages that
        // never ran are omitted.
        if !self.build_plan_elapsed.is_zero() {
            f.write_str(", ")?;
            write!(f, "build_plan_elapsed: {:?}", self.build_plan_elapsed)?;
        }
        if !self.allocate_region_elapsed.is_zero() {
            f.write_str(", ")?;
            write!(
                f,
                "allocate_region_elapsed: {:?}",
                self.allocate_region_elapsed
            )?;
        }
        if !self.finish_groups_elapsed.is_zero() {
            f.write_str(", ")?;
            write!(
                f,
                "finish_groups_elapsed: {:?}",
                self.finish_groups_elapsed
            )?;
        }
        if !self.deallocate_region_elapsed.is_zero() {
            f.write_str(", ")?;
            write!(
                f,
                "deallocate_region_elapsed: {:?}",
                self.deallocate_region_elapsed
            )?;
        }

        Ok(())
    }
}

impl Metrics {
    /// Adds `elapsed` to the accumulated plan-building time.
    pub fn update_build_plan_elapsed(&mut self, elapsed: Duration) {
        self.build_plan_elapsed += elapsed;
    }

    /// Adds `elapsed` to the accumulated region-allocation time.
    pub fn update_allocate_region_elapsed(&mut self, elapsed: Duration) {
        self.allocate_region_elapsed += elapsed;
    }

    /// Adds `elapsed` to the accumulated group-finishing time.
    pub fn update_finish_groups_elapsed(&mut self, elapsed: Duration) {
        self.finish_groups_elapsed += elapsed;
    }

    /// Adds `elapsed` to the accumulated region-deallocation time.
    pub fn update_deallocate_region_elapsed(&mut self, elapsed: Duration) {
        self.deallocate_region_elapsed += elapsed;
    }
}
212
213impl Context {
214 pub fn new(
215 ddl_ctx: &DdlContext,
216 mailbox: MailboxRef,
217 server_addr: String,
218 persistent_ctx: PersistentContext,
219 ) -> Self {
220 Self {
221 persistent_ctx,
222 table_metadata_manager: ddl_ctx.table_metadata_manager.clone(),
223 memory_region_keeper: ddl_ctx.memory_region_keeper.clone(),
224 node_manager: ddl_ctx.node_manager.clone(),
225 leader_region_registry: ddl_ctx.leader_region_registry.clone(),
226 mailbox,
227 server_addr,
228 cache_invalidator: ddl_ctx.cache_invalidator.clone(),
229 region_routes_allocator: ddl_ctx.table_metadata_allocator.region_routes_allocator(),
230 wal_options_allocator: ddl_ctx.table_metadata_allocator.wal_options_allocator(),
231 start_time: Instant::now(),
232 volatile_ctx: VolatileContext::default(),
233 }
234 }
235
236 pub fn next_operation_timeout(&self) -> Option<Duration> {
238 self.persistent_ctx
239 .timeout
240 .checked_sub(self.start_time.elapsed())
241 }
242
243 pub fn update_build_plan_elapsed(&mut self, elapsed: Duration) {
245 self.volatile_ctx.metrics.update_build_plan_elapsed(elapsed);
246 }
247
248 pub fn update_allocate_region_elapsed(&mut self, elapsed: Duration) {
250 self.volatile_ctx
251 .metrics
252 .update_allocate_region_elapsed(elapsed);
253 }
254
255 pub fn update_finish_groups_elapsed(&mut self, elapsed: Duration) {
257 self.volatile_ctx
258 .metrics
259 .update_finish_groups_elapsed(elapsed);
260 }
261
262 pub fn update_deallocate_region_elapsed(&mut self, elapsed: Duration) {
264 self.volatile_ctx
265 .metrics
266 .update_deallocate_region_elapsed(elapsed);
267 }
268
269 pub async fn get_table_route_value(
277 &self,
278 ) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
279 let table_id = self.persistent_ctx.table_id;
280 let table_route_value = self
281 .table_metadata_manager
282 .table_route_manager()
283 .table_route_storage()
284 .get_with_raw_bytes(table_id)
285 .await
286 .map_err(BoxedError::new)
287 .with_context(|_| error::RetryLaterWithSourceSnafu {
288 reason: format!("Failed to get table route for table: {}", table_id),
289 })?
290 .context(error::TableRouteNotFoundSnafu { table_id })?;
291
292 Ok(table_route_value)
293 }
294
295 pub async fn get_table_info_value(&self) -> Result<TableInfoValue> {
303 let table_id = self.persistent_ctx.table_id;
304 let table_info_value = self
305 .table_metadata_manager
306 .table_info_manager()
307 .get(table_id)
308 .await
309 .map_err(BoxedError::new)
310 .with_context(|_| error::RetryLaterWithSourceSnafu {
311 reason: format!("Failed to get table info for table: {}", table_id),
312 })?
313 .context(error::TableInfoNotFoundSnafu { table_id })?
314 .into_inner();
315 Ok(table_info_value)
316 }
317
318 pub async fn update_table_route(
327 &self,
328 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
329 new_region_routes: Vec<RegionRoute>,
330 new_region_wal_options: HashMap<RegionNumber, String>,
331 ) -> Result<()> {
332 let table_id = self.persistent_ctx.table_id;
333 if new_region_routes.is_empty() {
334 return error::UnexpectedSnafu {
335 violated: format!("new_region_routes is empty for table: {}", table_id),
336 }
337 .fail();
338 }
339 let datanode_id = new_region_routes
340 .first()
341 .unwrap()
342 .leader_peer
343 .as_ref()
344 .context(error::NoLeaderSnafu)?
345 .id;
346 let datanode_table_value =
347 get_datanode_table_value(&self.table_metadata_manager, table_id, datanode_id).await?;
348
349 let RegionInfo {
350 region_options,
351 region_wal_options,
352 ..
353 } = &datanode_table_value.region_info;
354
355 let validated_region_wal_options =
357 crate::procedure::repartition::utils::merge_and_validate_region_wal_options(
358 region_wal_options,
359 new_region_wal_options,
360 &new_region_routes,
361 table_id,
362 )?;
363 info!(
364 "Updating table route for table: {}, new region routes: {:?}",
365 table_id, new_region_routes
366 );
367 self.table_metadata_manager
368 .update_table_route(
369 table_id,
370 datanode_table_value.region_info.clone(),
371 current_table_route_value,
372 new_region_routes,
373 region_options,
374 &validated_region_wal_options,
375 )
376 .await
377 .context(error::TableMetadataManagerSnafu)
378 }
379
380 pub async fn invalidate_table_cache(&self) -> Result<()> {
382 let table_id = self.persistent_ctx.table_id;
383 let subject = format!(
384 "Invalidate table cache for repartition table, table: {}",
385 table_id,
386 );
387 let ctx = common_meta::cache_invalidator::Context {
388 subject: Some(subject),
389 };
390 let _ = self
391 .cache_invalidator
392 .invalidate(&ctx, &[CacheIdent::TableId(table_id)])
393 .await;
394 Ok(())
395 }
396}
397
398#[async_trait::async_trait]
399#[typetag::serde(tag = "repartition_state")]
400pub(crate) trait State: Sync + Send + Debug {
401 fn name(&self) -> &'static str {
402 let type_name = std::any::type_name::<Self>();
403 type_name.split("::").last().unwrap_or(type_name)
405 }
406
407 async fn next(
409 &mut self,
410 ctx: &mut Context,
411 procedure_ctx: &ProcedureContext,
412 ) -> Result<(Box<dyn State>, Status)>;
413
414 fn as_any(&self) -> &dyn Any;
415}
416
/// The repartition procedure: a current [`State`] plus the [`Context`] it runs against.
pub struct RepartitionProcedure {
    /// The state executed by the next call to `execute`.
    state: Box<dyn State>,
    /// Context passed to every state.
    context: Context,
}
421
/// Borrowed view of the procedure that `RepartitionProcedure::dump` serializes.
#[derive(Debug, Serialize)]
struct RepartitionData<'a> {
    /// The current state.
    state: &'a dyn State,
    /// The persistent part of the context.
    persistent_ctx: &'a PersistentContext,
}
427
/// Owned form of the procedure data that `RepartitionProcedure::from_json` deserializes.
#[derive(Debug, Deserialize)]
struct RepartitionDataOwned {
    /// The restored state.
    state: Box<dyn State>,
    /// The restored persistent part of the context.
    persistent_ctx: PersistentContext,
}
433
434impl RepartitionProcedure {
435 const TYPE_NAME: &'static str = "metasrv-procedure::Repartition";
436
437 pub fn new(
438 from_exprs: Vec<PartitionExpr>,
439 to_exprs: Vec<PartitionExpr>,
440 context: Context,
441 ) -> Self {
442 let state = Box::new(RepartitionStart::new(from_exprs, to_exprs));
443
444 Self { state, context }
445 }
446
447 pub fn from_json<F>(json: &str, ctx_factory: F) -> ProcedureResult<Self>
448 where
449 F: FnOnce(PersistentContext) -> Context,
450 {
451 let RepartitionDataOwned {
452 state,
453 persistent_ctx,
454 } = serde_json::from_str(json).context(FromJsonSnafu)?;
455 let context = ctx_factory(persistent_ctx);
456
457 Ok(Self { state, context })
458 }
459}
460
461#[async_trait::async_trait]
462impl Procedure for RepartitionProcedure {
463 fn type_name(&self) -> &str {
464 Self::TYPE_NAME
465 }
466
467 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
468 let state = &mut self.state;
469 let state_name = state.name();
470 common_telemetry::info!(
472 "Repartition procedure executing state: {}, table_id: {}",
473 state_name,
474 self.context.persistent_ctx.table_id
475 );
476 match state.next(&mut self.context, _ctx).await {
477 Ok((next, status)) => {
478 *state = next;
479 Ok(status)
480 }
481 Err(e) => {
482 if e.is_retryable() {
483 Err(ProcedureError::retry_later(e))
484 } else {
485 error!(
486 e;
487 "Repartition procedure failed, table id: {}",
488 self.context.persistent_ctx.table_id,
489 );
490 Err(ProcedureError::external(e))
491 }
492 }
493 }
494 }
495
496 fn rollback_supported(&self) -> bool {
497 false
499 }
500
501 fn dump(&self) -> ProcedureResult<String> {
502 let data = RepartitionData {
503 state: self.state.as_ref(),
504 persistent_ctx: &self.context.persistent_ctx,
505 };
506 serde_json::to_string(&data).context(ToJsonSnafu)
507 }
508
509 fn lock_key(&self) -> LockKey {
510 LockKey::new(self.context.persistent_ctx.lock_key())
511 }
512
513 fn user_metadata(&self) -> Option<UserMetadata> {
514 None
516 }
517}
518
/// Factory that creates repartition procedures and registers their loaders.
pub struct DefaultRepartitionProcedureFactory {
    /// Mailbox handle cloned into every created [`Context`].
    mailbox: MailboxRef,
    /// Server address cloned into every created [`Context`].
    server_addr: String,
}
523
524impl DefaultRepartitionProcedureFactory {
525 pub fn new(mailbox: MailboxRef, server_addr: String) -> Self {
526 Self {
527 mailbox,
528 server_addr,
529 }
530 }
531}
532
533impl RepartitionProcedureFactory for DefaultRepartitionProcedureFactory {
534 fn create(
535 &self,
536 ddl_ctx: &DdlContext,
537 table_name: TableName,
538 table_id: TableId,
539 from_exprs: Vec<String>,
540 to_exprs: Vec<String>,
541 timeout: Option<Duration>,
542 ) -> std::result::Result<BoxedProcedure, BoxedError> {
543 let persistent_ctx = PersistentContext::new(table_name, table_id, timeout);
544 let from_exprs = from_exprs
545 .iter()
546 .map(|e| {
547 PartitionExpr::from_json_str(e)
548 .context(error::DeserializePartitionExprSnafu)?
549 .context(error::EmptyPartitionExprSnafu)
550 })
551 .collect::<Result<Vec<_>>>()
552 .map_err(BoxedError::new)?;
553 let to_exprs = to_exprs
554 .iter()
555 .map(|e| {
556 PartitionExpr::from_json_str(e)
557 .context(error::DeserializePartitionExprSnafu)?
558 .context(error::EmptyPartitionExprSnafu)
559 })
560 .collect::<Result<Vec<_>>>()
561 .map_err(BoxedError::new)?;
562
563 let procedure = RepartitionProcedure::new(
564 from_exprs,
565 to_exprs,
566 Context::new(
567 ddl_ctx,
568 self.mailbox.clone(),
569 self.server_addr.clone(),
570 persistent_ctx,
571 ),
572 );
573
574 Ok(Box::new(procedure))
575 }
576
577 fn register_loaders(
578 &self,
579 ddl_ctx: &DdlContext,
580 procedure_manager: &ProcedureManagerRef,
581 ) -> std::result::Result<(), BoxedError> {
582 let mailbox = self.mailbox.clone();
584 let server_addr = self.server_addr.clone();
585 let moved_ddl_ctx = ddl_ctx.clone();
586 procedure_manager
587 .register_loader(
588 RepartitionProcedure::TYPE_NAME,
589 Box::new(move |json| {
590 let mailbox = mailbox.clone();
591 let server_addr = server_addr.clone();
592 let ddl_ctx = moved_ddl_ctx.clone();
593 let factory = move |persistent_ctx| {
594 Context::new(&ddl_ctx, mailbox, server_addr, persistent_ctx)
595 };
596 RepartitionProcedure::from_json(json, factory).map(|p| Box::new(p) as _)
597 }),
598 )
599 .map_err(BoxedError::new)?;
600
601 let mailbox = self.mailbox.clone();
603 let server_addr = self.server_addr.clone();
604 let moved_ddl_ctx = ddl_ctx.clone();
605 procedure_manager
606 .register_loader(
607 RepartitionGroupProcedure::TYPE_NAME,
608 Box::new(move |json| {
609 let mailbox = mailbox.clone();
610 let server_addr = server_addr.clone();
611 let ddl_ctx = moved_ddl_ctx.clone();
612 let factory = move |persistent_ctx| {
613 RepartitionGroupContext::new(&ddl_ctx, mailbox, server_addr, persistent_ctx)
614 };
615 RepartitionGroupProcedure::from_json(json, factory).map(|p| Box::new(p) as _)
616 }),
617 )
618 .map_err(BoxedError::new)?;
619
620 Ok(())
621 }
622}