1pub(crate) mod apply_staging_manifest;
16pub(crate) mod enter_staging_region;
17pub(crate) mod remap_manifest;
18pub(crate) mod repartition_end;
19pub(crate) mod repartition_start;
20pub(crate) mod sync_region;
21pub(crate) mod update_metadata;
22pub(crate) mod utils;
23
24use std::any::Any;
25use std::collections::HashMap;
26use std::fmt::{Debug, Display};
27use std::time::{Duration, Instant};
28
29use common_error::ext::BoxedError;
30use common_meta::cache_invalidator::CacheInvalidatorRef;
31use common_meta::ddl::DdlContext;
32use common_meta::instruction::CacheIdent;
33use common_meta::key::datanode_table::{DatanodeTableValue, RegionInfo};
34use common_meta::key::table_route::TableRouteValue;
35use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
36use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock};
37use common_meta::peer::Peer;
38use common_meta::rpc::router::RegionRoute;
39use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
40use common_procedure::{
41 Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
42 Result as ProcedureResult, Status, StringKey, UserMetadata,
43};
44use common_telemetry::{error, info};
45use serde::{Deserialize, Serialize};
46use snafu::{OptionExt, ResultExt};
47use store_api::storage::{RegionId, TableId};
48use uuid::Uuid;
49
50use crate::error::{self, Result};
51use crate::procedure::repartition::group::repartition_start::RepartitionStart;
52use crate::procedure::repartition::plan::RegionDescriptor;
53use crate::procedure::repartition::utils::get_datanode_table_value;
54use crate::procedure::repartition::{self};
55use crate::service::mailbox::MailboxRef;
56
/// Wall-clock time spent in each step of a repartition group procedure.
///
/// Every field only grows: each `update_*` method adds the given duration to its field.
#[derive(Debug, Clone, Default)]
pub struct Metrics {
    /// Accumulated time of the "flush pending deallocate regions" step.
    flush_pending_deallocate_regions_elapsed: Duration,
    /// Accumulated time of the "enter staging region" step.
    enter_staging_region_elapsed: Duration,
    /// Accumulated time of the "apply staging manifest" step.
    apply_staging_manifest_elapsed: Duration,
    /// Accumulated time of the "remap manifest" step.
    remap_manifest_elapsed: Duration,
    /// Accumulated time of the "update metadata" step.
    update_metadata_elapsed: Duration,
}

impl Display for Metrics {
    /// Renders `total: <sum>` followed by `, <step>: <elapsed>` for every step whose
    /// elapsed time is non-zero, in declaration order.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let steps = [
            (
                "flush_pending_deallocate_regions_elapsed",
                self.flush_pending_deallocate_regions_elapsed,
            ),
            (
                "enter_staging_region_elapsed",
                self.enter_staging_region_elapsed,
            ),
            (
                "apply_staging_manifest_elapsed",
                self.apply_staging_manifest_elapsed,
            ),
            ("remap_manifest_elapsed", self.remap_manifest_elapsed),
            ("update_metadata_elapsed", self.update_metadata_elapsed),
        ];

        let total = steps
            .iter()
            .fold(Duration::ZERO, |acc, &(_, elapsed)| acc + elapsed);
        write!(f, "total: {:?}", total)?;

        let non_zero: Vec<String> = steps
            .iter()
            .filter(|(_, elapsed)| !elapsed.is_zero())
            .map(|(label, elapsed)| format!("{}: {:?}", label, elapsed))
            .collect();
        if non_zero.is_empty() {
            return Ok(());
        }
        write!(f, ", {}", non_zero.join(", "))
    }
}

impl Metrics {
    /// Adds `elapsed` to the "flush pending deallocate regions" step.
    pub fn update_flush_pending_deallocate_regions_elapsed(&mut self, elapsed: Duration) {
        self.flush_pending_deallocate_regions_elapsed += elapsed;
    }

    /// Adds `elapsed` to the "enter staging region" step.
    pub fn update_enter_staging_region_elapsed(&mut self, elapsed: Duration) {
        self.enter_staging_region_elapsed += elapsed;
    }

    /// Adds `elapsed` to the "apply staging manifest" step.
    pub fn update_apply_staging_manifest_elapsed(&mut self, elapsed: Duration) {
        self.apply_staging_manifest_elapsed += elapsed;
    }

    /// Adds `elapsed` to the "remap manifest" step.
    pub fn update_remap_manifest_elapsed(&mut self, elapsed: Duration) {
        self.remap_manifest_elapsed += elapsed;
    }

    /// Adds `elapsed` to the "update metadata" step.
    pub fn update_update_metadata_elapsed(&mut self, elapsed: Duration) {
        self.update_metadata_elapsed += elapsed;
    }
}
143
/// Identifier of a repartition group.
pub type GroupId = Uuid;

/// Procedure that drives a single repartition group through its [`State`] machine.
pub struct RepartitionGroupProcedure {
    /// Current state; replaced by the state returned from [`State::next`] after each
    /// successful step.
    state: Box<dyn State>,
    /// Runtime context: the persisted data plus shared handles (metadata, mailbox, ...).
    context: Context,
}

/// Borrowed view of the procedure serialized by `dump`.
#[derive(Debug, Serialize)]
struct RepartitionGroupData<'a> {
    persistent_ctx: &'a PersistentContext,
    state: &'a dyn State,
}

/// Owned counterpart of [`RepartitionGroupData`], deserialized by
/// `RepartitionGroupProcedure::from_json`.
#[derive(Debug, Deserialize)]
struct RepartitionGroupDataOwned {
    persistent_ctx: PersistentContext,
    state: Box<dyn State>,
}
162
163impl RepartitionGroupProcedure {
164 pub(crate) const TYPE_NAME: &'static str = "metasrv-procedure::RepartitionGroup";
165
166 pub fn new(persistent_context: PersistentContext, context: &repartition::Context) -> Self {
167 let state = Box::new(RepartitionStart);
168
169 Self {
170 state,
171 context: Context {
172 persistent_ctx: persistent_context,
173 cache_invalidator: context.cache_invalidator.clone(),
174 table_metadata_manager: context.table_metadata_manager.clone(),
175 mailbox: context.mailbox.clone(),
176 server_addr: context.server_addr.clone(),
177 start_time: Instant::now(),
178 volatile_ctx: VolatileContext::default(),
179 },
180 }
181 }
182
183 pub fn from_json<F>(json: &str, ctx_factory: F) -> ProcedureResult<Self>
184 where
185 F: FnOnce(PersistentContext) -> Context,
186 {
187 let RepartitionGroupDataOwned {
188 state,
189 persistent_ctx,
190 } = serde_json::from_str(json).context(FromJsonSnafu)?;
191 let context = ctx_factory(persistent_ctx);
192
193 Ok(Self { state, context })
194 }
195}
196
197#[async_trait::async_trait]
198impl Procedure for RepartitionGroupProcedure {
199 fn type_name(&self) -> &str {
200 Self::TYPE_NAME
201 }
202
203 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
204 let state = &mut self.state;
205 let state_name = state.name();
206 common_telemetry::info!(
208 "Repartition group procedure executing state: {}, group id: {}, table id: {}",
209 state_name,
210 self.context.persistent_ctx.group_id,
211 self.context.persistent_ctx.table_id
212 );
213
214 match state.next(&mut self.context, _ctx).await {
215 Ok((next, status)) => {
216 *state = next;
217 Ok(status)
218 }
219 Err(e) => {
220 if e.is_retryable() {
221 Err(ProcedureError::retry_later(e))
222 } else {
223 error!(
224 e;
225 "Repartition group procedure failed, group id: {}, table id: {}",
226 self.context.persistent_ctx.group_id,
227 self.context.persistent_ctx.table_id,
228 );
229 Err(ProcedureError::external(e))
230 }
231 }
232 }
233 }
234
235 fn rollback_supported(&self) -> bool {
236 false
237 }
238
239 fn dump(&self) -> ProcedureResult<String> {
240 let data = RepartitionGroupData {
241 persistent_ctx: &self.context.persistent_ctx,
242 state: self.state.as_ref(),
243 };
244 serde_json::to_string(&data).context(ToJsonSnafu)
245 }
246
247 fn lock_key(&self) -> LockKey {
248 LockKey::new(self.context.persistent_ctx.lock_key())
249 }
250
251 fn user_metadata(&self) -> Option<UserMetadata> {
252 None
254 }
255}
256
/// Runtime context of a repartition group procedure.
pub struct Context {
    /// Data persisted with the procedure (written by `dump`, restored by `from_json`).
    pub persistent_ctx: PersistentContext,

    /// Used by `invalidate_table_cache` to invalidate the table's cached metadata.
    pub cache_invalidator: CacheInvalidatorRef,

    /// Access to table route, datanode table and table repart metadata.
    pub table_metadata_manager: TableMetadataManagerRef,

    // NOTE(review): presumably used by states to exchange messages with datanodes — confirm.
    pub mailbox: MailboxRef,

    // NOTE(review): presumably this metasrv's own address, sent along with mailbox
    // messages — confirm.
    pub server_addr: String,

    /// Reference point for `next_operation_timeout`. The constructors in this module set it
    /// to `Instant::now()`. It is not part of `PersistentContext`.
    pub start_time: Instant,

    /// State that is not persisted with the procedure.
    pub volatile_ctx: VolatileContext,
}

/// Per-run, non-persisted state of a repartition group procedure.
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {
    /// Accumulated per-step timings.
    pub metrics: Metrics,
}
277
278impl Context {
279 pub fn new(
280 ddl_ctx: &DdlContext,
281 mailbox: MailboxRef,
282 server_addr: String,
283 persistent_ctx: PersistentContext,
284 ) -> Self {
285 Self {
286 persistent_ctx,
287 cache_invalidator: ddl_ctx.cache_invalidator.clone(),
288 table_metadata_manager: ddl_ctx.table_metadata_manager.clone(),
289 mailbox,
290 server_addr,
291 start_time: Instant::now(),
292 volatile_ctx: VolatileContext::default(),
293 }
294 }
295}
296
/// Routing information resolved for a repartition group before its regions are changed.
// NOTE(review): presumably populated by an early state (e.g. `RepartitionStart`) — confirm.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GroupPrepareResult {
    /// Region routes of the group's source regions.
    pub source_routes: Vec<RegionRoute>,
    /// Region routes of the group's target regions.
    pub target_routes: Vec<RegionRoute>,
    /// The group's central region.
    pub central_region: RegionId,
    /// Peer associated with the central region (presumably the one hosting it — confirm).
    /// `Context::update_table_route` reads region and WAL options from this peer's datanode
    /// table value.
    pub central_region_datanode: Peer,
}
309
/// State of a repartition group that is persisted with the procedure.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
    /// Identifier of this repartition group.
    pub group_id: GroupId,
    /// Table being repartitioned.
    pub table_id: TableId,
    /// Catalog of the table; read-locked by `lock_key`.
    pub catalog_name: String,
    /// Schema of the table; read-locked by `lock_key`.
    pub schema_name: String,
    /// Source regions of the group; each one is write-locked by `lock_key`.
    pub sources: Vec<RegionDescriptor>,
    /// Target regions of the group.
    pub targets: Vec<RegionDescriptor>,
    /// Region mapping persisted by `Context::update_table_repart_mapping`.
    // NOTE(review): key/value direction (source -> targets?) is not visible here — confirm.
    pub region_mapping: HashMap<RegionId, Vec<RegionId>>,
    /// Result of preparing the group. `None` when created by `PersistentContext::new`.
    /// Must be `Some` before `Context::update_table_route` is called.
    pub group_prepare_result: Option<GroupPrepareResult>,
    /// Staging manifest path per region. Empty when created by `PersistentContext::new`.
    pub staging_manifest_paths: HashMap<RegionId, String>,
    // NOTE(review): flag controlling whether regions are synced; exact semantics live in the
    // `sync_region` state — confirm.
    pub sync_region: bool,
    /// Region ids allocated for this repartition.
    pub allocated_region_ids: Vec<RegionId>,
    /// Region ids awaiting deallocation.
    pub pending_deallocate_region_ids: Vec<RegionId>,
    /// Overall time budget, serialized in humantime format. Consumed by
    /// `Context::next_operation_timeout`.
    #[serde(with = "humantime_serde")]
    pub timeout: Duration,
}
342
343impl PersistentContext {
344 #[allow(clippy::too_many_arguments)]
345 pub fn new(
346 group_id: GroupId,
347 table_id: TableId,
348 catalog_name: String,
349 schema_name: String,
350 sources: Vec<RegionDescriptor>,
351 targets: Vec<RegionDescriptor>,
352 region_mapping: HashMap<RegionId, Vec<RegionId>>,
353 sync_region: bool,
354 allocated_region_ids: Vec<RegionId>,
355 pending_deallocate_region_ids: Vec<RegionId>,
356 timeout: Duration,
357 ) -> Self {
358 Self {
359 group_id,
360 table_id,
361 catalog_name,
362 schema_name,
363 sources,
364 targets,
365 region_mapping,
366 group_prepare_result: None,
367 staging_manifest_paths: HashMap::new(),
368 sync_region,
369 allocated_region_ids,
370 pending_deallocate_region_ids,
371 timeout,
372 }
373 }
374
375 pub fn lock_key(&self) -> Vec<StringKey> {
376 let mut lock_keys = Vec::with_capacity(2 + self.sources.len());
377 lock_keys.extend([
378 CatalogLock::Read(&self.catalog_name).into(),
379 SchemaLock::read(&self.catalog_name, &self.schema_name).into(),
380 ]);
381 for source in &self.sources {
382 lock_keys.push(RegionLock::Write(source.region_id).into());
383 }
384 lock_keys
385 }
386}
387
388impl Context {
389 pub async fn get_table_route_value(
397 &self,
398 ) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
399 let table_id = self.persistent_ctx.table_id;
400 let group_id = self.persistent_ctx.group_id;
401 let table_route_value = self
402 .table_metadata_manager
403 .table_route_manager()
404 .table_route_storage()
405 .get_with_raw_bytes(table_id)
406 .await
407 .map_err(BoxedError::new)
408 .with_context(|_| error::RetryLaterWithSourceSnafu {
409 reason: format!(
410 "Failed to get table route for table: {}, repartition group: {}",
411 table_id, group_id
412 ),
413 })?
414 .context(error::TableRouteNotFoundSnafu { table_id })?;
415
416 Ok(table_route_value)
417 }
418
419 pub async fn get_datanode_table_value(
424 &self,
425 table_id: TableId,
426 datanode_id: u64,
427 ) -> Result<DatanodeTableValue> {
428 get_datanode_table_value(&self.table_metadata_manager, table_id, datanode_id).await
429 }
430
431 pub async fn invalidate_table_cache(&self) -> Result<()> {
433 let table_id = self.persistent_ctx.table_id;
434 let group_id = self.persistent_ctx.group_id;
435 let subject = format!(
436 "Invalidate table cache for repartition table, group: {}, table: {}",
437 group_id, table_id,
438 );
439 let ctx = common_meta::cache_invalidator::Context {
440 subject: Some(subject),
441 };
442 let _ = self
443 .cache_invalidator
444 .invalidate(&ctx, &[CacheIdent::TableId(table_id)])
445 .await;
446 Ok(())
447 }
448
449 pub async fn update_table_route(
458 &self,
459 current_table_route_value: &DeserializedValueWithBytes<TableRouteValue>,
460 new_region_routes: Vec<RegionRoute>,
461 ) -> Result<()> {
462 let table_id = self.persistent_ctx.table_id;
463 let group_id = self.persistent_ctx.group_id;
464 let prepare_result = self.persistent_ctx.group_prepare_result.as_ref().unwrap();
466 let central_region_datanode_table_value = self
467 .get_datanode_table_value(table_id, prepare_result.central_region_datanode.id)
468 .await?;
469 let RegionInfo {
470 region_options,
471 region_wal_options,
472 ..
473 } = ¢ral_region_datanode_table_value.region_info;
474
475 info!(
476 "Updating table route for table: {}, group_id: {}, new region routes: {:?}",
477 table_id, group_id, new_region_routes
478 );
479 self.table_metadata_manager
480 .update_table_route(
481 table_id,
482 central_region_datanode_table_value.region_info.clone(),
483 current_table_route_value,
484 new_region_routes,
485 region_options,
486 region_wal_options,
487 )
488 .await
489 .context(error::TableMetadataManagerSnafu)
490 }
491
492 pub async fn update_table_repart_mapping(&self) -> Result<()> {
494 info!(
495 "Updating table repart mapping for table: {}, group_id: {}, region mapping: {:?}",
496 self.persistent_ctx.table_id,
497 self.persistent_ctx.group_id,
498 self.persistent_ctx.region_mapping
499 );
500
501 self.table_metadata_manager
502 .table_repart_manager()
503 .update_mappings(
504 self.persistent_ctx.table_id,
505 &self.persistent_ctx.region_mapping,
506 )
507 .await
508 .context(error::TableMetadataManagerSnafu)
509 }
510
511 pub fn next_operation_timeout(&self) -> Option<Duration> {
515 self.persistent_ctx
516 .timeout
517 .checked_sub(self.start_time.elapsed())
518 }
519
520 pub fn update_enter_staging_region_elapsed(&mut self, elapsed: Duration) {
522 self.volatile_ctx
523 .metrics
524 .update_enter_staging_region_elapsed(elapsed);
525 }
526
527 pub fn update_flush_pending_deallocate_regions_elapsed(&mut self, elapsed: Duration) {
529 self.volatile_ctx
530 .metrics
531 .update_flush_pending_deallocate_regions_elapsed(elapsed);
532 }
533
534 pub fn update_apply_staging_manifest_elapsed(&mut self, elapsed: Duration) {
536 self.volatile_ctx
537 .metrics
538 .update_apply_staging_manifest_elapsed(elapsed);
539 }
540
541 pub fn update_remap_manifest_elapsed(&mut self, elapsed: Duration) {
543 self.volatile_ctx
544 .metrics
545 .update_remap_manifest_elapsed(elapsed);
546 }
547
548 pub fn update_update_metadata_elapsed(&mut self, elapsed: Duration) {
550 self.volatile_ctx
551 .metrics
552 .update_update_metadata_elapsed(elapsed);
553 }
554}
555
556pub fn region_routes(
561 table_id: TableId,
562 table_route_value: &TableRouteValue,
563) -> Result<&Vec<RegionRoute>> {
564 table_route_value
565 .region_routes()
566 .with_context(|_| error::UnexpectedLogicalRouteTableSnafu {
567 err_msg: format!(
568 "TableRoute({:?}) is a non-physical TableRouteValue.",
569 table_id
570 ),
571 })
572}
573
574#[async_trait::async_trait]
575#[typetag::serde(tag = "repartition_group_state")]
576pub(crate) trait State: Sync + Send + Debug {
577 fn name(&self) -> &'static str {
578 let type_name = std::any::type_name::<Self>();
579 type_name.split("::").last().unwrap_or(type_name)
581 }
582
583 async fn next(
585 &mut self,
586 ctx: &mut Context,
587 procedure_ctx: &ProcedureContext,
588 ) -> Result<(Box<dyn State>, Status)>;
589
590 fn as_any(&self) -> &dyn Any;
591}
592
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use common_meta::key::TableMetadataManager;
    use common_meta::kv_backend::test_util::MockKvBackendBuilder;

    use crate::error::Error;
    use crate::procedure::repartition::test_util::{TestingEnv, new_persistent_context};

    /// Builds a testing environment whose metadata kv backend fails every `range` call.
    fn env_with_failing_range() -> TestingEnv {
        let kv = MockKvBackendBuilder::default()
            .range_fn(Arc::new(|_| {
                common_meta::error::UnexpectedSnafu {
                    err_msg: "mock err",
                }
                .fail()
            }))
            .build()
            .unwrap();
        let mut env = TestingEnv::new();
        env.table_metadata_manager = Arc::new(TableMetadataManager::new(Arc::new(kv)));
        env
    }

    #[tokio::test]
    async fn test_get_table_route_value_not_found_error() {
        let env = TestingEnv::new();
        let ctx = env.create_context(new_persistent_context(1024, vec![], vec![]));

        let err = ctx.get_table_route_value().await.unwrap_err();
        assert_matches!(err, Error::TableRouteNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_get_table_route_value_retry_error() {
        let env = env_with_failing_range();
        let ctx = env.create_context(new_persistent_context(1024, vec![], vec![]));

        let err = ctx.get_table_route_value().await.unwrap_err();
        assert!(err.is_retryable());
    }

    #[tokio::test]
    async fn test_get_datanode_table_value_retry_error() {
        let env = env_with_failing_range();
        let ctx = env.create_context(new_persistent_context(1024, vec![], vec![]));

        let err = ctx.get_datanode_table_value(1024, 1).await.unwrap_err();
        assert!(err.is_retryable());
    }
}