1pub(crate) mod apply_staging_manifest;
16pub(crate) mod enter_staging_region;
17pub(crate) mod remap_manifest;
18pub(crate) mod repartition_end;
19pub(crate) mod repartition_start;
20pub(crate) mod sync_region;
21pub(crate) mod update_metadata;
22pub(crate) mod utils;
23
24use std::any::Any;
25use std::collections::HashMap;
26use std::fmt::{Debug, Display};
27use std::time::{Duration, Instant};
28
29use common_error::ext::BoxedError;
30use common_meta::cache_invalidator::CacheInvalidatorRef;
31use common_meta::ddl::DdlContext;
32use common_meta::instruction::CacheIdent;
33use common_meta::key::datanode_table::{DatanodeTableValue, RegionInfo};
34use common_meta::key::table_route::TableRouteValue;
35use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
36use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock};
37use common_meta::peer::Peer;
38use common_meta::rpc::router::RegionRoute;
39use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
40use common_procedure::{
41 Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
42 Result as ProcedureResult, Status, StringKey, UserMetadata,
43};
44use common_telemetry::{error, info};
45use serde::{Deserialize, Serialize};
46use snafu::{OptionExt, ResultExt};
47use store_api::storage::{RegionId, TableId};
48use uuid::Uuid;
49
50use crate::error::{self, Result};
51use crate::procedure::repartition::group::repartition_start::RepartitionStart;
52use crate::procedure::repartition::plan::RegionDescriptor;
53use crate::procedure::repartition::utils::get_datanode_table_value;
54use crate::procedure::repartition::{self};
55use crate::service::mailbox::MailboxRef;
56
/// Elapsed time accumulated per step of a repartition group procedure.
///
/// All counters start at zero (`Default`) and only grow through the
/// `update_*` methods.
#[derive(Debug, Clone, Default)]
pub struct Metrics {
    /// Total time reported via `update_flush_pending_deallocate_regions_elapsed`.
    flush_pending_deallocate_regions_elapsed: Duration,
    /// Total time reported via `update_enter_staging_region_elapsed`.
    enter_staging_region_elapsed: Duration,
    /// Total time reported via `update_apply_staging_manifest_elapsed`.
    apply_staging_manifest_elapsed: Duration,
    /// Total time reported via `update_remap_manifest_elapsed`.
    remap_manifest_elapsed: Duration,
    /// Total time reported via `update_update_metadata_elapsed`.
    update_metadata_elapsed: Duration,
}

impl Display for Metrics {
    /// Writes `total: <sum>` followed by `, <name>: <value>` for every counter
    /// that is greater than zero, in field declaration order.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let entries = self.entries();

        // Fold with `+` (rather than `sum`) so the additions happen in the same
        // order and with the same overflow behavior as a plain `a + b + ...`.
        let total = entries
            .iter()
            .fold(Duration::ZERO, |acc, (_, elapsed)| acc + *elapsed);
        write!(f, "total: {:?}", total)?;

        for (label, elapsed) in entries {
            if elapsed > Duration::ZERO {
                write!(f, ", {}: {:?}", label, elapsed)?;
            }
        }
        Ok(())
    }
}

impl Metrics {
    /// Returns every counter paired with its display label, in field order.
    fn entries(&self) -> [(&'static str, Duration); 5] {
        [
            (
                "flush_pending_deallocate_regions_elapsed",
                self.flush_pending_deallocate_regions_elapsed,
            ),
            (
                "enter_staging_region_elapsed",
                self.enter_staging_region_elapsed,
            ),
            (
                "apply_staging_manifest_elapsed",
                self.apply_staging_manifest_elapsed,
            ),
            ("remap_manifest_elapsed", self.remap_manifest_elapsed),
            ("update_metadata_elapsed", self.update_metadata_elapsed),
        ]
    }

    /// Adds `elapsed` to the flush-pending-deallocate-regions counter.
    pub fn update_flush_pending_deallocate_regions_elapsed(&mut self, elapsed: Duration) {
        self.flush_pending_deallocate_regions_elapsed += elapsed;
    }

    /// Adds `elapsed` to the enter-staging-region counter.
    pub fn update_enter_staging_region_elapsed(&mut self, elapsed: Duration) {
        self.enter_staging_region_elapsed += elapsed;
    }

    /// Adds `elapsed` to the apply-staging-manifest counter.
    pub fn update_apply_staging_manifest_elapsed(&mut self, elapsed: Duration) {
        self.apply_staging_manifest_elapsed += elapsed;
    }

    /// Adds `elapsed` to the remap-manifest counter.
    pub fn update_remap_manifest_elapsed(&mut self, elapsed: Duration) {
        self.remap_manifest_elapsed += elapsed;
    }

    /// Adds `elapsed` to the update-metadata counter.
    pub fn update_update_metadata_elapsed(&mut self, elapsed: Duration) {
        self.update_metadata_elapsed += elapsed;
    }
}
143
/// Identifier of a repartition group; an alias for [`Uuid`].
pub type GroupId = Uuid;
145
/// Procedure that advances one repartition group through its [`State`]s.
pub struct RepartitionGroupProcedure {
    /// The state that the next `execute` call will run.
    state: Box<dyn State>,
    /// Handles and per-group data passed to every state.
    context: Context,
}
150
/// Borrowed, serialize-only view of the data written by `dump`:
/// the persistent context plus the current state.
#[derive(Debug, Serialize)]
struct RepartitionGroupData<'a> {
    /// The procedure's persistent context.
    persistent_ctx: &'a PersistentContext,
    /// The procedure's current state.
    state: &'a dyn State,
}
156
/// Owned, deserialize-only counterpart of [`RepartitionGroupData`],
/// used by `from_json` to restore a procedure.
#[derive(Debug, Deserialize)]
struct RepartitionGroupDataOwned {
    /// The restored persistent context.
    persistent_ctx: PersistentContext,
    /// The restored state.
    state: Box<dyn State>,
}
162
163impl RepartitionGroupProcedure {
164 pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::RepartitionGroup";
165
166 pub fn new(persistent_context: PersistentContext, context: &repartition::Context) -> Self {
167 let state = Box::new(RepartitionStart);
168
169 Self {
170 state,
171 context: Context {
172 persistent_ctx: persistent_context,
173 cache_invalidator: context.cache_invalidator.clone(),
174 table_metadata_manager: context.table_metadata_manager.clone(),
175 mailbox: context.mailbox.clone(),
176 server_addr: context.server_addr.clone(),
177 start_time: Instant::now(),
178 volatile_ctx: VolatileContext::default(),
179 },
180 }
181 }
182
183 pub fn from_json<F>(json: &str, ctx_factory: F) -> ProcedureResult<Self>
184 where
185 F: FnOnce(PersistentContext) -> Context,
186 {
187 let RepartitionGroupDataOwned {
188 state,
189 persistent_ctx,
190 } = serde_json::from_str(json).context(FromJsonSnafu)?;
191 let context = ctx_factory(persistent_ctx);
192
193 Ok(Self { state, context })
194 }
195}
196
197#[async_trait::async_trait]
198impl Procedure for RepartitionGroupProcedure {
199 fn type_name(&self) -> &str {
200 Self::TYPE_NAME
201 }
202
203 #[tracing::instrument(skip_all, fields(
204 state = %self.state.name(),
205 table_id = self.context.persistent_ctx.table_id,
206 group_id = %self.context.persistent_ctx.group_id,
207 ))]
208 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
209 let state = &mut self.state;
210 let state_name = state.name();
211 common_telemetry::info!(
213 "Repartition group procedure executing state: {}, group id: {}, table id: {}",
214 state_name,
215 self.context.persistent_ctx.group_id,
216 self.context.persistent_ctx.table_id
217 );
218
219 match state.next(&mut self.context, _ctx).await {
220 Ok((next, status)) => {
221 *state = next;
222 Ok(status)
223 }
224 Err(e) => {
225 if e.is_retryable() {
226 Err(ProcedureError::retry_later(e))
227 } else {
228 error!(
229 e;
230 "Repartition group procedure failed, group id: {}, table id: {}",
231 self.context.persistent_ctx.group_id,
232 self.context.persistent_ctx.table_id,
233 );
234 Err(ProcedureError::external(e))
235 }
236 }
237 }
238 }
239
240 fn rollback_supported(&self) -> bool {
241 false
242 }
243
244 fn dump(&self) -> ProcedureResult<String> {
245 let data = RepartitionGroupData {
246 persistent_ctx: &self.context.persistent_ctx,
247 state: self.state.as_ref(),
248 };
249 serde_json::to_string(&data).context(ToJsonSnafu)
250 }
251
252 fn lock_key(&self) -> LockKey {
253 LockKey::new(self.context.persistent_ctx.lock_key())
254 }
255
256 fn user_metadata(&self) -> Option<UserMetadata> {
257 None
259 }
260}
261
/// Everything a repartition group state needs while it runs: the persisted
/// group data, shared service handles, and in-memory bookkeeping.
pub struct Context {
    /// Group data that is serialized by the procedure's `dump`.
    pub persistent_ctx: PersistentContext,

    /// Used by `invalidate_table_cache` to invalidate the table's cache entry.
    pub cache_invalidator: CacheInvalidatorRef,

    /// Access to table route, datanode table and repart-mapping metadata.
    pub table_metadata_manager: TableMetadataManagerRef,

    /// Mailbox handle; not used by the methods in this file.
    // NOTE(review): presumably used by the states to message datanodes — confirm.
    pub mailbox: MailboxRef,

    /// Address of this server.
    pub server_addr: String,

    /// When this context was created; the reference point for
    /// `next_operation_timeout`.
    pub start_time: Instant,

    /// In-memory data that is not part of the serialized procedure state.
    pub volatile_ctx: VolatileContext,
}
277
/// In-memory bookkeeping for a running group procedure; it is recreated with
/// default values whenever a [`Context`] is built.
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {
    /// Per-step elapsed-time counters.
    pub metrics: Metrics,
}
282
283impl Context {
284 pub fn new(
285 ddl_ctx: &DdlContext,
286 mailbox: MailboxRef,
287 server_addr: String,
288 persistent_ctx: PersistentContext,
289 ) -> Self {
290 Self {
291 persistent_ctx,
292 cache_invalidator: ddl_ctx.cache_invalidator.clone(),
293 table_metadata_manager: ddl_ctx.table_metadata_manager.clone(),
294 mailbox,
295 server_addr,
296 start_time: Instant::now(),
297 volatile_ctx: VolatileContext::default(),
298 }
299 }
300}
301
/// Region routes and "central region" information stored in
/// [`PersistentContext::group_prepare_result`].
// NOTE(review): presumably produced by the group's preparation step — confirm.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GroupPrepareResult {
    /// Routes of the source regions.
    pub source_routes: Vec<RegionRoute>,
    /// Routes of the target regions.
    pub target_routes: Vec<RegionRoute>,
    /// Id of the central region.
    // NOTE(review): how the central region is chosen is not visible here — confirm.
    pub central_region: RegionId,
    /// Datanode hosting the central region. `update_table_route` reads this
    /// datanode's table value to obtain the region info, region options and
    /// WAL options.
    pub central_region_datanode: Peer,
}
314
/// The serializable part of a repartition group's context.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
    /// Id of this repartition group.
    pub group_id: GroupId,
    /// Id of the table this group operates on.
    pub table_id: TableId,
    /// Catalog name; read-locked by `lock_key`.
    pub catalog_name: String,
    /// Schema name; read-locked (together with the catalog) by `lock_key`.
    pub schema_name: String,
    /// Source region descriptors; each source region is write-locked by `lock_key`.
    pub sources: Vec<RegionDescriptor>,
    /// Target region descriptors.
    pub targets: Vec<RegionDescriptor>,
    /// Region mapping handed to the table repart manager by
    /// `Context::update_table_repart_mapping`.
    // NOTE(review): presumably source region -> target regions — confirm.
    pub region_mapping: HashMap<RegionId, Vec<RegionId>>,
    /// `None` when constructed via `new`; must be `Some` before
    /// `Context::update_table_route` is called.
    pub group_prepare_result: Option<GroupPrepareResult>,
    /// Empty when constructed via `new`.
    // NOTE(review): presumably staging manifest path per region, filled by a later state — confirm.
    pub staging_manifest_paths: HashMap<RegionId, String>,
    /// NOTE(review): presumably enables the region sync step — confirm.
    pub sync_region: bool,
    /// NOTE(review): presumably ids of regions allocated for this repartition — confirm.
    pub allocated_region_ids: Vec<RegionId>,
    /// NOTE(review): presumably ids of regions awaiting deallocation — confirm.
    pub pending_deallocate_region_ids: Vec<RegionId>,
    /// Overall time budget, (de)serialized via `humantime_serde`; consumed by
    /// `Context::next_operation_timeout`.
    #[serde(with = "humantime_serde")]
    pub timeout: Duration,
}
347
348impl PersistentContext {
349 #[allow(clippy::too_many_arguments)]
350 pub fn new(
351 group_id: GroupId,
352 table_id: TableId,
353 catalog_name: String,
354 schema_name: String,
355 sources: Vec<RegionDescriptor>,
356 targets: Vec<RegionDescriptor>,
357 region_mapping: HashMap<RegionId, Vec<RegionId>>,
358 sync_region: bool,
359 allocated_region_ids: Vec<RegionId>,
360 pending_deallocate_region_ids: Vec<RegionId>,
361 timeout: Duration,
362 ) -> Self {
363 Self {
364 group_id,
365 table_id,
366 catalog_name,
367 schema_name,
368 sources,
369 targets,
370 region_mapping,
371 group_prepare_result: None,
372 staging_manifest_paths: HashMap::new(),
373 sync_region,
374 allocated_region_ids,
375 pending_deallocate_region_ids,
376 timeout,
377 }
378 }
379
380 pub fn lock_key(&self) -> Vec<StringKey> {
381 let mut lock_keys = Vec::with_capacity(2 + self.sources.len());
382 lock_keys.extend([
383 CatalogLock::Read(&self.catalog_name).into(),
384 SchemaLock::read(&self.catalog_name, &self.schema_name).into(),
385 ]);
386 for source in &self.sources {
387 lock_keys.push(RegionLock::Write(source.region_id).into());
388 }
389 lock_keys
390 }
391}
392
393impl Context {
394 pub async fn get_table_route_value(
402 &self,
403 ) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
404 let table_id = self.persistent_ctx.table_id;
405 let group_id = self.persistent_ctx.group_id;
406 let table_route_value = self
407 .table_metadata_manager
408 .table_route_manager()
409 .table_route_storage()
410 .get_with_raw_bytes(table_id)
411 .await
412 .map_err(BoxedError::new)
413 .with_context(|_| error::RetryLaterWithSourceSnafu {
414 reason: format!(
415 "Failed to get table route for table: {}, repartition group: {}",
416 table_id, group_id
417 ),
418 })?
419 .context(error::TableRouteNotFoundSnafu { table_id })?;
420
421 Ok(table_route_value)
422 }
423
424 pub async fn get_datanode_table_value(
429 &self,
430 table_id: TableId,
431 datanode_id: u64,
432 ) -> Result<DatanodeTableValue> {
433 get_datanode_table_value(&self.table_metadata_manager, table_id, datanode_id).await
434 }
435
436 pub async fn invalidate_table_cache(&self) -> Result<()> {
438 let table_id = self.persistent_ctx.table_id;
439 let group_id = self.persistent_ctx.group_id;
440 let subject = format!(
441 "Invalidate table cache for repartition table, group: {}, table: {}",
442 group_id, table_id,
443 );
444 let ctx = common_meta::cache_invalidator::Context {
445 subject: Some(subject),
446 };
447 let _ = self
448 .cache_invalidator
449 .invalidate(&ctx, &[CacheIdent::TableId(table_id)])
450 .await;
451 Ok(())
452 }
453
454 pub async fn update_table_route(
463 &self,
464 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
465 new_region_routes: Vec<RegionRoute>,
466 ) -> Result<()> {
467 let table_id = self.persistent_ctx.table_id;
468 let group_id = self.persistent_ctx.group_id;
469 let prepare_result = self.persistent_ctx.group_prepare_result.as_ref().unwrap();
471 let central_region_datanode_table_value = self
472 .get_datanode_table_value(table_id, prepare_result.central_region_datanode.id)
473 .await?;
474 let RegionInfo {
475 region_options,
476 region_wal_options,
477 ..
478 } = ¢ral_region_datanode_table_value.region_info;
479
480 info!(
481 "Updating table route for table: {}, group_id: {}, new region routes: {:?}",
482 table_id, group_id, new_region_routes
483 );
484 self.table_metadata_manager
485 .update_table_route(
486 table_id,
487 central_region_datanode_table_value.region_info.clone(),
488 current_table_route_value,
489 new_region_routes,
490 region_options,
491 region_wal_options,
492 )
493 .await
494 .context(error::TableMetadataManagerSnafu)
495 }
496
497 pub async fn update_table_repart_mapping(&self) -> Result<()> {
499 info!(
500 "Updating table repart mapping for table: {}, group_id: {}, region mapping: {:?}",
501 self.persistent_ctx.table_id,
502 self.persistent_ctx.group_id,
503 self.persistent_ctx.region_mapping
504 );
505
506 self.table_metadata_manager
507 .table_repart_manager()
508 .update_mappings(
509 self.persistent_ctx.table_id,
510 &self.persistent_ctx.region_mapping,
511 )
512 .await
513 .context(error::TableMetadataManagerSnafu)
514 }
515
516 pub fn next_operation_timeout(&self) -> Option<Duration> {
520 self.persistent_ctx
521 .timeout
522 .checked_sub(self.start_time.elapsed())
523 }
524
525 pub fn update_enter_staging_region_elapsed(&mut self, elapsed: Duration) {
527 self.volatile_ctx
528 .metrics
529 .update_enter_staging_region_elapsed(elapsed);
530 }
531
532 pub fn update_flush_pending_deallocate_regions_elapsed(&mut self, elapsed: Duration) {
534 self.volatile_ctx
535 .metrics
536 .update_flush_pending_deallocate_regions_elapsed(elapsed);
537 }
538
539 pub fn update_apply_staging_manifest_elapsed(&mut self, elapsed: Duration) {
541 self.volatile_ctx
542 .metrics
543 .update_apply_staging_manifest_elapsed(elapsed);
544 }
545
546 pub fn update_remap_manifest_elapsed(&mut self, elapsed: Duration) {
548 self.volatile_ctx
549 .metrics
550 .update_remap_manifest_elapsed(elapsed);
551 }
552
553 pub fn update_update_metadata_elapsed(&mut self, elapsed: Duration) {
555 self.volatile_ctx
556 .metrics
557 .update_update_metadata_elapsed(elapsed);
558 }
559}
560
561pub fn region_routes(
566 table_id: TableId,
567 table_route_value: &TableRouteValue,
568) -> Result<&Vec<RegionRoute>> {
569 table_route_value
570 .region_routes()
571 .with_context(|_| error::UnexpectedLogicalRouteTableSnafu {
572 err_msg: format!(
573 "TableRoute({:?}) is a non-physical TableRouteValue.",
574 table_id
575 ),
576 })
577}
578
579#[async_trait::async_trait]
580#[typetag::serde(tag = "repartition_group_state")]
581pub(crate) trait State: Sync + Send + Debug {
582 fn name(&self) -> &'static str {
583 let type_name = std::any::type_name::<Self>();
584 type_name.split("::").last().unwrap_or(type_name)
586 }
587
588 async fn next(
590 &mut self,
591 ctx: &mut Context,
592 procedure_ctx: &ProcedureContext,
593 ) -> Result<(Box<dyn State>, Status)>;
594
595 fn as_any(&self) -> &dyn Any;
596}
597
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use common_meta::key::TableMetadataManager;
    use common_meta::kv_backend::test_util::MockKvBackendBuilder;

    use crate::error::Error;
    use crate::procedure::repartition::test_util::{TestingEnv, new_persistent_context};

    /// Builds a testing env whose KV backend fails every range request.
    fn env_with_failing_range() -> TestingEnv {
        let failing_kv = MockKvBackendBuilder::default()
            .range_fn(Arc::new(|_| {
                common_meta::error::UnexpectedSnafu {
                    err_msg: "mock err",
                }
                .fail()
            }))
            .build()
            .unwrap();

        let mut env = TestingEnv::new();
        env.table_metadata_manager = Arc::new(TableMetadataManager::new(Arc::new(failing_kv)));
        env
    }

    /// A missing table route is reported as a non-retryable `TableRouteNotFound`.
    #[tokio::test]
    async fn test_get_table_route_value_not_found_error() {
        let ctx = TestingEnv::new().create_context(new_persistent_context(1024, vec![], vec![]));

        let err = ctx.get_table_route_value().await.unwrap_err();

        assert_matches!(err, Error::TableRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    /// A storage failure while reading the table route is retryable.
    #[tokio::test]
    async fn test_get_table_route_value_retry_error() {
        let env = env_with_failing_range();
        let ctx = env.create_context(new_persistent_context(1024, vec![], vec![]));

        let err = ctx.get_table_route_value().await.unwrap_err();

        assert!(err.is_retryable());
    }

    /// A storage failure while reading the datanode table value is retryable.
    #[tokio::test]
    async fn test_get_datanode_table_value_retry_error() {
        let env = env_with_failing_range();
        let ctx = env.create_context(new_persistent_context(1024, vec![], vec![]));

        let err = ctx.get_datanode_table_value(1024, 1).await.unwrap_err();

        assert!(err.is_retryable());
    }
}