1pub(crate) mod close_downgraded_region;
16pub(crate) mod downgrade_leader_region;
17pub(crate) mod flush_leader_region;
18pub(crate) mod manager;
19pub(crate) mod migration_abort;
20pub(crate) mod migration_end;
21pub(crate) mod migration_start;
22pub(crate) mod open_candidate_region;
23#[cfg(test)]
24pub mod test_util;
25pub(crate) mod update_metadata;
26pub(crate) mod upgrade_candidate_region;
27
28use std::any::Any;
29use std::fmt::{Debug, Display};
30use std::sync::Arc;
31use std::time::Duration;
32
33use common_error::ext::BoxedError;
34use common_event_recorder::{Event, Eventable};
35use common_meta::cache_invalidator::CacheInvalidatorRef;
36use common_meta::ddl::RegionFailureDetectorControllerRef;
37use common_meta::instruction::CacheIdent;
38use common_meta::key::datanode_table::{DatanodeTableKey, DatanodeTableValue};
39use common_meta::key::table_info::TableInfoValue;
40use common_meta::key::table_route::TableRouteValue;
41use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey};
42use common_meta::key::{DeserializedValueWithBytes, TableMetadataManagerRef};
43use common_meta::kv_backend::ResettableKvBackendRef;
44use common_meta::lock_key::{CatalogLock, RegionLock, SchemaLock};
45use common_meta::peer::Peer;
46use common_meta::region_keeper::{MemoryRegionKeeperRef, OperatingRegionGuard};
47use common_procedure::error::{
48 Error as ProcedureError, FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu,
49};
50use common_procedure::{
51 Context as ProcedureContext, LockKey, Procedure, Status, StringKey, UserMetadata,
52};
53use common_telemetry::{error, info};
54use manager::RegionMigrationProcedureGuard;
55pub use manager::{
56 RegionMigrationManagerRef, RegionMigrationProcedureTask, RegionMigrationProcedureTracker,
57 RegionMigrationTriggerReason,
58};
59use serde::{Deserialize, Serialize};
60use snafu::{OptionExt, ResultExt};
61use store_api::storage::RegionId;
62use tokio::time::Instant;
63
64use self::migration_start::RegionMigrationStart;
65use crate::error::{self, Result};
66use crate::events::region_migration_event::RegionMigrationEvent;
67use crate::metrics::{
68 METRIC_META_REGION_MIGRATION_ERROR, METRIC_META_REGION_MIGRATION_EXECUTE,
69 METRIC_META_REGION_MIGRATION_STAGE_ELAPSED,
70};
71use crate::service::mailbox::MailboxRef;
72
/// Default overall timeout for a region migration procedure.
pub const DEFAULT_REGION_MIGRATION_TIMEOUT: Duration = Duration::from_secs(120);
75
/// The context persisted with the procedure state across restarts.
///
/// Serialized as part of the procedure's JSON; must remain backward
/// compatible with previously persisted values (hence the serde `default`s).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct PersistentContext {
    // Catalog of the table the migrating region belongs to.
    pub(crate) catalog: String,
    // Schema of the table the migrating region belongs to.
    pub(crate) schema: String,
    // Datanode currently holding the leader region (migration source).
    pub(crate) from_peer: Peer,
    // Datanode that will host the candidate region (migration target).
    pub(crate) to_peer: Peer,
    // The region being migrated.
    pub(crate) region_id: RegionId,
    // Per-operation timeout; falls back to `default_timeout` (10s) when
    // deserializing state persisted before this field existed.
    #[serde(with = "humantime_serde", default = "default_timeout")]
    pub(crate) timeout: Duration,
    // Why the migration was triggered; defaults for older persisted state.
    #[serde(default)]
    pub(crate) trigger_reason: RegionMigrationTriggerReason,
}
100
/// Serde fallback for [`PersistentContext::timeout`], used when
/// deserializing state persisted before the `timeout` field existed.
fn default_timeout() -> Duration {
    Duration::from_millis(10_000)
}
104
105impl PersistentContext {
106 pub fn lock_key(&self) -> Vec<StringKey> {
107 let region_id = self.region_id;
108 let lock_key = vec![
109 CatalogLock::Read(&self.catalog).into(),
110 SchemaLock::read(&self.catalog, &self.schema).into(),
111 RegionLock::Write(region_id).into(),
112 ];
113
114 lock_key
115 }
116}
117
/// Lets the persistent context be recorded by the event recorder as a
/// [`RegionMigrationEvent`].
impl Eventable for PersistentContext {
    fn to_event(&self) -> Option<Box<dyn Event>> {
        Some(Box::new(RegionMigrationEvent::from_persistent_ctx(self)))
    }
}
123
/// Per-stage wall-clock measurements of a region migration.
///
/// Reported to the `METRIC_META_REGION_MIGRATION_STAGE_ELAPSED` histogram
/// when dropped (see the `Drop` impl).
#[derive(Debug, Clone, Default)]
pub struct Metrics {
    // Accumulated time of remote operations; budgeted against
    // `PersistentContext::timeout` (see `Context::next_operation_timeout`).
    operations_elapsed: Duration,
    // Elapsed time of the "flush leader region" stage.
    flush_leader_region_elapsed: Duration,
    // Elapsed time of the "downgrade leader region" stage.
    downgrade_leader_region_elapsed: Duration,
    // Elapsed time of the "open candidate region" stage.
    open_candidate_region_elapsed: Duration,
    // Elapsed time of the "upgrade candidate region" stage.
    upgrade_candidate_region_elapsed: Duration,
}
138
139impl Display for Metrics {
140 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141 let total = self.flush_leader_region_elapsed
142 + self.downgrade_leader_region_elapsed
143 + self.open_candidate_region_elapsed
144 + self.upgrade_candidate_region_elapsed;
145 write!(
146 f,
147 "total: {:?}, flush_leader_region_elapsed: {:?}, downgrade_leader_region_elapsed: {:?}, open_candidate_region_elapsed: {:?}, upgrade_candidate_region_elapsed: {:?}",
148 total,
149 self.flush_leader_region_elapsed,
150 self.downgrade_leader_region_elapsed,
151 self.open_candidate_region_elapsed,
152 self.upgrade_candidate_region_elapsed
153 )
154 }
155}
156
impl Metrics {
    /// Accumulates elapsed time of remote operations; the total is compared
    /// against the configured timeout in `Context::next_operation_timeout`.
    pub fn update_operations_elapsed(&mut self, elapsed: Duration) {
        self.operations_elapsed += elapsed;
    }

    /// Accumulates elapsed time of the "flush leader region" stage.
    pub fn update_flush_leader_region_elapsed(&mut self, elapsed: Duration) {
        self.flush_leader_region_elapsed += elapsed;
    }

    /// Accumulates elapsed time of the "downgrade leader region" stage.
    pub fn update_downgrade_leader_region_elapsed(&mut self, elapsed: Duration) {
        self.downgrade_leader_region_elapsed += elapsed;
    }

    /// Accumulates elapsed time of the "open candidate region" stage.
    pub fn update_open_candidate_region_elapsed(&mut self, elapsed: Duration) {
        self.open_candidate_region_elapsed += elapsed;
    }

    /// Accumulates elapsed time of the "upgrade candidate region" stage.
    pub fn update_upgrade_candidate_region_elapsed(&mut self, elapsed: Duration) {
        self.upgrade_candidate_region_elapsed += elapsed;
    }
}
183
184impl Drop for Metrics {
185 fn drop(&mut self) {
186 let total = self.flush_leader_region_elapsed
187 + self.downgrade_leader_region_elapsed
188 + self.open_candidate_region_elapsed
189 + self.upgrade_candidate_region_elapsed;
190 METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
191 .with_label_values(&["total"])
192 .observe(total.as_secs_f64());
193
194 if !self.flush_leader_region_elapsed.is_zero() {
195 METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
196 .with_label_values(&["flush_leader_region"])
197 .observe(self.flush_leader_region_elapsed.as_secs_f64());
198 }
199
200 if !self.downgrade_leader_region_elapsed.is_zero() {
201 METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
202 .with_label_values(&["downgrade_leader_region"])
203 .observe(self.downgrade_leader_region_elapsed.as_secs_f64());
204 }
205
206 if !self.open_candidate_region_elapsed.is_zero() {
207 METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
208 .with_label_values(&["open_candidate_region"])
209 .observe(self.open_candidate_region_elapsed.as_secs_f64());
210 }
211
212 if !self.upgrade_candidate_region_elapsed.is_zero() {
213 METRIC_META_REGION_MIGRATION_STAGE_ELAPSED
214 .with_label_values(&["upgrade_candidate_region"])
215 .observe(self.upgrade_candidate_region_elapsed.as_secs_f64());
216 }
217 }
218}
219
/// State kept only for the current execution; it is NOT persisted and is
/// rebuilt as `Default` whenever the procedure is recovered from JSON.
#[derive(Debug, Clone, Default)]
pub struct VolatileContext {
    // Guard keeping the candidate region registered as "opening"; released
    // (via `take`) when the procedure fails with a non-retryable error.
    opening_region_guard: Option<OperatingRegionGuard>,
    // Cached table route; cleared by `Context::remove_table_route_value`.
    table_route: Option<DeserializedValueWithBytes<TableRouteValue>>,
    // Cached datanode-table value of the `from_peer`.
    from_peer_datanode_table: Option<DatanodeTableValue>,
    // Cached table info; fetched lazily by `Context::get_table_info_value`.
    table_info: Option<DeserializedValueWithBytes<TableInfoValue>>,
    // Deadline associated with the downgraded leader region's lease;
    // armed once via `set_leader_region_lease_deadline`.
    leader_region_lease_deadline: Option<Instant>,
    // Last entry id recorded for the leader region (see `set_last_entry_id`).
    leader_region_last_entry_id: Option<u64>,
    // Last metadata entry id recorded for the leader region.
    leader_region_metadata_last_entry_id: Option<u64>,
    // Stage timings; flushed to prometheus when dropped.
    metrics: Metrics,
}
252
253impl VolatileContext {
254 pub fn set_leader_region_lease_deadline(&mut self, lease_timeout: Duration) {
256 if self.leader_region_lease_deadline.is_none() {
257 self.leader_region_lease_deadline = Some(Instant::now() + lease_timeout);
258 }
259 }
260
261 pub fn reset_leader_region_lease_deadline(&mut self) {
263 self.leader_region_lease_deadline = None;
264 }
265
266 pub fn set_last_entry_id(&mut self, last_entry_id: u64) {
268 self.leader_region_last_entry_id = Some(last_entry_id)
269 }
270
271 pub fn set_metadata_last_entry_id(&mut self, last_entry_id: u64) {
273 self.leader_region_metadata_last_entry_id = Some(last_entry_id);
274 }
275}
276
/// Builds a runtime [`Context`] from a [`PersistentContext`]; consumes the
/// factory so its volatile state and service handles move into the context.
pub trait ContextFactory {
    fn new_context(self, persistent_ctx: PersistentContext) -> Context;
}
281
/// Default [`ContextFactory`] carrying the service handles a migration
/// context needs (metadata manager, mailbox, failure detectors, caches).
#[derive(Clone)]
pub struct DefaultContextFactory {
    // Fresh volatile state handed to each new context.
    volatile_ctx: VolatileContext,
    in_memory_key: ResettableKvBackendRef,
    table_metadata_manager: TableMetadataManagerRef,
    opening_region_keeper: MemoryRegionKeeperRef,
    region_failure_detector_controller: RegionFailureDetectorControllerRef,
    mailbox: MailboxRef,
    server_addr: String,
    cache_invalidator: CacheInvalidatorRef,
}
294
impl DefaultContextFactory {
    /// Creates a factory from the given service handles, starting with an
    /// empty (default) volatile context.
    pub fn new(
        in_memory_key: ResettableKvBackendRef,
        table_metadata_manager: TableMetadataManagerRef,
        opening_region_keeper: MemoryRegionKeeperRef,
        region_failure_detector_controller: RegionFailureDetectorControllerRef,
        mailbox: MailboxRef,
        server_addr: String,
        cache_invalidator: CacheInvalidatorRef,
    ) -> Self {
        Self {
            volatile_ctx: VolatileContext::default(),
            in_memory_key,
            table_metadata_manager,
            opening_region_keeper,
            region_failure_detector_controller,
            mailbox,
            server_addr,
            cache_invalidator,
        }
    }
}
318
impl ContextFactory for DefaultContextFactory {
    /// Moves the factory's handles and volatile state into a new [`Context`]
    /// wrapping `persistent_ctx`.
    fn new_context(self, persistent_ctx: PersistentContext) -> Context {
        Context {
            persistent_ctx: Arc::new(persistent_ctx),
            volatile_ctx: self.volatile_ctx,
            in_memory: self.in_memory_key,
            table_metadata_manager: self.table_metadata_manager,
            opening_region_keeper: self.opening_region_keeper,
            region_failure_detector_controller: self.region_failure_detector_controller,
            mailbox: self.mailbox,
            server_addr: self.server_addr,
            cache_invalidator: self.cache_invalidator,
        }
    }
}
334
/// The runtime context of a region migration: the persisted parameters,
/// per-run volatile caches, and the service handles the states operate on.
pub struct Context {
    persistent_ctx: Arc<PersistentContext>,
    volatile_ctx: VolatileContext,
    in_memory: ResettableKvBackendRef,
    table_metadata_manager: TableMetadataManagerRef,
    opening_region_keeper: MemoryRegionKeeperRef,
    region_failure_detector_controller: RegionFailureDetectorControllerRef,
    mailbox: MailboxRef,
    // Address of this metasrv, exposed via `server_addr()`.
    server_addr: String,
    cache_invalidator: CacheInvalidatorRef,
}
347
impl Context {
    /// Returns the remaining timeout budget for the next remote operation:
    /// the configured timeout minus time already consumed by operations.
    /// `None` once the budget is exhausted.
    pub fn next_operation_timeout(&self) -> Option<Duration> {
        self.persistent_ctx
            .timeout
            .checked_sub(self.volatile_ctx.metrics.operations_elapsed)
    }

    /// Adds the time elapsed since `instant` to the operations counter.
    pub fn update_operations_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_operations_elapsed(instant.elapsed());
    }

    /// Adds the time elapsed since `instant` to the "flush leader region" stage.
    pub fn update_flush_leader_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_flush_leader_region_elapsed(instant.elapsed());
    }

    /// Adds the time elapsed since `instant` to the "downgrade leader region" stage.
    pub fn update_downgrade_leader_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_downgrade_leader_region_elapsed(instant.elapsed());
    }

    /// Adds the time elapsed since `instant` to the "open candidate region" stage.
    pub fn update_open_candidate_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_open_candidate_region_elapsed(instant.elapsed());
    }

    /// Adds the time elapsed since `instant` to the "upgrade candidate region" stage.
    pub fn update_upgrade_candidate_region_elapsed(&mut self, instant: Instant) {
        self.volatile_ctx
            .metrics
            .update_upgrade_candidate_region_elapsed(instant.elapsed());
    }

    /// Returns the address of this metasrv.
    pub fn server_addr(&self) -> &str {
        &self.server_addr
    }

    /// Returns the table route of the migrating region's table, fetching and
    /// caching it on first use.
    ///
    /// Storage errors are wrapped as retry-later; a missing route yields
    /// `TableRouteNotFound`. Use `remove_table_route_value` to force a
    /// re-fetch.
    pub async fn get_table_route_value(
        &mut self,
    ) -> Result<&DeserializedValueWithBytes<TableRouteValue>> {
        let table_route_value = &mut self.volatile_ctx.table_route;

        if table_route_value.is_none() {
            let table_id = self.persistent_ctx.region_id.table_id();
            let table_route = self
                .table_metadata_manager
                .table_route_manager()
                .table_route_storage()
                .get_with_raw_bytes(table_id)
                .await
                .context(error::TableMetadataManagerSnafu)
                .map_err(BoxedError::new)
                .with_context(|_| error::RetryLaterWithSourceSnafu {
                    reason: format!("Failed to get TableRoute: {table_id}"),
                })?
                .context(error::TableRouteNotFoundSnafu { table_id })?;

            *table_route_value = Some(table_route);
        }

        Ok(table_route_value.as_ref().unwrap())
    }

    /// Registers failure detectors for the migrating region on the `from_peer`.
    pub async fn register_failure_detectors(&self) {
        let datanode_id = self.persistent_ctx.from_peer.id;
        let region_id = self.persistent_ctx.region_id;

        self.region_failure_detector_controller
            .register_failure_detectors(vec![(datanode_id, region_id)])
            .await;
    }

    /// Deregisters failure detectors for the migrating region on the `from_peer`.
    pub async fn deregister_failure_detectors(&self) {
        let datanode_id = self.persistent_ctx.from_peer.id;
        let region_id = self.persistent_ctx.region_id;

        self.region_failure_detector_controller
            .deregister_failure_detectors(vec![(datanode_id, region_id)])
            .await;
    }

    /// Deregisters failure detectors for the candidate region on the `to_peer`.
    pub async fn deregister_failure_detectors_for_candidate_region(&self) {
        let to_peer_id = self.persistent_ctx.to_peer.id;
        let region_id = self.persistent_ctx.region_id;

        self.region_failure_detector_controller
            .deregister_failure_detectors(vec![(to_peer_id, region_id)])
            .await;
    }

    /// Drops the cached table route so the next `get_table_route_value`
    /// re-fetches it; returns whether a cached value was present.
    pub fn remove_table_route_value(&mut self) -> bool {
        let value = self.volatile_ctx.table_route.take();
        value.is_some()
    }

    /// Returns the table info of the migrating region's table, fetching and
    /// caching it on first use. Storage errors are wrapped as retry-later;
    /// a missing value yields `TableInfoNotFound`.
    pub async fn get_table_info_value(
        &mut self,
    ) -> Result<&DeserializedValueWithBytes<TableInfoValue>> {
        let table_info_value = &mut self.volatile_ctx.table_info;

        if table_info_value.is_none() {
            let table_id = self.persistent_ctx.region_id.table_id();
            let table_info = self
                .table_metadata_manager
                .table_info_manager()
                .get(table_id)
                .await
                .context(error::TableMetadataManagerSnafu)
                .map_err(BoxedError::new)
                .with_context(|_| error::RetryLaterWithSourceSnafu {
                    reason: format!("Failed to get TableInfo: {table_id}"),
                })?
                .context(error::TableInfoNotFoundSnafu { table_id })?;

            *table_info_value = Some(table_info);
        }

        Ok(table_info_value.as_ref().unwrap())
    }

    /// Returns the `DatanodeTableValue` of the table on the `from_peer`,
    /// fetching and caching it on first use. Storage errors are wrapped as
    /// retry-later; a missing value yields `DatanodeTableNotFound`.
    pub async fn get_from_peer_datanode_table_value(&mut self) -> Result<&DatanodeTableValue> {
        let datanode_value = &mut self.volatile_ctx.from_peer_datanode_table;

        if datanode_value.is_none() {
            let table_id = self.persistent_ctx.region_id.table_id();
            let datanode_id = self.persistent_ctx.from_peer.id;

            let datanode_table = self
                .table_metadata_manager
                .datanode_table_manager()
                .get(&DatanodeTableKey {
                    datanode_id,
                    table_id,
                })
                .await
                .context(error::TableMetadataManagerSnafu)
                .map_err(BoxedError::new)
                .with_context(|_| error::RetryLaterWithSourceSnafu {
                    reason: format!("Failed to get DatanodeTable: ({datanode_id},{table_id})"),
                })?
                .context(error::DatanodeTableNotFoundSnafu {
                    table_id,
                    datanode_id,
                })?;

            *datanode_value = Some(datanode_table);
        }

        Ok(datanode_value.as_ref().unwrap())
    }

    /// Fetches the replay checkpoint of the migrating region for `topic`,
    /// if one is recorded. Not cached.
    pub async fn fetch_replay_checkpoint(&self, topic: &str) -> Result<Option<ReplayCheckpoint>> {
        let region_id = self.region_id();
        let topic_region_key = TopicRegionKey::new(region_id, topic);
        let value = self
            .table_metadata_manager
            .topic_region_manager()
            .get(topic_region_key)
            .await
            .context(error::TableMetadataManagerSnafu)?;

        Ok(value.and_then(|value| value.checkpoint))
    }

    /// Returns the id of the migrating region.
    pub fn region_id(&self) -> RegionId {
        self.persistent_ctx.region_id
    }

    /// Broadcasts a cache-invalidation request for the migrating region's
    /// table. Best effort: the invalidation result is deliberately ignored.
    pub async fn invalidate_table_cache(&self) -> Result<()> {
        let table_id = self.region_id().table_id();
        let ctx = common_meta::cache_invalidator::Context::default();
        let _ = self
            .cache_invalidator
            .invalidate(&ctx, &[CacheIdent::TableId(table_id)])
            .await;
        Ok(())
    }

    /// Returns a shared handle to the persistent context.
    pub fn persistent_ctx(&self) -> Arc<PersistentContext> {
        self.persistent_ctx.clone()
    }
}
574
/// A state of the region migration state machine.
///
/// Implementations are (de)serialized with the procedure state via
/// `typetag`, keyed by the `region_migration_state` tag.
#[async_trait::async_trait]
#[typetag::serde(tag = "region_migration_state")]
pub(crate) trait State: Sync + Send + Debug {
    /// Returns the state's short type name (without module path); used as a
    /// metric label in `execute`.
    fn name(&self) -> &'static str {
        let type_name = std::any::type_name::<Self>();
        type_name.split("::").last().unwrap_or(type_name)
    }

    /// Advances the state machine one step, returning the next state and
    /// the procedure status.
    async fn next(
        &mut self,
        ctx: &mut Context,
        procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)>;

    /// Returns `self` as [`Any`], enabling downcasting to concrete states.
    fn as_any(&self) -> &dyn Any;
}
594
/// Owned deserialization target for a persisted procedure (see `from_json`).
#[derive(Debug, Serialize, Deserialize)]
pub struct RegionMigrationDataOwned {
    persistent_ctx: PersistentContext,
    state: Box<dyn State>,
}
601
/// Borrowed serialization view of the procedure state (see `dump`);
/// serializes identically to [`RegionMigrationDataOwned`].
#[derive(Debug, Serialize)]
pub struct RegionMigrationData<'a> {
    persistent_ctx: &'a PersistentContext,
    state: &'a dyn State,
}
608
/// The procedure that migrates a region's leadership from one datanode to
/// another, driven as a state machine (see [`State`]).
pub(crate) struct RegionMigrationProcedure {
    // Current state; replaced after each successful `execute` step.
    state: Box<dyn State>,
    context: Context,
    // Keeps the migration registered in the tracker while the procedure runs.
    _guard: Option<RegionMigrationProcedureGuard>,
}
614
impl RegionMigrationProcedure {
    const TYPE_NAME: &'static str = "metasrv-procedure::RegionMigration";

    /// Creates a procedure starting from the initial
    /// [`RegionMigrationStart`] state.
    pub fn new(
        persistent_context: PersistentContext,
        context_factory: impl ContextFactory,
        guard: Option<RegionMigrationProcedureGuard>,
    ) -> Self {
        let state = Box::new(RegionMigrationStart {});
        Self::new_inner(state, persistent_context, context_factory, guard)
    }

    /// Creates a procedure from an arbitrary starting `state`.
    fn new_inner(
        state: Box<dyn State>,
        persistent_context: PersistentContext,
        context_factory: impl ContextFactory,
        guard: Option<RegionMigrationProcedureGuard>,
    ) -> Self {
        Self {
            state,
            context: context_factory.new_context(persistent_context),
            _guard: guard,
        }
    }

    /// Restores a procedure from its serialized `json` state and re-registers
    /// the migration task with the `tracker`.
    fn from_json(
        json: &str,
        context_factory: impl ContextFactory,
        tracker: RegionMigrationProcedureTracker,
    ) -> ProcedureResult<Self> {
        let RegionMigrationDataOwned {
            persistent_ctx,
            state,
        } = serde_json::from_str(json).context(FromJsonSnafu)?;

        let guard = tracker.insert_running_procedure(&RegionMigrationProcedureTask {
            region_id: persistent_ctx.region_id,
            from_peer: persistent_ctx.from_peer.clone(),
            to_peer: persistent_ctx.to_peer.clone(),
            timeout: persistent_ctx.timeout,
            trigger_reason: persistent_ctx.trigger_reason,
        });
        let context = context_factory.new_context(persistent_ctx);

        Ok(Self {
            state,
            context,
            _guard: guard,
        })
    }

    /// Rollback body: if the table route still marks the leader region as
    /// downgrading, clears that status, then re-registers the region's
    /// failure detectors on the `from_peer`.
    async fn rollback_inner(&mut self) -> Result<()> {
        let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
            .with_label_values(&["rollback"])
            .start_timer();

        let table_id = self.context.region_id().table_id();
        let region_id = self.context.region_id();
        // Drop any cached route so the value below is fetched fresh.
        self.context.remove_table_route_value();
        let table_metadata_manager = self.context.table_metadata_manager.clone();
        let table_route = self.context.get_table_route_value().await?;

        // NOTE(review): `region_routes()` is unwrapped — assumes the route is
        // a physical table route; confirm migrations only target physical tables.
        let downgraded = table_route
            .region_routes()
            .unwrap()
            .iter()
            .filter(|route| route.region.id == region_id)
            .any(|route| route.is_leader_downgrading());

        if downgraded {
            info!("Rollbacking downgraded region leader table route, region: {region_id}");
            table_metadata_manager
                .update_leader_region_status(table_id, table_route, |route| {
                    if route.region.id == region_id {
                        // Clear the leader status (downgrading -> none).
                        Some(None)
                    } else {
                        // Leave other regions' routes untouched.
                        None
                    }
                })
                .await
                .context(error::TableMetadataManagerSnafu)
                .map_err(BoxedError::new)
                .with_context(|_| error::RetryLaterWithSourceSnafu {
                    reason: format!("Failed to update the table route during the rollback downgraded leader region: {region_id}"),
                })?;
        }

        self.context.register_failure_detectors().await;

        Ok(())
    }
}
708
#[async_trait::async_trait]
impl Procedure for RegionMigrationProcedure {
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    /// Rolls back metadata changes of a failed migration (see `rollback_inner`).
    async fn rollback(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<()> {
        self.rollback_inner()
            .await
            .map_err(ProcedureError::external)
    }

    fn rollback_supported(&self) -> bool {
        true
    }

    /// Drives the state machine one step: delegates to the current state's
    /// `next` and installs the returned state on success. Retryable errors
    /// become `retry_later`; any other error releases the opening-region
    /// guard, deregisters the candidate's failure detectors, and fails the
    /// procedure.
    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
        let state = &mut self.state;

        // State name doubles as the metric label for this step.
        let name = state.name();
        let _timer = METRIC_META_REGION_MIGRATION_EXECUTE
            .with_label_values(&[name])
            .start_timer();
        match state.next(&mut self.context, ctx).await {
            Ok((next, status)) => {
                *state = next;
                Ok(status)
            }
            Err(e) => {
                if e.is_retryable() {
                    METRIC_META_REGION_MIGRATION_ERROR
                        .with_label_values(&[name, "retryable"])
                        .inc();
                    Err(ProcedureError::retry_later(e))
                } else {
                    // Release the "opening" slot held for the candidate region.
                    self.context.volatile_ctx.opening_region_guard.take();
                    self.context
                        .deregister_failure_detectors_for_candidate_region()
                        .await;
                    error!(
                        e;
                        "Region migration procedure failed, region_id: {}, from_peer: {}, to_peer: {}, {}",
                        self.context.region_id(),
                        self.context.persistent_ctx.from_peer,
                        self.context.persistent_ctx.to_peer,
                        self.context.volatile_ctx.metrics,
                    );
                    METRIC_META_REGION_MIGRATION_ERROR
                        .with_label_values(&[name, "external"])
                        .inc();
                    Err(ProcedureError::external(e))
                }
            }
        }
    }

    /// Serializes the persistent context and current state to JSON.
    fn dump(&self) -> ProcedureResult<String> {
        let data = RegionMigrationData {
            state: self.state.as_ref(),
            persistent_ctx: &self.context.persistent_ctx,
        };
        serde_json::to_string(&data).context(ToJsonSnafu)
    }

    fn lock_key(&self) -> LockKey {
        LockKey::new(self.context.persistent_ctx.lock_key())
    }

    fn user_metadata(&self) -> Option<UserMetadata> {
        Some(UserMetadata::new(self.context.persistent_ctx()))
    }
}
782
783#[cfg(test)]
784mod tests {
785 use std::assert_matches::assert_matches;
786 use std::sync::Arc;
787
788 use common_meta::distributed_time_constants::REGION_LEASE_SECS;
789 use common_meta::instruction::Instruction;
790 use common_meta::key::test_utils::new_test_table_info;
791 use common_meta::rpc::router::{Region, RegionRoute};
792
793 use super::update_metadata::UpdateMetadata;
794 use super::*;
795 use crate::handler::HeartbeatMailbox;
796 use crate::procedure::region_migration::open_candidate_region::OpenCandidateRegion;
797 use crate::procedure::region_migration::test_util::*;
798 use crate::procedure::test_util::{
799 new_downgrade_region_reply, new_flush_region_reply_for_region, new_open_region_reply,
800 new_upgrade_region_reply,
801 };
802 use crate::service::mailbox::Channel;
803
    /// Standard test context: migrate region 1 of table 1024 from peer 1 to peer 2.
    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
    }
807
    /// The procedure's lock key must contain every key produced by
    /// `PersistentContext::lock_key`.
    #[test]
    fn test_lock_key() {
        let persistent_context = new_persistent_context();
        let expected_keys = persistent_context.lock_key();

        let env = TestingEnv::new();
        let context = env.context_factory();

        let procedure = RegionMigrationProcedure::new(persistent_context, context, None);

        let key = procedure.lock_key();
        let keys = key.keys_to_lock().cloned().collect::<Vec<_>>();

        for key in expected_keys {
            assert!(keys.contains(&key));
        }
    }
825
    /// Pins the exact JSON produced by `dump()` to guard the persisted
    /// serialization format against accidental changes.
    #[test]
    fn test_data_serialization() {
        let persistent_context = new_persistent_context();

        let env = TestingEnv::new();
        let context = env.context_factory();

        let procedure = RegionMigrationProcedure::new(persistent_context, context, None);

        let serialized = procedure.dump().unwrap();
        let expected = r#"{"persistent_ctx":{"catalog":"greptime","schema":"public","from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105,"timeout":"10s","trigger_reason":"Unknown"},"state":{"region_migration_state":"RegionMigrationStart"}}"#;
        assert_eq!(expected, serialized);
    }
839
    /// Old persisted JSON (without the `timeout` and `trigger_reason`
    /// fields) must still deserialize via the serde defaults.
    #[test]
    fn test_backward_compatibility() {
        let persistent_ctx = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
        // NOTES: Without `timeout` and `trigger_reason` fields.
        let serialized = r#"{"catalog":"greptime","schema":"public","from_peer":{"id":1,"addr":""},"to_peer":{"id":2,"addr":""},"region_id":4398046511105}"#;
        let deserialized: PersistentContext = serde_json::from_str(serialized).unwrap();

        assert_eq!(persistent_ctx, deserialized);
    }
849
    /// A trivial state used to drive the procedure in tests.
    #[derive(Debug, Serialize, Deserialize, Default)]
    pub struct MockState;
852
    /// `MockState` always transitions to itself and reports `done`.
    #[async_trait::async_trait]
    #[typetag::serde]
    impl State for MockState {
        async fn next(
            &mut self,
            _ctx: &mut Context,
            _procedure_ctx: &ProcedureContext,
        ) -> Result<(Box<dyn State>, Status)> {
            Ok((Box::new(MockState), Status::done()))
        }

        fn as_any(&self) -> &dyn Any {
            self
        }
    }
868
    /// A procedure keeps executing correctly after a `dump`/`from_json`
    /// round trip, and `from_json` re-registers the task in the tracker.
    #[tokio::test]
    async fn test_execution_after_deserialized() {
        let env = TestingEnv::new();

        // Builds a procedure whose state machine is a single MockState.
        fn new_mock_procedure(env: &TestingEnv) -> RegionMigrationProcedure {
            let persistent_context = new_persistent_context();
            let context_factory = env.context_factory();
            let state = Box::<MockState>::default();
            RegionMigrationProcedure::new_inner(state, persistent_context, context_factory, None)
        }

        let ctx = TestingEnv::procedure_context();
        let mut procedure = new_mock_procedure(&env);
        let mut status = None;
        for _ in 0..3 {
            status = Some(procedure.execute(&ctx).await.unwrap());
        }
        assert!(status.unwrap().is_done());

        let ctx = TestingEnv::procedure_context();
        let mut procedure = new_mock_procedure(&env);

        status = Some(procedure.execute(&ctx).await.unwrap());

        let serialized = procedure.dump().unwrap();

        let context_factory = env.context_factory();
        let tracker = env.tracker();
        let mut procedure =
            RegionMigrationProcedure::from_json(&serialized, context_factory, tracker.clone())
                .unwrap();
        // Restoring from JSON must register the migration in the tracker.
        assert!(tracker.contains(procedure.context.persistent_ctx.region_id));

        for _ in 1..3 {
            status = Some(procedure.execute(&ctx).await.unwrap());
        }
        assert!(status.unwrap().is_done());
    }
907
    /// `invalidate_table_cache` succeeds even with no registered receivers
    /// (best effort) and, once a frontend channel exists, broadcasts an
    /// `InvalidateCaches` instruction for the table.
    #[tokio::test]
    async fn test_broadcast_invalidate_table_cache() {
        let mut env = TestingEnv::new();
        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
        let ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();

        // No receivers yet: the broadcast result is ignored, so this succeeds.
        ctx.invalidate_table_cache().await.unwrap();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Frontend(1), tx)
            .await;

        ctx.invalidate_table_cache().await.unwrap();

        let resp = rx.recv().await.unwrap().unwrap();
        let msg = resp.mailbox_message.unwrap();

        let instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
        assert_eq!(
            instruction,
            Instruction::InvalidateCaches(vec![CacheIdent::TableId(1024)])
        );
    }
935
    /// The happy-path step sequence: downgrade metadata -> downgrade leader
    /// -> upgrade candidate -> upgrade metadata -> close downgraded region
    /// -> migration end (idempotent on repeat).
    fn procedure_flow_steps(from_peer_id: u64, to_peer_id: u64) -> Vec<Step> {
        vec![
            Step::next(
                "Should be the update metadata for downgrading",
                None,
                Assertion::simple(assert_update_metadata_downgrade, assert_need_persist),
            ),
            Step::next(
                "Should be the downgrade leader region",
                None,
                Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
            ),
            Step::next(
                "Should be the upgrade candidate region",
                Some(mock_datanode_reply(
                    from_peer_id,
                    Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
                )),
                Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
            ),
            Step::next(
                "Should be the update metadata for upgrading",
                Some(mock_datanode_reply(
                    to_peer_id,
                    Arc::new(|id| Ok(new_upgrade_region_reply(id, true, true, None))),
                )),
                Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
            ),
            Step::next(
                "Should be the close downgraded region",
                None,
                Assertion::simple(assert_close_downgraded_region, assert_no_persist),
            ),
            Step::next(
                "Should be the region migration end",
                None,
                Assertion::simple(assert_region_migration_end, assert_done),
            ),
            Step::next(
                "Should be the region migration end again",
                None,
                Assertion::simple(assert_region_migration_end, assert_done),
            ),
        ]
    }
988
    /// Runs the full happy-path flow once and verifies the resulting table
    /// metadata; must finish well within half the region lease.
    #[tokio::test]
    async fn test_procedure_flow() {
        common_telemetry::init_default_ut_logging();

        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
        let state = Box::new(RegionMigrationStart);

        let from_peer_id = persistent_context.from_peer.id;
        let to_peer_id = persistent_context.to_peer.id;
        let from_peer = persistent_context.from_peer.clone();
        let to_peer = persistent_context.to_peer.clone();
        let region_id = persistent_context.region_id;
        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(from_peer),
            follower_peers: vec![to_peer],
            ..Default::default()
        }];

        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
        suite.init_table_metadata(table_info, region_routes).await;

        let steps = procedure_flow_steps(from_peer_id, to_peer_id);
        let timer = Instant::now();

        let runner = ProcedureMigrationSuiteRunner::new(suite)
            .steps(steps)
            .run_once()
            .await;

        // The whole flow should complete well before the lease could expire.
        assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2);

        runner.suite.verify_table_metadata().await;
    }
1027
    /// Replays the flow from the latest persisted state (with volatile
    /// context reset) twice; the procedure must be idempotent and still
    /// produce correct table metadata.
    #[tokio::test]
    async fn test_procedure_flow_idempotent() {
        common_telemetry::init_default_ut_logging();

        let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
        let state = Box::new(RegionMigrationStart);

        let from_peer_id = persistent_context.from_peer.id;
        let to_peer_id = persistent_context.to_peer.id;
        let from_peer = persistent_context.from_peer.clone();
        let to_peer = persistent_context.to_peer.clone();
        let region_id = persistent_context.region_id;
        let table_info = new_test_table_info(1024, vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(from_peer),
            follower_peers: vec![to_peer],
            ..Default::default()
        }];

        let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
        suite.init_table_metadata(table_info, region_routes).await;

        let steps = procedure_flow_steps(from_peer_id, to_peer_id);
        // Simulates recovery: restore the last persisted state and wipe
        // the volatile context before re-running the remaining steps.
        let setup_to_latest_persisted_state = Step::setup(
            "Sets state to UpdateMetadata::Downgrade",
            merge_before_test_fn(vec![
                setup_state(Arc::new(|| Box::new(UpdateMetadata::Downgrade))),
                Arc::new(reset_volatile_ctx),
            ]),
        );

        let steps = [
            steps.clone(),
            vec![setup_to_latest_persisted_state.clone()],
            steps.clone()[1..].to_vec(),
            vec![setup_to_latest_persisted_state],
            steps.clone()[1..].to_vec(),
        ]
        .concat();
        let timer = Instant::now();

        let runner = ProcedureMigrationSuiteRunner::new(suite)
            .steps(steps.clone())
            .run_once()
            .await;

        // Even with two replays, the flow should stay well within the lease.
        assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS / 2);

        runner.suite.verify_table_metadata().await;
    }
1082
1083 #[tokio::test]
1084 async fn test_procedure_flow_open_candidate_region_retryable_error() {
1085 common_telemetry::init_default_ut_logging();
1086
1087 let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1088 let state = Box::new(RegionMigrationStart);
1089
1090 let to_peer_id = persistent_context.to_peer.id;
1092 let from_peer = persistent_context.from_peer.clone();
1093 let region_id = persistent_context.region_id;
1094 let table_info = new_test_table_info(1024, vec![1]).into();
1095 let region_routes = vec![RegionRoute {
1096 region: Region::new_test(region_id),
1097 leader_peer: Some(from_peer),
1098 follower_peers: vec![],
1099 ..Default::default()
1100 }];
1101
1102 let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1103 suite.init_table_metadata(table_info, region_routes).await;
1104
1105 let steps = vec![
1106 Step::next(
1108 "Should be the open candidate region",
1109 None,
1110 Assertion::simple(assert_open_candidate_region, assert_need_persist),
1111 ),
1112 Step::next(
1114 "Should be throwing a non-retry error",
1115 Some(mock_datanode_reply(
1116 to_peer_id,
1117 Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1118 )),
1119 Assertion::error(|error| assert!(error.is_retryable())),
1120 ),
1121 Step::next(
1123 "Should be throwing a non-retry error again",
1124 Some(mock_datanode_reply(
1125 to_peer_id,
1126 Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1127 )),
1128 Assertion::error(|error| assert!(error.is_retryable())),
1129 ),
1130 ];
1131
1132 let setup_to_latest_persisted_state = Step::setup(
1133 "Sets state to UpdateMetadata::Downgrade",
1134 merge_before_test_fn(vec![
1135 setup_state(Arc::new(|| Box::new(OpenCandidateRegion))),
1136 Arc::new(reset_volatile_ctx),
1137 ]),
1138 );
1139
1140 let steps = [
1141 steps.clone(),
1142 vec![setup_to_latest_persisted_state.clone()],
1144 steps.clone()[1..].to_vec(),
1145 vec![setup_to_latest_persisted_state],
1146 steps.clone()[1..].to_vec(),
1147 ]
1148 .concat();
1149
1150 let runner = ProcedureMigrationSuiteRunner::new(suite)
1152 .steps(steps.clone())
1153 .run_once()
1154 .await;
1155
1156 let table_routes_version = runner
1157 .env()
1158 .table_metadata_manager()
1159 .table_route_manager()
1160 .table_route_storage()
1161 .get(region_id.table_id())
1162 .await
1163 .unwrap()
1164 .unwrap()
1165 .version();
1166 assert_eq!(table_routes_version.unwrap(), 0);
1168 }
1169
1170 #[tokio::test]
1171 async fn test_procedure_flow_upgrade_candidate_with_retry_and_failed() {
1172 common_telemetry::init_default_ut_logging();
1173
1174 let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1175 let state = Box::new(RegionMigrationStart);
1176
1177 let from_peer_id = persistent_context.from_peer.id;
1179 let to_peer_id = persistent_context.to_peer.id;
1180 let from_peer = persistent_context.from_peer.clone();
1181 let to_peer = persistent_context.to_peer.clone();
1182 let region_id = persistent_context.region_id;
1183 let table_info = new_test_table_info(1024, vec![1]).into();
1184 let region_routes = vec![RegionRoute {
1185 region: Region::new_test(region_id),
1186 leader_peer: Some(from_peer),
1187 follower_peers: vec![to_peer],
1188 ..Default::default()
1189 }];
1190
1191 let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1192 suite.init_table_metadata(table_info, region_routes).await;
1193
1194 let steps = vec![
1195 Step::next(
1197 "Should be the update metadata for downgrading",
1198 None,
1199 Assertion::simple(assert_update_metadata_downgrade, assert_need_persist),
1200 ),
1201 Step::next(
1203 "Should be the downgrade leader region",
1204 None,
1205 Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
1206 ),
1207 Step::next(
1209 "Should be the upgrade candidate region",
1210 Some(mock_datanode_reply(
1211 from_peer_id,
1212 Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
1213 )),
1214 Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
1215 ),
1216 Step::next(
1218 "Should be the rollback metadata",
1219 Some(mock_datanode_reply(
1220 to_peer_id,
1221 Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1222 )),
1223 Assertion::simple(assert_update_metadata_rollback, assert_no_persist),
1224 ),
1225 Step::next(
1227 "Should be the region migration abort",
1228 None,
1229 Assertion::simple(assert_region_migration_abort, assert_no_persist),
1230 ),
1231 Step::next(
1233 "Should throw an error",
1234 None,
1235 Assertion::error(|error| {
1236 assert!(!error.is_retryable());
1237 assert_matches!(error, error::Error::MigrationAbort { .. });
1238 }),
1239 ),
1240 ];
1241
1242 let setup_to_latest_persisted_state = Step::setup(
1243 "Sets state to UpdateMetadata::Downgrade",
1244 merge_before_test_fn(vec![
1245 setup_state(Arc::new(|| Box::new(UpdateMetadata::Downgrade))),
1246 Arc::new(reset_volatile_ctx),
1247 ]),
1248 );
1249
1250 let steps = [
1251 steps.clone(),
1252 vec![setup_to_latest_persisted_state.clone()],
1253 steps.clone()[1..].to_vec(),
1254 vec![setup_to_latest_persisted_state],
1255 steps.clone()[1..].to_vec(),
1256 ]
1257 .concat();
1258
1259 ProcedureMigrationSuiteRunner::new(suite)
1261 .steps(steps.clone())
1262 .run_once()
1263 .await;
1264 }
1265
1266 #[tokio::test]
1267 async fn test_procedure_flow_upgrade_candidate_with_retry() {
1268 common_telemetry::init_default_ut_logging();
1269
1270 let persistent_context = test_util::new_persistent_context(1, 2, RegionId::new(1024, 1));
1271 let state = Box::new(RegionMigrationStart);
1272
1273 let to_peer_id = persistent_context.to_peer.id;
1275 let from_peer_id = persistent_context.from_peer.id;
1276 let from_peer = persistent_context.from_peer.clone();
1277 let region_id = persistent_context.region_id;
1278 let table_info = new_test_table_info(1024, vec![1]).into();
1279 let region_routes = vec![RegionRoute {
1280 region: Region::new_test(region_id),
1281 leader_peer: Some(from_peer),
1282 follower_peers: vec![],
1283 ..Default::default()
1284 }];
1285
1286 let suite = ProcedureMigrationTestSuite::new(persistent_context, state);
1287 suite.init_table_metadata(table_info, region_routes).await;
1288
1289 let steps = vec![
1290 Step::next(
1292 "Should be the open candidate region",
1293 None,
1294 Assertion::simple(assert_open_candidate_region, assert_need_persist),
1295 ),
1296 Step::next(
1298 "Should be throwing a retryable error",
1299 Some(mock_datanode_reply(
1300 to_peer_id,
1301 Arc::new(|id| Ok(new_open_region_reply(id, false, None))),
1302 )),
1303 Assertion::error(|error| assert!(error.is_retryable(), "err: {error:?}")),
1304 ),
1305 Step::next(
1307 "Should be the update metadata for downgrading",
1308 Some(mock_datanode_reply(
1309 to_peer_id,
1310 Arc::new(|id| Ok(new_open_region_reply(id, true, None))),
1311 )),
1312 Assertion::simple(assert_flush_leader_region, assert_no_persist),
1313 ),
1314 Step::next(
1316 "Should be the flush leader region",
1317 Some(mock_datanode_reply(
1318 from_peer_id,
1319 Arc::new(move |id| {
1320 Ok(new_flush_region_reply_for_region(id, region_id, true, None))
1321 }),
1322 )),
1323 Assertion::simple(assert_update_metadata_downgrade, assert_no_persist),
1324 ),
1325 Step::next(
1327 "Should be the downgrade leader region",
1328 None,
1329 Assertion::simple(assert_downgrade_leader_region, assert_no_persist),
1330 ),
1331 Step::next(
1333 "Should be the upgrade candidate region",
1334 Some(mock_datanode_reply(
1335 from_peer_id,
1336 merge_mailbox_messages(vec![
1337 Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1338 Arc::new(|id| Ok(new_downgrade_region_reply(id, None, true, None))),
1339 ]),
1340 )),
1341 Assertion::simple(assert_upgrade_candidate_region, assert_no_persist),
1342 ),
1343 Step::next(
1345 "Should be the update metadata for upgrading",
1346 Some(mock_datanode_reply(
1347 to_peer_id,
1348 merge_mailbox_messages(vec![
1349 Arc::new(|id| error::MailboxTimeoutSnafu { id }.fail()),
1350 Arc::new(|id| Ok(new_upgrade_region_reply(id, true, true, None))),
1351 ]),
1352 )),
1353 Assertion::simple(assert_update_metadata_upgrade, assert_no_persist),
1354 ),
1355 Step::next(
1357 "Should be the close downgraded region",
1358 None,
1359 Assertion::simple(assert_close_downgraded_region, assert_no_persist),
1360 ),
1361 Step::next(
1363 "Should be the region migration end",
1364 None,
1365 Assertion::simple(assert_region_migration_end, assert_done),
1366 ),
1367 Step::next(
1369 "Should be the region migration end again",
1370 None,
1371 Assertion::simple(assert_region_migration_end, assert_done),
1372 ),
1373 Step::setup(
1375 "Sets state to RegionMigrationStart",
1376 merge_before_test_fn(vec![
1377 setup_state(Arc::new(|| Box::new(RegionMigrationStart))),
1378 Arc::new(reset_volatile_ctx),
1379 ]),
1380 ),
1381 Step::next(
1385 "Should be the region migration end(has been migrated)",
1386 None,
1387 Assertion::simple(assert_region_migration_end, assert_done),
1388 ),
1389 ];
1390
1391 let steps = [steps.clone()].concat();
1392 let timer = Instant::now();
1393
1394 let runner = ProcedureMigrationSuiteRunner::new(suite)
1396 .steps(steps.clone())
1397 .run_once()
1398 .await;
1399
1400 assert!(timer.elapsed().as_secs() < REGION_LEASE_SECS);
1402 runner.suite.verify_table_metadata().await;
1403 }
1404}