1pub(crate) mod apply_staging_manifest;
16pub(crate) mod enter_staging_region;
17pub(crate) mod remap_manifest;
18pub(crate) mod repartition_end;
19pub(crate) mod repartition_start;
20pub(crate) mod sync_region;
21pub(crate) mod update_metadata;
22pub(crate) mod utils;
23
24use std::any::Any;
25use std::collections::HashMap;
26use std::fmt::{Debug, Display};
27use std::time::{Duration, Instant};
28
29use common_error::ext::BoxedError;
30use common_meta::cache_invalidator::CacheInvalidatorRef;
31use common_meta::ddl::DdlContext;
32use common_meta::instruction::CacheIdent;
33use common_meta::key::datanode_table::{DatanodeTableValue, RegionInfo};
34use common_meta::key::table_route::TableRouteValue;
35use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
36use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock};
37use common_meta::peer::Peer;
38use common_meta::rpc::router::RegionRoute;
39use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
40use common_procedure::{
41 Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
42 Result as ProcedureResult, Status, StringKey, UserMetadata,
43};
44use common_telemetry::{error, info};
45use serde::{Deserialize, Serialize};
46use snafu::{OptionExt, ResultExt};
47use store_api::storage::{RegionId, TableId};
48use uuid::Uuid;
49
50use crate::error::{self, Result};
51use crate::procedure::repartition::group::repartition_start::RepartitionStart;
52use crate::procedure::repartition::plan::RegionDescriptor;
53use crate::procedure::repartition::utils::get_datanode_table_value;
54use crate::procedure::repartition::{self};
55use crate::service::mailbox::MailboxRef;
56
/// Accumulated wall-clock timings for the phases of a repartition group
/// procedure; rendered for logs via the [`Display`] impl below.
#[derive(Debug, Clone, Default)]
pub struct Metrics {
    // Time spent flushing regions pending deallocation.
    flush_pending_deallocate_regions_elapsed: Duration,
    // Time spent moving regions into the staging state.
    enter_staging_region_elapsed: Duration,
    // Time spent applying staging manifests.
    apply_staging_manifest_elapsed: Duration,
    // Time spent remapping manifests.
    remap_manifest_elapsed: Duration,
    // Time spent updating table metadata.
    update_metadata_elapsed: Duration,
}
70
71impl Display for Metrics {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 let total = self.flush_pending_deallocate_regions_elapsed
74 + self.enter_staging_region_elapsed
75 + self.apply_staging_manifest_elapsed
76 + self.remap_manifest_elapsed
77 + self.update_metadata_elapsed;
78 write!(f, "total: {:?}", total)?;
79 let mut parts = Vec::with_capacity(5);
80 if self.flush_pending_deallocate_regions_elapsed > Duration::ZERO {
81 parts.push(format!(
82 "flush_pending_deallocate_regions_elapsed: {:?}",
83 self.flush_pending_deallocate_regions_elapsed
84 ));
85 }
86 if self.enter_staging_region_elapsed > Duration::ZERO {
87 parts.push(format!(
88 "enter_staging_region_elapsed: {:?}",
89 self.enter_staging_region_elapsed
90 ));
91 }
92 if self.apply_staging_manifest_elapsed > Duration::ZERO {
93 parts.push(format!(
94 "apply_staging_manifest_elapsed: {:?}",
95 self.apply_staging_manifest_elapsed
96 ));
97 }
98 if self.remap_manifest_elapsed > Duration::ZERO {
99 parts.push(format!(
100 "remap_manifest_elapsed: {:?}",
101 self.remap_manifest_elapsed
102 ));
103 }
104 if self.update_metadata_elapsed > Duration::ZERO {
105 parts.push(format!(
106 "update_metadata_elapsed: {:?}",
107 self.update_metadata_elapsed
108 ));
109 }
110
111 if !parts.is_empty() {
112 write!(f, ", {}", parts.join(", "))?;
113 }
114 Ok(())
115 }
116}
117
impl Metrics {
    /// Accumulates time spent entering staging regions.
    pub fn update_enter_staging_region_elapsed(&mut self, elapsed: Duration) {
        self.enter_staging_region_elapsed += elapsed;
    }

    /// Accumulates time spent flushing pending-deallocate regions.
    pub fn update_flush_pending_deallocate_regions_elapsed(&mut self, elapsed: Duration) {
        self.flush_pending_deallocate_regions_elapsed += elapsed;
    }

    /// Accumulates time spent applying staging manifests.
    pub fn update_apply_staging_manifest_elapsed(&mut self, elapsed: Duration) {
        self.apply_staging_manifest_elapsed += elapsed;
    }

    /// Accumulates time spent remapping manifests.
    pub fn update_remap_manifest_elapsed(&mut self, elapsed: Duration) {
        self.remap_manifest_elapsed += elapsed;
    }

    /// Accumulates time spent updating table metadata.
    pub fn update_update_metadata_elapsed(&mut self, elapsed: Duration) {
        self.update_metadata_elapsed += elapsed;
    }
}
143
144pub type GroupId = Uuid;
145
/// Procedure that repartitions one group of regions of a table by driving a
/// serializable state machine (see the `State` trait in this module).
pub struct RepartitionGroupProcedure {
    // Current state of the state machine; advanced on each `execute` call.
    state: Box<dyn State>,
    // Runtime context shared with every state.
    context: Context,
}
150
/// Borrowed, serializable view of the procedure's durable parts; used by
/// `Procedure::dump` to avoid cloning the persistent context.
#[derive(Debug, Serialize)]
struct RepartitionGroupData<'a> {
    persistent_ctx: &'a PersistentContext,
    state: &'a dyn State,
}
156
/// Owned counterpart of [`RepartitionGroupData`], produced when deserializing
/// a dumped procedure in `RepartitionGroupProcedure::from_json`.
#[derive(Debug, Deserialize)]
struct RepartitionGroupDataOwned {
    persistent_ctx: PersistentContext,
    state: Box<dyn State>,
}
162
163impl RepartitionGroupProcedure {
164 pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::RepartitionGroup";
165
166 pub fn new(persistent_context: PersistentContext, context: &repartition::Context) -> Self {
167 let state = Box::new(RepartitionStart);
168
169 Self {
170 state,
171 context: Context {
172 persistent_ctx: persistent_context,
173 cache_invalidator: context.cache_invalidator.clone(),
174 table_metadata_manager: context.table_metadata_manager.clone(),
175 mailbox: context.mailbox.clone(),
176 server_addr: context.server_addr.clone(),
177 start_time: Instant::now(),
178 volatile_ctx: VolatileContext::default(),
179 },
180 }
181 }
182
183 pub fn from_json<F>(json: &str, ctx_factory: F) -> ProcedureResult<Self>
184 where
185 F: FnOnce(PersistentContext) -> Context,
186 {
187 let RepartitionGroupDataOwned {
188 state,
189 persistent_ctx,
190 } = serde_json::from_str(json).context(FromJsonSnafu)?;
191 let context = ctx_factory(persistent_ctx);
192
193 Ok(Self { state, context })
194 }
195}
196
197#[async_trait::async_trait]
198impl Procedure for RepartitionGroupProcedure {
199 fn type_name(&self) -> &str {
200 Self::TYPE_NAME
201 }
202
203 async fn rollback(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<()> {
204 Ok(())
207 }
208
209 #[tracing::instrument(skip_all, fields(
210 state = %self.state.name(),
211 table_id = self.context.persistent_ctx.table_id,
212 group_id = %self.context.persistent_ctx.group_id,
213 ))]
214 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
215 let state = &mut self.state;
216 let state_name = state.name();
217 common_telemetry::info!(
219 "Repartition group procedure executing state: {}, group id: {}, table id: {}",
220 state_name,
221 self.context.persistent_ctx.group_id,
222 self.context.persistent_ctx.table_id
223 );
224
225 match state.next(&mut self.context, _ctx).await {
226 Ok((next, status)) => {
227 *state = next;
228 Ok(status)
229 }
230 Err(e) => {
231 if e.is_retryable() {
232 Err(ProcedureError::retry_later(e))
233 } else {
234 error!(
235 e;
236 "Repartition group procedure failed, group id: {}, table id: {}",
237 self.context.persistent_ctx.group_id,
238 self.context.persistent_ctx.table_id,
239 );
240 Err(ProcedureError::external(e))
241 }
242 }
243 }
244 }
245
246 fn rollback_supported(&self) -> bool {
247 false
250 }
251
252 fn dump(&self) -> ProcedureResult<String> {
253 let data = RepartitionGroupData {
254 persistent_ctx: &self.context.persistent_ctx,
255 state: self.state.as_ref(),
256 };
257 serde_json::to_string(&data).context(ToJsonSnafu)
258 }
259
260 fn lock_key(&self) -> LockKey {
261 LockKey::new(self.context.persistent_ctx.lock_key())
262 }
263
264 fn user_metadata(&self) -> Option<UserMetadata> {
265 None
267 }
268}
269
/// Runtime context handed to every state of a repartition group procedure.
pub struct Context {
    // State persisted across procedure restarts.
    pub persistent_ctx: PersistentContext,

    // Broadcasts invalidation of cached table metadata after route changes.
    pub cache_invalidator: CacheInvalidatorRef,

    // Access to table routes, datanode tables, and repart mappings.
    pub table_metadata_manager: TableMetadataManagerRef,

    // Mailbox for exchanging instructions with datanodes.
    pub mailbox: MailboxRef,

    // Address of this metasrv instance.
    pub server_addr: String,

    // Creation time of this context; used to compute the remaining timeout
    // budget in `next_operation_timeout`.
    pub start_time: Instant,

    // Transient per-run data (e.g. metrics); not persisted.
    pub volatile_ctx: VolatileContext,
}
285
/// Transient state that is rebuilt (defaulted) after a procedure restart.
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {
    // Per-phase timing metrics for the current run.
    pub metrics: Metrics,
}
290
291impl Context {
292 pub fn new(
293 ddl_ctx: &DdlContext,
294 mailbox: MailboxRef,
295 server_addr: String,
296 persistent_ctx: PersistentContext,
297 ) -> Self {
298 Self {
299 persistent_ctx,
300 cache_invalidator: ddl_ctx.cache_invalidator.clone(),
301 table_metadata_manager: ddl_ctx.table_metadata_manager.clone(),
302 mailbox,
303 server_addr,
304 start_time: Instant::now(),
305 volatile_ctx: VolatileContext::default(),
306 }
307 }
308}
309
/// Outcome of the group prepare phase, recorded for later states to use.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GroupPrepareResult {
    // Routes of the source regions captured at prepare time.
    pub source_routes: Vec<RegionRoute>,
    // Routes of the target regions.
    pub target_routes: Vec<RegionRoute>,
    // The region designated as the group's central region.
    pub central_region: RegionId,
    // Datanode hosting the central region.
    pub central_region_datanode: Peer,
}
322
/// State of a repartition group procedure that survives metasrv restarts;
/// transient data lives in [`VolatileContext`] instead.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
    // Identifier of this repartition group.
    pub group_id: GroupId,
    // Table whose regions are being repartitioned.
    pub table_id: TableId,
    // Catalog of the table.
    pub catalog_name: String,
    // Schema of the table.
    pub schema_name: String,
    // Regions participating as sources of the repartition.
    pub sources: Vec<RegionDescriptor>,
    // Regions participating as targets of the repartition.
    pub targets: Vec<RegionDescriptor>,
    // Mapping between regions; presumably source region -> target regions —
    // NOTE(review): direction inferred from surrounding naming, confirm
    // against the planner that produces it.
    pub region_mapping: HashMap<RegionId, Vec<RegionId>>,
    // Result of the group prepare step, if it has run.
    pub group_prepare_result: Option<GroupPrepareResult>,
    // Staging manifest path recorded per region.
    pub staging_manifest_paths: HashMap<RegionId, String>,
    // Whether region synchronization is part of this procedure run.
    pub sync_region: bool,
    // Region ids newly allocated for this repartition.
    pub allocated_region_ids: Vec<RegionId>,
    // Region ids scheduled for deallocation.
    pub pending_deallocate_region_ids: Vec<RegionId>,
    // Overall timeout budget; serialized in human-readable form.
    #[serde(with = "humantime_serde")]
    pub timeout: Duration,
}
355
356impl PersistentContext {
357 #[allow(clippy::too_many_arguments)]
358 pub fn new(
359 group_id: GroupId,
360 table_id: TableId,
361 catalog_name: String,
362 schema_name: String,
363 sources: Vec<RegionDescriptor>,
364 targets: Vec<RegionDescriptor>,
365 region_mapping: HashMap<RegionId, Vec<RegionId>>,
366 sync_region: bool,
367 allocated_region_ids: Vec<RegionId>,
368 pending_deallocate_region_ids: Vec<RegionId>,
369 timeout: Duration,
370 ) -> Self {
371 Self {
372 group_id,
373 table_id,
374 catalog_name,
375 schema_name,
376 sources,
377 targets,
378 region_mapping,
379 group_prepare_result: None,
380 staging_manifest_paths: HashMap::new(),
381 sync_region,
382 allocated_region_ids,
383 pending_deallocate_region_ids,
384 timeout,
385 }
386 }
387
388 pub fn lock_key(&self) -> Vec<StringKey> {
389 let mut lock_keys = Vec::with_capacity(2 + self.sources.len());
390 lock_keys.extend([
391 CatalogLock::Read(&self.catalog_name).into(),
392 SchemaLock::read(&self.catalog_name, &self.schema_name).into(),
393 ]);
394 for source in &self.sources {
395 lock_keys.push(RegionLock::Write(source.region_id).into());
396 }
397 lock_keys
398 }
399}
400
401impl Context {
402 pub async fn get_table_route_value(
410 &self,
411 ) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
412 let table_id = self.persistent_ctx.table_id;
413 let group_id = self.persistent_ctx.group_id;
414 let table_route_value = self
415 .table_metadata_manager
416 .table_route_manager()
417 .table_route_storage()
418 .get_with_raw_bytes(table_id)
419 .await
420 .map_err(BoxedError::new)
421 .with_context(|_| error::RetryLaterWithSourceSnafu {
422 reason: format!(
423 "Failed to get table route for table: {}, repartition group: {}",
424 table_id, group_id
425 ),
426 })?
427 .context(error::TableRouteNotFoundSnafu { table_id })?;
428
429 Ok(table_route_value)
430 }
431
432 pub async fn get_datanode_table_value(
437 &self,
438 table_id: TableId,
439 datanode_id: u64,
440 ) -> Result<DatanodeTableValue> {
441 get_datanode_table_value(&self.table_metadata_manager, table_id, datanode_id).await
442 }
443
444 pub async fn invalidate_table_cache(&self) -> Result<()> {
446 let table_id = self.persistent_ctx.table_id;
447 let group_id = self.persistent_ctx.group_id;
448 let subject = format!(
449 "Invalidate table cache for repartition table, group: {}, table: {}",
450 group_id, table_id,
451 );
452 let ctx = common_meta::cache_invalidator::Context {
453 subject: Some(subject),
454 };
455 let _ = self
456 .cache_invalidator
457 .invalidate(&ctx, &[CacheIdent::TableId(table_id)])
458 .await;
459 Ok(())
460 }
461
462 pub async fn update_table_route(
471 &self,
472 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
473 new_region_routes: Vec<RegionRoute>,
474 ) -> Result<()> {
475 let table_id = self.persistent_ctx.table_id;
476 let group_id = self.persistent_ctx.group_id;
477 let prepare_result = self.persistent_ctx.group_prepare_result.as_ref().unwrap();
479 let central_region_datanode_table_value = self
480 .get_datanode_table_value(table_id, prepare_result.central_region_datanode.id)
481 .await?;
482 let RegionInfo {
483 region_options,
484 region_wal_options,
485 ..
486 } = ¢ral_region_datanode_table_value.region_info;
487
488 info!(
489 "Updating table route for table: {}, group_id: {}, new region routes: {:?}",
490 table_id, group_id, new_region_routes
491 );
492 self.table_metadata_manager
493 .update_table_route(
494 table_id,
495 central_region_datanode_table_value.region_info.clone(),
496 current_table_route_value,
497 new_region_routes,
498 region_options,
499 region_wal_options,
500 )
501 .await
502 .context(error::TableMetadataManagerSnafu)
503 }
504
505 pub async fn update_table_repart_mapping(&self) -> Result<()> {
507 info!(
508 "Updating table repart mapping for table: {}, group_id: {}, region mapping: {:?}",
509 self.persistent_ctx.table_id,
510 self.persistent_ctx.group_id,
511 self.persistent_ctx.region_mapping
512 );
513
514 self.table_metadata_manager
515 .table_repart_manager()
516 .update_mappings(
517 self.persistent_ctx.table_id,
518 &self.persistent_ctx.region_mapping,
519 )
520 .await
521 .context(error::TableMetadataManagerSnafu)
522 }
523
524 pub fn next_operation_timeout(&self) -> Option<Duration> {
528 self.persistent_ctx
529 .timeout
530 .checked_sub(self.start_time.elapsed())
531 }
532
533 pub fn update_enter_staging_region_elapsed(&mut self, elapsed: Duration) {
535 self.volatile_ctx
536 .metrics
537 .update_enter_staging_region_elapsed(elapsed);
538 }
539
540 pub fn update_flush_pending_deallocate_regions_elapsed(&mut self, elapsed: Duration) {
542 self.volatile_ctx
543 .metrics
544 .update_flush_pending_deallocate_regions_elapsed(elapsed);
545 }
546
547 pub fn update_apply_staging_manifest_elapsed(&mut self, elapsed: Duration) {
549 self.volatile_ctx
550 .metrics
551 .update_apply_staging_manifest_elapsed(elapsed);
552 }
553
554 pub fn update_remap_manifest_elapsed(&mut self, elapsed: Duration) {
556 self.volatile_ctx
557 .metrics
558 .update_remap_manifest_elapsed(elapsed);
559 }
560
561 pub fn update_update_metadata_elapsed(&mut self, elapsed: Duration) {
563 self.volatile_ctx
564 .metrics
565 .update_update_metadata_elapsed(elapsed);
566 }
567}
568
/// Extracts the physical region routes from `table_route_value`.
///
/// Returns an error if the value is a logical (non-physical) table route,
/// which carries no region routes of its own.
pub fn region_routes(
    table_id: TableId,
    table_route_value: &TableRouteValue,
) -> Result<&Vec<RegionRoute>> {
    table_route_value
        .region_routes()
        .with_context(|_| error::UnexpectedLogicalRouteTableSnafu {
            err_msg: format!(
                "TableRoute({:?}) is a non-physical TableRouteValue.",
                table_id
            ),
        })
}
586
/// A single step of the repartition group state machine.
///
/// Implementations are (de)serialized via `typetag`, so in-flight procedures
/// can be recovered after a metasrv restart.
#[async_trait::async_trait]
#[typetag::serde(tag = "repartition_group_state")]
pub(crate) trait State: Sync + Send + Debug {
    /// Short state name: the unqualified type name of the implementor.
    fn name(&self) -> &'static str {
        let type_name = std::any::type_name::<Self>();
        // `type_name` is fully qualified; keep only the last path segment.
        type_name.split("::").last().unwrap_or(type_name)
    }

    /// Executes this state, returning the next state and the procedure status
    /// to report to the framework.
    async fn next(
        &mut self,
        ctx: &mut Context,
        procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)>;

    /// Supports downcasting a boxed state to its concrete type.
    fn as_any(&self) -> &dyn Any;
}
605
606#[cfg(test)]
607mod tests {
608 use std::assert_matches;
609 use std::sync::Arc;
610
611 use common_meta::key::TableMetadataManager;
612 use common_meta::kv_backend::test_util::MockKvBackendBuilder;
613
614 use crate::error::Error;
615 use crate::procedure::repartition::test_util::{TestingEnv, new_persistent_context};
616
617 #[tokio::test]
618 async fn test_get_table_route_value_not_found_error() {
619 let env = TestingEnv::new();
620 let persistent_context = new_persistent_context(1024, vec![], vec![]);
621 let ctx = env.create_context(persistent_context);
622 let err = ctx.get_table_route_value().await.unwrap_err();
623 assert_matches!(err, Error::TableRouteNotFound { .. });
624 assert!(!err.is_retryable());
625 }
626
627 #[tokio::test]
628 async fn test_get_table_route_value_retry_error() {
629 let kv = MockKvBackendBuilder::default()
630 .range_fn(Arc::new(|_| {
631 common_meta::error::UnexpectedSnafu {
632 err_msg: "mock err",
633 }
634 .fail()
635 }))
636 .build()
637 .unwrap();
638 let mut env = TestingEnv::new();
639 env.table_metadata_manager = Arc::new(TableMetadataManager::new(Arc::new(kv)));
640 let persistent_context = new_persistent_context(1024, vec![], vec![]);
641 let ctx = env.create_context(persistent_context);
642 let err = ctx.get_table_route_value().await.unwrap_err();
643 assert!(err.is_retryable());
644 }
645
646 #[tokio::test]
647 async fn test_get_datanode_table_value_retry_error() {
648 let kv = MockKvBackendBuilder::default()
649 .range_fn(Arc::new(|_| {
650 common_meta::error::UnexpectedSnafu {
651 err_msg: "mock err",
652 }
653 .fail()
654 }))
655 .build()
656 .unwrap();
657 let mut env = TestingEnv::new();
658 env.table_metadata_manager = Arc::new(TableMetadataManager::new(Arc::new(kv)));
659 let persistent_context = new_persistent_context(1024, vec![], vec![]);
660 let ctx = env.create_context(persistent_context);
661 let err = ctx.get_datanode_table_value(1024, 1).await.unwrap_err();
662 assert!(err.is_retryable());
663 }
664}