pub(crate) mod close_downgraded_region;
pub(crate) mod downgrade_leader_region;
pub(crate) mod flush_leader_region;
pub(crate) mod manager;
pub(crate) mod migration_abort;
pub(crate) mod migration_end;
pub(crate) mod migration_start;
pub(crate) mod open_candidate_region;
#[cfg(test)]
pub mod test_util;
pub(crate) mod update_metadata;
pub(crate) mod upgrade_candidate_region;

use std::any::Any;
use std::collections::{HashMap, HashSet};
use std::fmt::{Debug, Display};
use std::sync::Arc;
use std::time::Duration;

use common_error::ext::BoxedError;
use common_event_recorder::{Event, Eventable};
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::RegionFailureDetectorControllerRef;
use common_meta::instruction::CacheIdent;
use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
use common_meta::key::table_route::TableRouteValue;
use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey};
use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
use common_meta::kv_backend::ResettableKvBackendRef;
use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock, TableLock};
use common_meta::peer::Peer;
use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
use common_procedure::error::{
    Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
};
use common_procedure::{
    Context as ProcedureContext, LockKey, Procedure, Status, StringKey, UserMetadata,
};
use common_telemetry::{error, info};
use manager::RegionMigrationProcedureGuard;
pub use manager::{
    RegionMigrationManagerRef, RegionMigrationProcedureTask, RegionMigrationProcedureTracker,
    RegionMigrationTriggerReason,
};
use serde::{Deserialize, Deserializer, Serialize};
use snafu::{OptionExt, ResultExt};
use store_api::storage::{RegionId, TableId};
use tokio::time::Instant;

use self::migration_start::RegionMigrationStart;
use crate::error::{self, Result};
use crate::events::region_migration_event::RegionMigrationEvent;
use crate::metrics::{
    METRIC_META_REGION_MIGRATION_ERROR, METRIC_META_REGION_MIGRATION_EXECUTE,
    METRIC_META_REGION_MIGRATION_STAGE_ELAPSED,
};
use crate::service::mailbox::MailboxRef;

pub const DEFAULT_REGION_MIGRATION_TIMEOUT: Duration = Duration::from_secs(120);

#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum SingleOrMultiple<T> {
    Single(T),
    Multiple(Vec<T>),
}

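/// Deserializes either a single `T` or a `Vec<T>` into a `Vec<T>`, so that
/// procedure states persisted with the legacy scalar `region_id` field still
/// decode into `region_ids`.
///
/// A minimal sketch of the accepted shapes (the `Payload` type here is
/// hypothetical, for illustration only):
///
/// ```ignore
/// #[derive(serde::Deserialize)]
/// struct Payload {
///     #[serde(deserialize_with = "single_or_multiple_from")]
///     ids: Vec<u64>,
/// }
///
/// let one: Payload = serde_json::from_str(r#"{ "ids": 1 }"#).unwrap();
/// let many: Payload = serde_json::from_str(r#"{ "ids": [1, 2] }"#).unwrap();
/// assert_eq!(one.ids, vec![1]);
/// assert_eq!(many.ids, vec![1, 2]);
/// ```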
fn single_or_multiple_from<'de, D, T>(deserializer: D) -> std::result::Result<Vec<T>, D::Error>
where
    D: Deserializer<'de>,
    T: Deserialize<'de>,
{
    let helper = SingleOrMultiple::<T>::deserialize(deserializer)?;
    Ok(match helper {
        SingleOrMultiple::Single(x) => vec![x],
        SingleOrMultiple::Multiple(xs) => xs,
    })
}

/// The persistent state of the region migration procedure.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
    /// The catalog name.
    pub(crate) catalog: String,
    /// The schema name.
    pub(crate) schema: String,
    /// The datanode the regions are migrated from.
    pub(crate) from_peer: Peer,
    /// The datanode the regions are migrated to.
    pub(crate) to_peer: Peer,
    /// The regions to migrate. The `region_id` alias accepts the legacy
    /// single-region format (see `single_or_multiple_from`).
    #[serde(deserialize_with = "single_or_multiple_from", alias = "region_id")]
    pub(crate) region_ids: Vec<RegionId>,
    /// The timeout for each operation during the migration.
    #[serde(with = "humantime_serde", default = "default_timeout")]
    pub(crate) timeout: Duration,
    /// The reason the migration was triggered.
    #[serde(default)]
    pub(crate) trigger_reason: RegionMigrationTriggerReason,
}

/// The default per-operation timeout, used when deserializing persisted
/// states that predate the `timeout` field.
fn default_timeout() -> Duration {
    Duration::from_secs(10)
}

impl PersistentContext {
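    /// Returns the lock keys the procedure must hold: a read lock on the
    /// catalog and the schema, plus a write lock per region. Region locks are
    /// sorted so that concurrent migrations always acquire them in the same
    /// order and cannot deadlock each other.
    ///
    /// A hedged sketch of the expected shape, assuming `persistent_ctx` is a
    /// populated `PersistentContext`:
    ///
    /// ```ignore
    /// let keys = persistent_ctx.lock_key();
    /// // catalog + schema + one write lock per region, in sorted region order
    /// assert_eq!(keys.len(), persistent_ctx.region_ids.len() + 2);
    /// ```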
    pub fn lock_key(&self) -> Vec<StringKey> {
        let mut lock_keys = Vec::with_capacity(self.region_ids.len() + 2);
        lock_keys.push(CatalogLock::Read(&self.catalog).into());
        lock_keys.push(SchemaLock::read(&self.catalog, &self.schema).into());
        let mut region_ids = self.region_ids.clone();
        region_ids.sort_unstable();
        for region_id in region_ids {
            lock_keys.push(RegionLock::Write(region_id).into());
        }
        lock_keys
    }

    /// Returns the deduplicated table ids of the migrating regions.
    pub fn region_table_ids(&self) -> Vec<TableId> {
        self.region_ids
            .iter()
            .map(|region_id| region_id.table_id())
            .collect::<HashSet<_>>()
            .into_iter()
            .collect()
    }

    /// Groups the migrating regions by table id.
    pub fn table_regions(&self) -> HashMap<TableId, Vec<RegionId>> {
        let mut table_regions = HashMap::new();
        for region_id in &self.region_ids {
            table_regions
                .entry(region_id.table_id())
                .or_insert_with(Vec::new)
                .push(*region_id);
        }
        table_regions
    }
}

impl Eventable for PersistentContext {
    fn to_event(&self) -> Option<Box<dyn Event>> {
        Some(Box::new(RegionMigrationEvent::from_persistent_ctx(self)))
    }
}

/// Elapsed-time metrics of the migration stages, observed into the region
/// migration stage histograms when dropped.
#[derive(Debug, Clone, Default)]
pub struct Metrics {
    /// Time spent on mailbox operations, counted against the procedure timeout.
    operations_elapsed: Duration,
    /// Time spent flushing the leader region.
    flush_leader_region_elapsed: Duration,
    /// Time spent downgrading the leader region.
    downgrade_leader_region_elapsed: Duration,
    /// Time spent opening the candidate region.
    open_candidate_region_elapsed: Duration,
    /// Time spent upgrading the candidate region.
    upgrade_candidate_region_elapsed: Duration,
}

impl Display for Metrics {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let total = self.flush_leader_region_elapsed
            + self.downgrade_leader_region_elapsed
            + self.open_candidate_region_elapsed
            + self.upgrade_candidate_region_elapsed;
        write!(
            f,
            "total: {:?}, flush_leader_region_elapsed: {:?}, downgrade_leader_region_elapsed: {:?}, open_candidate_region_elapsed: {:?}, upgrade_candidate_region_elapsed: {:?}",
            total,
            self.flush_leader_region_elapsed,
            self.downgrade_leader_region_elapsed,
            self.open_candidate_region_elapsed,
            self.upgrade_candidate_region_elapsed
        )
    }
}

impl Metrics {
    pub fn update_operations_elapsed(&mut self, elapsed: Duration) {
        self.operations_elapsed += elapsed;
    }

    pub fn update_flush_leader_region_elapsed(&mut self, elapsed: Duration) {
        self.flush_leader_region_elapsed += elapsed;
    }

    pub fn update_downgrade_leader_region_elapsed(&mut self, elapsed: Duration) {
        self.downgrade_leader_region_elapsed += elapsed;
    }

    pub fn update_open_candidate_region_elapsed(&mut self, elapsed: Duration) {
        self.open_candidate_region_elapsed += elapsed;
    }

    pub fn update_upgrade_candidate_region_elapsed(&mut self, elapsed: Duration) {
        self.upgrade_candidate_region_elapsed += elapsed;
    }
}

impl Drop for Metrics {
    fn drop(&mut self) {
        let total = self.flush_leader_region_elapsed
            + self.downgrade_leader_region_elapsed
            + self.open_candidate_region_elapsed
            + self.upgrade_candidate_region_elapsed;
        METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
            .with_label_values(&["total"])
            .observe(total.as_secs_f64());

        if !self.flush_leader_region_elapsed.is_zero() {
            METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
                .with_label_values(&["flush_leader_region"])
                .observe(self.flush_leader_region_elapsed.as_secs_f64());
        }

        if !self.downgrade_leader_region_elapsed.is_zero() {
            METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
                .with_label_values(&["downgrade_leader_region"])
                .observe(self.downgrade_leader_region_elapsed.as_secs_f64());
        }

        if !self.open_candidate_region_elapsed.is_zero() {
            METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
                .with_label_values(&["open_candidate_region"])
                .observe(self.open_candidate_region_elapsed.as_secs_f64());
        }

        if !self.upgrade_candidate_region_elapsed.is_zero() {
            METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
                .with_label_values(&["upgrade_candidate_region"])
                .observe(self.upgrade_candidate_region_elapsed.as_secs_f64());
        }
    }
}

/// The volatile state of the region migration procedure; it is rebuilt from
/// scratch after a metasrv restart and is never persisted.
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {
    /// Guards that keep the candidate regions registered as opening until the
    /// procedure finishes or aborts.
    opening_region_guards: Vec<OperatingRegionGuard>,
    /// The deadline at which the downgraded leader region's lease is
    /// considered expired.
    leader_region_lease_deadline: Option<Instant>,
    /// Cached `DatanodeTableValue`s of the `from_peer`, fetched lazily.
    from_peer_datanode_table_values: Option<HashMap<TableId, DatanodeTableValue>>,
    /// The last WAL entry ids reported by the downgraded leader regions.
    leader_region_last_entry_ids: HashMap<RegionId, u64>,
    /// The last metadata WAL entry ids reported by the downgraded leader
    /// regions.
    leader_region_metadata_last_entry_ids: HashMap<RegionId, u64>,
    /// Elapsed-time metrics of the procedure.
    metrics: Metrics,
}

impl VolatileContext {
    /// Sets the leader region lease deadline if it is not already set.
    pub fn set_leader_region_lease_deadline(&mut self, lease_timeout: Duration) {
        if self.leader_region_lease_deadline.is_none() {
            self.leader_region_lease_deadline = Some(Instant::now() + lease_timeout);
        }
    }

    /// Clears the leader region lease deadline.
    pub fn reset_leader_region_lease_deadline(&mut self) {
        self.leader_region_lease_deadline = None;
    }

    /// Records the last entry id reported for a leader region.
    pub fn set_last_entry_id(&mut self, region_id: RegionId, last_entry_id: u64) {
        self.leader_region_last_entry_ids
            .insert(region_id, last_entry_id);
    }

    /// Records the last metadata entry id reported for a leader region.
    pub fn set_metadata_last_entry_id(&mut self, region_id: RegionId, last_entry_id: u64) {
        self.leader_region_metadata_last_entry_ids
            .insert(region_id, last_entry_id);
    }
}

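/// Builds a [`Context`] by pairing the procedure-owned [`PersistentContext`]
/// with the metasrv-wide shared handles (metadata manager, mailbox, cache
/// invalidator, and so on).
///
/// A hedged usage sketch, assuming `factory` is a [`DefaultContextFactory`]:
///
/// ```ignore
/// let ctx = factory.new_context(persistent_ctx);
/// // The full configured timeout is available before any operation runs.
/// assert!(ctx.next_operation_timeout().is_some());
/// ```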
pub trait ContextFactory {
    fn new_context(self, persistent_ctx: PersistentContext) -> Context;
}

/// The default implementation of [`ContextFactory`].
#[derive(Clone)]
pub struct DefaultContextFactory {
    volatile_ctx: VolatileContext,
    in_memory_key: ResettableKvBackendRef,
    table_metadata_manager: TableMetadataManagerRef,
    opening_region_keeper: MemoryRegionKeeperRef,
    region_failure_detector_controller: RegionFailureDetectorControllerRef,
    mailbox: MailboxRef,
    server_addr: String,
    cache_invalidator: CacheInvalidatorRef,
}

impl DefaultContextFactory {
    /// Returns a new [`DefaultContextFactory`].
    pub fn new(
        in_memory_key: ResettableKvBackendRef,
        table_metadata_manager: TableMetadataManagerRef,
        opening_region_keeper: MemoryRegionKeeperRef,
        region_failure_detector_controller: RegionFailureDetectorControllerRef,
        mailbox: MailboxRef,
        server_addr: String,
        cache_invalidator: CacheInvalidatorRef,
    ) -> Self {
        Self {
            volatile_ctx: VolatileContext::default(),
            in_memory_key,
            table_metadata_manager,
            opening_region_keeper,
            region_failure_detector_controller,
            mailbox,
            server_addr,
            cache_invalidator,
        }
    }
}

impl ContextFactory for DefaultContextFactory {
    fn new_context(self, persistent_ctx: PersistentContext) -> Context {
        Context {
            persistent_ctx,
            volatile_ctx: self.volatile_ctx,
            in_memory: self.in_memory_key,
            table_metadata_manager: self.table_metadata_manager,
            opening_region_keeper: self.opening_region_keeper,
            region_failure_detector_controller: self.region_failure_detector_controller,
            mailbox: self.mailbox,
            server_addr: self.server_addr,
            cache_invalidator: self.cache_invalidator,
        }
    }
}

/// The context of the region migration procedure, shared by all states.
pub struct Context {
    persistent_ctx: PersistentContext,
    volatile_ctx: VolatileContext,
    in_memory: ResettableKvBackendRef,
    table_metadata_manager: TableMetadataManagerRef,
    opening_region_keeper: MemoryRegionKeeperRef,
    region_failure_detector_controller: RegionFailureDetectorControllerRef,
    mailbox: MailboxRef,
    server_addr: String,
    cache_invalidator: CacheInvalidatorRef,
}

impl Context {
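    /// Returns the remaining time budget for the next remote operation, i.e.
    /// the configured timeout minus the time already spent on operations, or
    /// `None` once the budget is exhausted.
    ///
    /// A hedged sketch of the budget arithmetic (values for illustration):
    ///
    /// ```ignore
    /// // timeout = 10s, operations_elapsed = 4s  => Some(6s)
    /// // timeout = 10s, operations_elapsed = 11s => None (checked_sub underflows)
    /// let remaining = ctx.next_operation_timeout();
    /// ```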
    pub fn next_operation_timeout(&self) -> Option<Duration> {
        self.persistent_ctx
            .timeout
            .checked_sub(self.volatile_ctx.metrics.operations_elapsed)
    }

    /// Adds the elapsed time since `instant` to the operations total.
    pub fn update_operations_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_operations_elapsed(instant.elapsed());
    }

    /// Adds the elapsed time since `instant` to the flush-leader-region stage.
    pub fn update_flush_leader_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_flush_leader_region_elapsed(instant.elapsed());
    }

    /// Adds the elapsed time since `instant` to the downgrade-leader-region stage.
    pub fn update_downgrade_leader_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_downgrade_leader_region_elapsed(instant.elapsed());
    }

    /// Adds the elapsed time since `instant` to the open-candidate-region stage.
    pub fn update_open_candidate_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_open_candidate_region_elapsed(instant.elapsed());
    }

    /// Adds the elapsed time since `instant` to the upgrade-candidate-region stage.
    pub fn update_upgrade_candidate_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_upgrade_candidate_region_elapsed(instant.elapsed());
    }

    /// Returns the server address of the metasrv.
    pub fn server_addr(&self) -> &str {
        &self.server_addr
    }

    /// Returns the deduplicated table ids of the migrating regions.
    pub fn region_table_ids(&self) -> Vec<TableId> {
        self.persistent_ctx
            .region_ids
            .iter()
            .map(|region_id| region_id.table_id())
            .collect::<HashSet<_>>()
            .into_iter()
            .collect()
    }

    /// Fetches the table routes of all migrating regions, keyed by table id.
    ///
    /// Missing table routes are silently skipped; a storage failure is
    /// returned as a retryable error.
    pub async fn get_table_route_values(
        &self,
    ) -> Result<HashMap<TableId, DeserializedValueWithBytes<TableRouteValue>>> {
        let table_ids = self.persistent_ctx.region_table_ids();
        let table_routes = self
            .table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .batch_get_with_raw_bytes(&table_ids)
            .await
            .context(error::TableMetadataManagerSnafu)
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: format!("Failed to get table routes: {table_ids:?}"),
            })?;
        let table_routes = table_ids
            .into_iter()
            .zip(table_routes)
            .filter_map(|(table_id, table_route)| {
                table_route.map(|table_route| (table_id, table_route))
            })
            .collect::<HashMap<_, _>>();
        Ok(table_routes)
    }

    /// Fetches the table route of a single table.
    ///
    /// Returns a retryable error on storage failures and a
    /// `TableRouteNotFound` error if the route does not exist.
    pub async fn get_table_route_value(
        &self,
        table_id: TableId,
    ) -> Result<DeserializedValueWithBytes<TableRouteValue>> {
        let table_route_value = self
            .table_metadata_manager
            .table_route_manager()
            .table_route_storage()
            .get_with_raw_bytes(table_id)
            .await
            .context(error::TableMetadataManagerSnafu)
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: format!("Failed to get table route: {table_id}"),
            })?
            .context(error::TableRouteNotFoundSnafu { table_id })?;
        Ok(table_route_value)
    }

    /// Fetches the `DatanodeTableValue`s of the `from_peer`, keyed by table
    /// id, caching the result in the [`VolatileContext`].
    pub async fn get_from_peer_datanode_table_values(
        &mut self,
    ) -> Result<&HashMap<TableId, DatanodeTableValue>> {
        let from_peer_datanode_table_values =
            &mut self.volatile_ctx.from_peer_datanode_table_values;
        if from_peer_datanode_table_values.is_none() {
            let table_ids = self.persistent_ctx.region_table_ids();
            let datanode_table_keys = table_ids
                .iter()
                .map(|table_id| DatanodeTableKey {
                    datanode_id: self.persistent_ctx.from_peer.id,
                    table_id: *table_id,
                })
                .collect::<Vec<_>>();
            let datanode_table_values = self
                .table_metadata_manager
                .datanode_table_manager()
                .batch_get(&datanode_table_keys)
                .await
                .context(error::TableMetadataManagerSnafu)
                .map_err(BoxedError::new)
                .with_context(|_| error::RetryLaterWithSourceSnafu {
                    reason: format!("Failed to get DatanodeTable: {table_ids:?}"),
                })?
                .into_iter()
                .map(|(k, v)| (k.table_id, v))
                .collect();
            *from_peer_datanode_table_values = Some(datanode_table_values);
        }
        Ok(from_peer_datanode_table_values.as_ref().unwrap())
    }

    /// Fetches the `DatanodeTableValue` of a single table on the `from_peer`.
    pub async fn get_from_peer_datanode_table_value(
        &self,
        table_id: TableId,
    ) -> Result<DatanodeTableValue> {
        let datanode_table_value = self
            .table_metadata_manager
            .datanode_table_manager()
            .get(&DatanodeTableKey {
                datanode_id: self.persistent_ctx.from_peer.id,
                table_id,
            })
            .await
            .context(error::TableMetadataManagerSnafu)
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: format!("Failed to get DatanodeTable: {table_id}"),
            })?
            .context(error::DatanodeTableNotFoundSnafu {
                table_id,
                datanode_id: self.persistent_ctx.from_peer.id,
            })?;
        Ok(datanode_table_value)
    }

    /// Registers failure detectors for the migrating regions on the
    /// `from_peer`, so failover watches them again after the migration fails.
    pub async fn register_failure_detectors(&self) {
        let datanode_id = self.persistent_ctx.from_peer.id;
        let region_ids = &self.persistent_ctx.region_ids;
        let detecting_regions = region_ids
            .iter()
            .map(|region_id| (datanode_id, *region_id))
            .collect::<Vec<_>>();
        self.region_failure_detector_controller
            .register_failure_detectors(detecting_regions)
            .await;
        info!(
            "Registered failure detectors after migration failures for datanode {}, regions {:?}",
            datanode_id, region_ids
        );
    }

    /// Deregisters the failure detectors of the migrating regions on the
    /// `from_peer`.
    pub async fn deregister_failure_detectors(&self) {
        let datanode_id = self.persistent_ctx.from_peer.id;
        let region_ids = &self.persistent_ctx.region_ids;
        let detecting_regions = region_ids
            .iter()
            .map(|region_id| (datanode_id, *region_id))
            .collect::<Vec<_>>();

        self.region_failure_detector_controller
            .deregister_failure_detectors(detecting_regions)
            .await;
    }

    /// Deregisters the failure detectors of the candidate regions on the
    /// `to_peer`.
    pub async fn deregister_failure_detectors_for_candidate_region(&self) {
        let to_peer_id = self.persistent_ctx.to_peer.id;
        let region_ids = &self.persistent_ctx.region_ids;
        let detecting_regions = region_ids
            .iter()
            .map(|region_id| (to_peer_id, *region_id))
            .collect::<Vec<_>>();

        self.region_failure_detector_controller
            .deregister_failure_detectors(detecting_regions)
            .await;
    }

    /// Fetches the replay checkpoints for the given topic-region keys;
    /// regions without a checkpoint are omitted from the result.
    pub async fn get_replay_checkpoints(
        &self,
        topic_region_keys: Vec<TopicRegionKey<'_>>,
    ) -> Result<HashMap<RegionId, ReplayCheckpoint>> {
        let topic_region_values = self
            .table_metadata_manager
            .topic_region_manager()
            .batch_get(topic_region_keys)
            .await
            .context(error::TableMetadataManagerSnafu)?;

        let replay_checkpoints = topic_region_values
            .into_iter()
            .flat_map(|(key, value)| value.checkpoint.map(|checkpoint| (key, checkpoint)))
            .collect::<HashMap<_, _>>();

        Ok(replay_checkpoints)
    }

    /// Broadcasts an invalidation of the migrating tables' cached metadata.
    pub async fn invalidate_table_cache(&self) -> Result<()> {
        let table_ids = self.region_table_ids();
        let mut cache_idents = Vec::with_capacity(table_ids.len());
        for table_id in &table_ids {
            cache_idents.push(CacheIdent::TableId(*table_id));
        }
        let ctx = common_meta::cache_invalidator::Context::default();
        // The invalidation result is ignored.
        let _ = self.cache_invalidator.invalidate(&ctx, &cache_idents).await;
        Ok(())
    }

    /// Returns a clone of the [`PersistentContext`].
    pub fn persistent_ctx(&self) -> PersistentContext {
        self.persistent_ctx.clone()
    }
}

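/// One step of the region migration state machine.
///
/// Each state does its work against the shared [`Context`] and yields the
/// next boxed state together with a procedure [`Status`]; `typetag` keeps the
/// active state serializable under the `region_migration_state` tag so a
/// restarted metasrv can resume from the last persisted step.
///
/// A minimal implementor sketch, mirroring the test-only `MockState` below
/// (the `Noop` type is hypothetical):
///
/// ```ignore
/// #[derive(Debug, serde::Serialize, serde::Deserialize)]
/// struct Noop;
///
/// #[async_trait::async_trait]
/// #[typetag::serde]
/// impl State for Noop {
///     async fn next(
///         &mut self,
///         _ctx: &mut Context,
///         _procedure_ctx: &ProcedureContext,
///     ) -> Result<(Box<dyn State>, Status)> {
///         // Terminal state: report the procedure as done.
///         Ok((Box::new(Noop), Status::done()))
///     }
///
///     fn as_any(&self) -> &dyn Any {
///         self
///     }
/// }
/// ```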
#[async_trait::async_trait]
#[typetag::serde(tag = "region_migration_state")]
pub(crate) trait State: Sync + Send + Debug {
    /// Returns the terminal name of the state's type, e.g. `RegionMigrationStart`.
    fn name(&self) -> &'static str {
        let type_name = std::any::type_name::<Self>();
        // The full type name looks like `a::b::StateName`; keep the last segment.
        type_name.split("::").last().unwrap_or(type_name)
    }

    /// Yields the next [`State`] and [`Status`].
    async fn next(
        &mut self,
        ctx: &mut Context,
        procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)>;

    /// Returns `self` as [`Any`], useful for downcasting in tests and assertions.
    fn as_any(&self) -> &dyn Any;
}

/// The owned form of the procedure's serializable data, used when restoring
/// from a persisted state.
#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationDataOwned {
    persistent_ctx: PersistentContext,
    state: Box<dyn State>,
}

/// The borrowed form of the procedure's serializable data, used when dumping.
#[derive(Debug, Serialize)]
pub struct RegionMigrationData<'a> {
    persistent_ctx: &'a PersistentContext,
    state: &'a dyn State,
}

pub(crate) struct RegionMigrationProcedure {
    state: Box<dyn State>,
    context: Context,
    _guards: Vec<RegionMigrationProcedureGuard>,
}

impl RegionMigrationProcedure {
    const TYPE_NAME: &'static str = "metasrv-procedure::RegionMigration";

    pub fn new(
        persistent_context: PersistentContext,
        context_factory: impl ContextFactory,
        guards: Vec<RegionMigrationProcedureGuard>,
    ) -> Self {
        let state = Box::new(RegionMigrationStart {});
        Self::new_inner(state, persistent_context, context_factory, guards)
    }

    fn new_inner(
        state: Box<dyn State>,
        persistent_context: PersistentContext,
        context_factory: impl ContextFactory,
        guards: Vec<RegionMigrationProcedureGuard>,
    ) -> Self {
        Self {
            state,
            context: context_factory.new_context(persistent_context),
            _guards: guards,
        }
    }

    fn from_json(
        json: &str,
        context_factory: impl ContextFactory,
        tracker: RegionMigrationProcedureTracker,
    ) -> ProcedureResult<Self> {
        let RegionMigrationDataOwned {
            persistent_ctx,
            state,
        } = serde_json::from_str(json).context(FromJsonSnafu)?;
        // Re-register every region of the restored procedure in the tracker
        // so duplicate migration tasks are rejected while this one runs.
        let guards = persistent_ctx
            .region_ids
            .iter()
            .flat_map(|region_id| {
                tracker.insert_running_procedure(&RegionMigrationProcedureTask {
                    region_id: *region_id,
                    from_peer: persistent_ctx.from_peer.clone(),
                    to_peer: persistent_ctx.to_peer.clone(),
                    timeout: persistent_ctx.timeout,
                    trigger_reason: persistent_ctx.trigger_reason,
                })
            })
            .collect::<Vec<_>>();

        let context = context_factory.new_context(persistent_ctx);

        Ok(Self {
            state,
            context,
            _guards: guards,
        })
    }

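    /// Rolls the metadata back after a failed migration: for every affected
    /// table it clears any downgrading leader status from the table route
    /// (under the table's write lock), then stops watching the candidate
    /// regions and re-registers failure detectors for the original leaders.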
    async fn rollback_inner(&mut self, procedure_ctx: &ProcedureContext) -> Result<()> {
        let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
            .with_label_values(&["rollback"])
            .start_timer();
        let ctx = &self.context;
        let table_regions = ctx.persistent_ctx.table_regions();
        for (table_id, regions) in table_regions {
            let table_lock = TableLock::Write(table_id).into();
            let _guard = procedure_ctx.provider.acquire_lock(&table_lock).await;
            let table_route = ctx.get_table_route_value(table_id).await?;
            let region_routes = table_route.region_routes().unwrap();
            let downgraded = region_routes
                .iter()
                .filter(|route| regions.contains(&route.region.id))
                .any(|route| route.is_leader_downgrading());
            if downgraded {
                info!(
                    "Rolling back the downgraded leader region table route, table: {table_id}, regions: {regions:?}"
                );
                let table_metadata_manager = &ctx.table_metadata_manager;
                table_metadata_manager
                    .update_leader_region_status(table_id, &table_route, |route| {
                        if regions.contains(&route.region.id) {
                            Some(None)
                        } else {
                            None
                        }
                    })
                    .await
                    .context(error::TableMetadataManagerSnafu)
                    .map_err(BoxedError::new)
                    .with_context(|_| error::RetryLaterWithSourceSnafu {
                        reason: format!("Failed to update the table route during the rollback of downgraded leader regions: {regions:?}"),
                    })?;
            }
        }
        self.context
            .deregister_failure_detectors_for_candidate_region()
            .await;
        self.context.register_failure_detectors().await;

        Ok(())
    }
}

#[async_trait::async_trait]
impl Procedure for RegionMigrationProcedure {
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    async fn rollback(&mut self, ctx: &ProcedureContext) -> ProcedureResult<()> {
        self.rollback_inner(ctx)
            .await
            .map_err(ProcedureError::external)
    }

    fn rollback_supported(&self) -> bool {
        true
    }

    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
        let state = &mut self.state;

        let name = state.name();
        let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
            .with_label_values(&[name])
            .start_timer();
        match state.next(&mut self.context, ctx).await {
            Ok((next, status)) => {
                *state = next;
                Ok(status)
            }
            Err(e) => {
                if e.is_retryable() {
                    METRIC_META_REGION_MIGRATION_ERROR
                        .with_label_values(&[name, "retryable"])
                        .inc();
                    Err(ProcedureError::retry_later(e))
                } else {
                    // The migration failed for good: release the opening-region
                    // guards and stop watching the candidate regions.
                    self.context.volatile_ctx.opening_region_guards.clear();
                    self.context
                        .deregister_failure_detectors_for_candidate_region()
                        .await;
                    error!(
                        e;
                        "Region migration procedure failed, regions: {:?}, from_peer: {}, to_peer: {}, {}",
                        self.context.persistent_ctx.region_ids,
                        self.context.persistent_ctx.from_peer,
                        self.context.persistent_ctx.to_peer,
                        self.context.volatile_ctx.metrics,
                    );
                    METRIC_META_REGION_MIGRATION_ERROR
                        .with_label_values(&[name, "external"])
                        .inc();
                    Err(ProcedureError::external(e))
                }
            }
        }
    }

    fn dump(&self) -> ProcedureResult<String> {
        let data = RegionMigrationData {
            state: self.state.as_ref(),
            persistent_ctx: &self.context.persistent_ctx,
        };
        serde_json::to_string(&data).context(ToJsonSnafu)
    }

    fn lock_key(&self) -> LockKey {
        LockKey::new(self.context.persistent_ctx.lock_key())
    }

    fn user_metadata(&self) -> Option<UserMetadata> {
        Some(UserMetadata::new(Arc::new(self.context.persistent_ctx())))
    }
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;

    use common_meta::distributed_time_constants::REGION_LEASE_SECS;
    use common_meta::instruction::Instruction;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::rpc::router::{Region, RegionRoute};

    use super::*;
    use crate::handler::HeartbeatMailbox;
    use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
    use crate::procedure::region_migration::test_util::*;
    use crate::procedure::test_util::{
        new_downgrade_region_reply, new_flush_region_reply_for_region, new_open_region_reply,
        new_upgrade_region_reply,
    };
    use crate::service::mailbox::Channel;

    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
    }

    #[test]
    fn test_lock_key() {
        let persistent_context = new_persistent_context();
        let expected_keys = persistent_context.lock_key();

        let env = TestingEnv::new();
        let context = env.context_factory();

        let procedure = RegionMigrationProcedure::new(persistent_context, context, vec![]);

        let key = procedure.lock_key();
        let keys = key.keys_to_lock().cloned().collect::<Vec<_>>();

        for key in expected_keys {
            assert!(keys.contains(&key));
        }
    }

    #[test]
    fn test_data_serialization() {
        let persistent_context = new_persistent_context();

        let env = TestingEnv::new();
        let context = env.context_factory();

        let procedure = RegionMigrationProcedure::new(persistent_context, context, vec![]);

        let serialized = procedure.dump().unwrap();
        let expected = r#"{"persistent_ctx":{"catalog":"greptime","schema":"public","from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_ids":[4398046511105],"timeout":"10s","trigger_reason":"Unknown"},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
        assert_eq!(expected, serialized);
    }

    #[test]
    fn test_backward_compatibility() {
        let persistent_ctx = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
        // The old format persisted a scalar `region_id`; it must keep deserializing.
        let serialized = r#"{"catalog":"greptime","schema":"public","from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105}"#;
        let deserialized: PersistentContext = serde_json::from_str(serialized).unwrap();

        assert_eq!(persistent_ctx, deserialized);
    }

    #[derive(Debug, Serialize, Deserialize, Default)]
    pub struct MockState;

    #[async_trait::async_trait]
    #[typetag::serde]
    impl State for MockState {
        async fn next(
            &mut self,
            _ctx: &mut Context,
            _procedure_ctx: &ProcedureContext,
        ) -> Result<(Box<dyn State>, Status)> {
            Ok((Box::new(MockState), Status::done()))
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }

    #[tokio::test]
    async fn test_execution_after_deserialized() {
        let env = TestingEnv::new();

        fn new_mock_procedure(env: &TestingEnv) -> RegionMigrationProcedure {
            let persistent_context = new_persistent_context();
            let context_factory = env.context_factory();
            let state = Box::<MockState>::default();
            RegionMigrationProcedure::new_inner(state, persistent_context, context_factory, vec![])
        }

        let ctx = TestingEnv::procedure_context();
        let mut procedure = new_mock_procedure(&env);
        let mut status = None;
        for _ in 0..3 {
            status = Some(procedure.execute(&ctx).await.unwrap());
        }
        assert!(status.unwrap().is_done());

        let ctx = TestingEnv::procedure_context();
        let mut procedure = new_mock_procedure(&env);

        status = Some(procedure.execute(&ctx).await.unwrap());

        let serialized = procedure.dump().unwrap();

        let context_factory = env.context_factory();
        let tracker = env.tracker();
        let mut procedure =
            RegionMigrationProcedure::from_json(&serialized, context_factory, tracker.clone())
                .unwrap();
        for region_id in &procedure.context.persistent_ctx.region_ids {
            assert!(tracker.contains(*region_id));
        }

        for _ in 1..3 {
            status = Some(procedure.execute(&ctx).await.unwrap());
        }
        assert!(status.unwrap().is_done());
    }

    #[tokio::test]
    async fn test_broadcast_invalidate_table_cache() {
        let mut env = TestingEnv::new();
        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
        let ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();

        // A broadcast without any receiver must not fail.
        ctx.invalidate_table_cache().await.unwrap();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Frontend(1), tx)
            .await;

        ctx.invalidate_table_cache().await.unwrap();

        let resp = rx.recv().await.unwrap().unwrap();
        let msg = resp.mailbox_message.unwrap();

        let instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
        assert_eq!(
            instruction,
            Instruction::InvalidateCaches(vec![CacheIdent::TableId(1024)])
        );
    }

    fn procedure_flow_steps(from_peer_id: u64, to_peer_id: u64) -> Vec<Step> {
        vec![
            // MigrationStart
            Step::next(
                "Should be the open candidate region",
                None,
                Assertion::simple(assert_open_candidate_region, assert_need_persist),
            ),
            // OpenCandidateRegion
            Step::next(
                "Should be the flush leader region",
                Some(mock_datanode_reply(
                    to_peer_id,
                    Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
                )),
                Assertion::simple(assert_flush_leader_region, assert_no_persist),
            ),
            // FlushLeaderRegion
            Step::next(
                "Should be the update metadata for downgrading",
                Some(mock_datanode_reply(
                    from_peer_id,
                    Arc::new(move |id| {
                        Ok(new_flush_region_reply_for_region(
                            id,
                            RegionId::new(1024, 1),
                            true,
                            None,
                        ))
                    }),
                )),
                Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
            ),
            // UpdateMetadata::Downgrade
            Step::next(
                "Should be the downgrade leader region",
                None,
                Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
            ),
            // DowngradeLeaderRegion
            Step::next(
                "Should be the upgrade candidate region",
                Some(mock_datanode_reply(
                    from_peer_id,
                    Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
                )),
                Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
            ),
            // UpgradeCandidateRegion
            Step::next(
                "Should be the update metadata for upgrading",
                Some(mock_datanode_reply(
                    to_peer_id,
                    Arc::new(|id| Ok(new_upgrade_region_reply(id, true, true, None))),
                )),
                Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
            ),
            // UpdateMetadata::Upgrade
            Step::next(
                "Should be the close downgraded region",
                None,
                Assertion::simple(assert_close_downgraded_region, assert_no_persist),
            ),
            // CloseDowngradedRegion
            Step::next(
                "Should be the region migration end",
                None,
                Assertion::simple(assert_region_migration_end, assert_done),
            ),
            // RegionMigrationEnd
            Step::next(
                "Should be the region migration end again",
                None,
                Assertion::simple(assert_region_migration_end, assert_done),
            ),
        ]
    }

    #[tokio::test]
    async fn test_procedure_flow() {
        common_telemetry::init_default_ut_logging();

        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
        let state = Box::new(RegionMigrationStart);

        let from_peer_id = persistent_context.from_peer.id;
        let to_peer_id = persistent_context.to_peer.id;
        let from_peer = persistent_context.from_peer.clone();
        let to_peer = persistent_context.to_peer.clone();
        let region_id = persistent_context.region_ids[0];
        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(from_peer),
            follower_peers: vec![to_peer],
            ..Default::default()
        }];

        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
        suite.init_table_metadata(table_info, region_routes).await;

        let steps = procedure_flow_steps(from_peer_id, to_peer_id);
        let timer = Instant::now();

        let runner = ProcedureMigrationSuiteRunner::new(suite)
            .steps(steps)
            .run_once()
            .await;

        // The procedure must finish well within the region lease period.
        assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2);

        runner.suite.verify_table_metadata().await;
    }

    #[tokio::test]
    async fn test_procedure_flow_open_candidate_region_retryable_error() {
        common_telemetry::init_default_ut_logging();

        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
        let state = Box::new(RegionMigrationStart);

        let to_peer_id = persistent_context.to_peer.id;
        let from_peer = persistent_context.from_peer.clone();
        let region_id = persistent_context.region_ids[0];
        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(from_peer),
            follower_peers: vec![],
            ..Default::default()
        }];

        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
        suite.init_table_metadata(table_info, region_routes).await;

        let steps = vec![
            // MigrationStart
            Step::next(
                "Should be the open candidate region",
                None,
                Assertion::simple(assert_open_candidate_region, assert_need_persist),
            ),
            // OpenCandidateRegion
            Step::next(
                "Should be throwing a retryable error",
                Some(mock_datanode_reply(
                    to_peer_id,
                    Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
                )),
                Assertion::error(|error| assert!(error.is_retryable())),
            ),
            // OpenCandidateRegion
            Step::next(
                "Should be throwing a retryable error again",
                Some(mock_datanode_reply(
                    to_peer_id,
                    Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
                )),
                Assertion::error(|error| assert!(error.is_retryable())),
            ),
        ];

        let setup_to_latest_persisted_state = Step::setup(
            "Sets state to OpenCandidateRegion",
            merge_before_test_fn(vec![
                setup_state(Arc::new(|| Box::new(OpenCandidateRegion))),
                Arc::new(reset_volatile_ctx),
            ]),
        );

        let steps = [
            steps.clone(),
            vec![setup_to_latest_persisted_state.clone()],
            steps.clone()[1..].to_vec(),
            vec![setup_to_latest_persisted_state],
            steps.clone()[1..].to_vec(),
        ]
        .concat();

        let runner = ProcedureMigrationSuiteRunner::new(suite)
            .steps(steps.clone())
            .run_once()
            .await;

        // The table route must stay untouched: the migration never got past
        // opening the candidate region.
        let table_routes_version = runner
            .env()
            .table_metadata_manager()
            .table_route_manager()
            .table_route_storage()
            .get(region_id.table_id())
            .await
            .unwrap()
            .unwrap()
            .version();
        assert_eq!(table_routes_version.unwrap(), 0);
    }

    #[tokio::test]
    async fn test_procedure_flow_upgrade_candidate_with_retry_and_failed() {
        common_telemetry::init_default_ut_logging();

        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
        let state = Box::new(RegionMigrationStart);

        let from_peer_id = persistent_context.from_peer.id;
        let to_peer_id = persistent_context.to_peer.id;
        let from_peer = persistent_context.from_peer.clone();
        let region_id = persistent_context.region_ids[0];
        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(from_peer),
            follower_peers: vec![],
            ..Default::default()
        }];

        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
        suite.init_table_metadata(table_info, region_routes).await;

        let steps = vec![
            // MigrationStart
            Step::next(
                "Should be the open candidate region",
                None,
                Assertion::simple(assert_open_candidate_region, assert_need_persist),
            ),
            // OpenCandidateRegion
            Step::next(
                "Should be the flush leader region",
                Some(mock_datanode_reply(
                    to_peer_id,
                    Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
                )),
                Assertion::simple(assert_flush_leader_region, assert_no_persist),
            ),
            // FlushLeaderRegion
            Step::next(
                "Should be the update metadata for downgrading",
                Some(mock_datanode_reply(
                    from_peer_id,
                    Arc::new(move |id| {
                        Ok(new_flush_region_reply_for_region(
                            id,
                            RegionId::new(1024, 1),
                            true,
                            None,
                        ))
                    }),
                )),
                Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
            ),
            // UpdateMetadata::Downgrade
            Step::next(
                "Should be the downgrade leader region",
                None,
                Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
            ),
            // DowngradeLeaderRegion
            Step::next(
                "Should be the upgrade candidate region",
                Some(mock_datanode_reply(
                    from_peer_id,
                    Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
                )),
                Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
            ),
            // UpgradeCandidateRegion
            Step::next(
                "Should be the rollback metadata",
                Some(mock_datanode_reply(
                    to_peer_id,
                    Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
                )),
                Assertion::simple(assert_update_metadata_rollback, assert_no_persist),
            ),
            // UpdateMetadata::Rollback
            Step::next(
                "Should be the region migration abort",
                None,
                Assertion::simple(assert_region_migration_abort, assert_no_persist),
            ),
            // RegionMigrationAbort
            Step::next(
                "Should throw an error",
                None,
                Assertion::error(|error| {
                    assert!(!error.is_retryable());
                    assert_matches!(error, error::Error::MigrationAbort { .. });
                }),
            ),
        ];

        let setup_to_latest_persisted_state = Step::setup(
            "Sets state to OpenCandidateRegion",
            merge_before_test_fn(vec![
                setup_state(Arc::new(|| Box::new(OpenCandidateRegion))),
                Arc::new(reset_volatile_ctx),
            ]),
        );

        let steps = [
            steps.clone(),
            vec![setup_to_latest_persisted_state.clone()],
            steps.clone()[1..].to_vec(),
            vec![setup_to_latest_persisted_state],
            steps.clone()[1..].to_vec(),
        ]
        .concat();

        ProcedureMigrationSuiteRunner::new(suite)
            .steps(steps.clone())
            .run_once()
            .await;
    }

    #[tokio::test]
    async fn test_procedure_flow_upgrade_candidate_with_retry() {
        common_telemetry::init_default_ut_logging();

        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
        let state = Box::new(RegionMigrationStart);

        let to_peer_id = persistent_context.to_peer.id;
        let from_peer_id = persistent_context.from_peer.id;
        let from_peer = persistent_context.from_peer.clone();
        let region_id = persistent_context.region_ids[0];
        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(from_peer),
            follower_peers: vec![],
            ..Default::default()
        }];

        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
        suite.init_table_metadata(table_info, region_routes).await;

        let steps = vec![
            // MigrationStart
            Step::next(
                "Should be the open candidate region",
                None,
                Assertion::simple(assert_open_candidate_region, assert_need_persist),
            ),
            // OpenCandidateRegion (the datanode rejects the open request)
            Step::next(
                "Should be throwing a retryable error",
                Some(mock_datanode_reply(
                    to_peer_id,
                    Arc::new(|id| Ok(new_open_region_reply(id, false, None))),
                )),
                Assertion::error(|error| assert!(error.is_retryable(), "err: {error:?}")),
            ),
            // OpenCandidateRegion
            Step::next(
                "Should be the flush leader region",
                Some(mock_datanode_reply(
                    to_peer_id,
                    Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
                )),
                Assertion::simple(assert_flush_leader_region, assert_no_persist),
            ),
            // FlushLeaderRegion
            Step::next(
                "Should be the update metadata for downgrading",
                Some(mock_datanode_reply(
                    from_peer_id,
                    Arc::new(move |id| {
                        Ok(new_flush_region_reply_for_region(id, region_id, true, None))
                    }),
                )),
                Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
            ),
            // UpdateMetadata::Downgrade
            Step::next(
                "Should be the downgrade leader region",
                None,
                Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
            ),
            // DowngradeLeaderRegion (retries after a mailbox timeout)
            Step::next(
                "Should be the upgrade candidate region",
                Some(mock_datanode_reply(
                    from_peer_id,
                    merge_mailbox_messages(vec![
                        Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
                        Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
                    ]),
                )),
                Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
            ),
            // UpgradeCandidateRegion (retries after a mailbox timeout)
            Step::next(
                "Should be the update metadata for upgrading",
                Some(mock_datanode_reply(
                    to_peer_id,
                    merge_mailbox_messages(vec![
                        Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
                        Arc::new(|id| Ok(new_upgrade_region_reply(id, true, true, None))),
                    ]),
                )),
                Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
            ),
            // UpdateMetadata::Upgrade
            Step::next(
                "Should be the close downgraded region",
                None,
                Assertion::simple(assert_close_downgraded_region, assert_no_persist),
            ),
            // CloseDowngradedRegion
            Step::next(
                "Should be the region migration end",
                None,
                Assertion::simple(assert_region_migration_end, assert_done),
            ),
            // RegionMigrationEnd
            Step::next(
                "Should be the region migration end again",
                None,
                Assertion::simple(assert_region_migration_end, assert_done),
            ),
            Step::setup(
                "Sets state to RegionMigrationStart",
                merge_before_test_fn(vec![
                    setup_state(Arc::new(|| Box::new(RegionMigrationStart))),
                    Arc::new(reset_volatile_ctx),
                ]),
            ),
            // Rerunning from RegionMigrationStart should detect that the
            // region has already been migrated and end immediately.
            Step::next(
                "Should be the region migration end (has been migrated)",
                None,
                Assertion::simple(assert_region_migration_end, assert_done),
            ),
        ];

        let steps = [steps.clone()].concat();
        let timer = Instant::now();

        let runner = ProcedureMigrationSuiteRunner::new(suite)
            .steps(steps.clone())
            .run_once()
            .await;

        // The procedure must finish well within the region lease period.
        assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS);
        runner.suite.verify_table_metadata().await;
    }
}