use std::any::Any;
use std::time::Duration;

use api::v1::meta::MailboxMessage;
use common_error::ext::BoxedError;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::instruction::{
    DowngradeRegion, DowngradeRegionReply, DowngradeRegionsReply, Instruction, InstructionReply,
};
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{error, info, warn};
use common_time::util::current_time_millis;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use tokio::time::{Instant, sleep};

use crate::discovery::utils::find_datanode_lease_value;
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::upgrade_candidate_region::UpgradeCandidateRegion;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;

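/// The state that downgrades the leader region on the `from_peer` before the
/// candidate region on the `to_peer` is upgraded.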
#[derive(Debug, Serialize, Deserialize)]
pub struct DowngradeLeaderRegion {
    /// The maximum number of optimistic retries before giving up.
    optimistic_retry: usize,
    /// The initial interval between retries.
    retry_initial_interval: Duration,
}

impl Default for DowngradeLeaderRegion {
    fn default() -> Self {
        Self {
            optimistic_retry: 3,
            retry_initial_interval: Duration::from_millis(500),
        }
    }
}

#[async_trait::async_trait]
#[typetag::serde]
impl State for DowngradeLeaderRegion {
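    /// Downgrades the leader region and transitions to [`UpgradeCandidateRegion`];
    /// if the procedure deadline was exceeded, transitions to
    /// [`UpdateMetadata::Rollback`] instead.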
    async fn next(
        &mut self,
        ctx: &mut Context,
        _procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)> {
        let now = Instant::now();
        ctx.volatile_ctx
            .set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS));

        match self.downgrade_region_with_retry(ctx).await {
            Ok(_) => {
                info!(
                    "Downgraded region leader successfully, region: {}",
                    ctx.persistent_ctx.region_id
                );
            }
            Err(error::Error::ExceededDeadline { .. }) => {
                info!(
                    "Downgrading region leader exceeded the deadline, region: {}",
                    ctx.persistent_ctx.region_id
                );
                return Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)));
            }
            Err(err) => {
                error!(err; "A non-retryable error occurred, region: {}", ctx.persistent_ctx.region_id);
                if let Some(deadline) = ctx.volatile_ctx.leader_region_lease_deadline.as_ref() {
                    info!(
                        "Running into the downgrade region leader slow path, region: {}, sleeping until {:?}",
                        ctx.persistent_ctx.region_id, deadline
                    );
                    tokio::time::sleep_until(*deadline).await;
                } else {
                    warn!(
                        "Leader region lease deadline is not set, region: {}",
                        ctx.persistent_ctx.region_id
                    );
                }
            }
        }
        ctx.update_downgrade_leader_region_elapsed(now);

        Ok((
            Box::new(UpgradeCandidateRegion::default()),
            Status::executing(false),
        ))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl DowngradeLeaderRegion {
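    /// Builds the [`Instruction::DowngradeRegions`] sent to the leader datanode,
    /// carrying the region id and the flush timeout.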
    fn build_downgrade_region_instruction(
        &self,
        ctx: &Context,
        flush_timeout: Duration,
    ) -> Instruction {
        let pc = &ctx.persistent_ctx;
        let region_id = pc.region_id;
        Instruction::DowngradeRegions(vec![DowngradeRegion {
            region_id,
            flush_timeout: Some(flush_timeout),
        }])
    }

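    /// Sends a downgrade-region instruction to the leader datanode and waits for
    /// the reply.
    ///
    /// On success, records `last_entry_id` and `metadata_last_entry_id` in the
    /// volatile context. Replies carrying an error, as well as mailbox timeouts,
    /// are surfaced as retryable [`error::Error::RetryLater`]; other mailbox
    /// errors are returned as-is.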
    async fn downgrade_region(&self, ctx: &mut Context) -> Result<()> {
        let region_id = ctx.persistent_ctx.region_id;
        let operation_timeout =
            ctx.next_operation_timeout()
                .context(error::ExceededDeadlineSnafu {
                    operation: "Downgrade region",
                })?;
        let downgrade_instruction = self.build_downgrade_region_instruction(ctx, operation_timeout);

        let leader = &ctx.persistent_ctx.from_peer;
        let msg = MailboxMessage::json_message(
            &format!("Downgrade leader region: {}", region_id),
            &format!("Metasrv@{}", ctx.server_addr()),
            &format!("Datanode-{}@{}", leader.id, leader.addr),
            common_time::util::current_time_millis(),
            &downgrade_instruction,
        )
        .with_context(|_| error::SerializeToJsonSnafu {
            input: downgrade_instruction.to_string(),
        })?;

        let ch = Channel::Datanode(leader.id);
        let now = Instant::now();
        let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;

        match receiver.await {
            Ok(msg) => {
                let reply = HeartbeatMailbox::json_reply(&msg)?;
                info!(
                    "Received downgrade region reply: {:?}, region: {}, elapsed: {:?}",
                    reply,
                    region_id,
                    now.elapsed()
                );
                let InstructionReply::DowngradeRegions(DowngradeRegionsReply { replies }) = reply
                else {
                    return error::UnexpectedInstructionReplySnafu {
                        mailbox_message: msg.to_string(),
                        reason: "expected a downgrade region reply",
                    }
                    .fail();
                };

                // The downgrade instruction targets a single region, so expect a single reply.
                let DowngradeRegionReply {
                    region_id,
                    last_entry_id,
                    metadata_last_entry_id,
                    exists,
                    error,
                } = &replies[0];

                if error.is_some() {
                    return error::RetryLaterSnafu {
                        reason: format!(
                            "Failed to downgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
                            region_id, leader, error, now.elapsed()
                        ),
                    }
                    .fail();
                }

                if !exists {
                    warn!(
                        "Trying to downgrade the region {} on datanode {:?}, but the region doesn't exist, elapsed: {:?}",
                        region_id,
                        leader,
                        now.elapsed()
                    );
                } else {
                    info!(
                        "Region {} leader is downgraded on datanode {:?}, last_entry_id: {:?}, metadata_last_entry_id: {:?}, elapsed: {:?}",
                        region_id,
                        leader,
                        last_entry_id,
                        metadata_last_entry_id,
                        now.elapsed()
                    );
                }

                if let Some(last_entry_id) = last_entry_id {
                    ctx.volatile_ctx.set_last_entry_id(*last_entry_id);
                }

                if let Some(metadata_last_entry_id) = metadata_last_entry_id {
                    ctx.volatile_ctx
                        .set_metadata_last_entry_id(*metadata_last_entry_id);
                }

                Ok(())
            }
            Err(error::Error::MailboxTimeout { .. }) => {
                let reason = format!(
                    "Mailbox received timeout for downgrade leader region {region_id} on datanode {:?}, elapsed: {:?}",
                    leader,
                    now.elapsed()
                );
                error::RetryLaterSnafu { reason }.fail()
            }
            Err(err) => Err(err),
        }
    }

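    /// Updates the leader region lease deadline based on the datanode's last
    /// heartbeat.
    ///
    /// If the datanode has been disconnected for longer than the region lease
    /// period, the deadline is cleared so the procedure can proceed immediately;
    /// otherwise the deadline is shortened to the remaining lease time.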
    async fn update_leader_region_lease_deadline(&self, ctx: &mut Context) {
        let leader = &ctx.persistent_ctx.from_peer;

        let last_connection_at = match find_datanode_lease_value(&ctx.in_memory, leader.id).await {
            Ok(lease_value) => lease_value.map(|lease_value| lease_value.timestamp_millis),
            Err(err) => {
                error!(err; "Failed to find the datanode lease value for datanode {} during region migration, region: {}", leader, ctx.persistent_ctx.region_id);
                return;
            }
        };

        if let Some(last_connection_at) = last_connection_at {
            let now = current_time_millis();
            let elapsed = now - last_connection_at;
            let region_lease = Duration::from_secs(REGION_LEASE_SECS);

            if elapsed >= (REGION_LEASE_SECS * 1000) as i64 {
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                info!(
                    "Datanode {}({}) has been disconnected for longer than the region lease period ({:?}), resetting the leader region lease deadline to None, region: {}",
                    leader, last_connection_at, region_lease, ctx.persistent_ctx.region_id
                );
            } else if elapsed > 0 {
                let lease_timeout = region_lease - Duration::from_millis(elapsed as u64);
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                ctx.volatile_ctx
                    .set_leader_region_lease_deadline(lease_timeout);
                info!(
                    "Datanode {}({}) last connected {:?} ago, updated the leader region lease deadline to {:?}, region: {}",
                    leader,
                    last_connection_at,
                    elapsed,
                    ctx.volatile_ctx.leader_region_lease_deadline,
                    ctx.persistent_ctx.region_id
                );
            } else {
                warn!(
                    "Datanode {} has an invalid last connection timestamp: {} (after the current time: {}), region: {}",
                    leader, last_connection_at, now, ctx.persistent_ctx.region_id
                )
            }
        } else {
            warn!(
                "Failed to find the last connection time for datanode {}, unable to update the region lease deadline, region: {}",
                leader, ctx.persistent_ctx.region_id
            )
        }
    }

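    /// Downgrades the leader region, retrying retryable errors up to
    /// `optimistic_retry` attempts and sleeping `retry_initial_interval` between
    /// attempts.
    ///
    /// Returns immediately on [`error::Error::ExceededDeadline`] and on
    /// [`error::Error::PusherNotFound`] (the datanode is unreachable, so the
    /// lease deadline is re-evaluated first). On success, clears the leader
    /// region lease deadline so the next state doesn't wait for it.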
    async fn downgrade_region_with_retry(&self, ctx: &mut Context) -> Result<()> {
        let mut retry = 0;

        loop {
            let timer = Instant::now();
            if let Err(err) = self.downgrade_region(ctx).await {
                ctx.update_operations_elapsed(timer);
                retry += 1;
                if matches!(err, error::Error::ExceededDeadline { .. }) {
                    error!(err; "Failed to downgrade region leader, region: {}, exceeded deadline", ctx.persistent_ctx.region_id);
                    return Err(err);
                } else if matches!(err, error::Error::PusherNotFound { .. }) {
                    error!(err; "Failed to downgrade region leader, region: {}, datanode({}) is unreachable(PusherNotFound)", ctx.persistent_ctx.region_id, ctx.persistent_ctx.from_peer.id);
                    self.update_leader_region_lease_deadline(ctx).await;
                    return Err(err);
                } else if err.is_retryable() && retry < self.optimistic_retry {
                    error!(err; "Failed to downgrade region leader, region: {}, retrying later", ctx.persistent_ctx.region_id);
                    sleep(self.retry_initial_interval).await;
                } else {
                    return Err(BoxedError::new(err)).context(error::DowngradeLeaderSnafu {
                        region_id: ctx.persistent_ctx.region_id,
                    })?;
                }
            } else {
                ctx.update_operations_elapsed(timer);
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                break;
            }
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;
    use tokio::time::Instant;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::manager::RegionMigrationTriggerReason;
    use crate::procedure::region_migration::test_util::{TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_downgrade_region_reply, send_mock_reply,
    };

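    /// Builds a [`PersistentContext`] that migrates region 1 of table 1024 from
    /// peer 1 to peer 2 with a 1s timeout.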
    fn new_persistent_context() -> PersistentContext {
        PersistentContext {
            catalog: "greptime".into(),
            schema: "public".into(),
            from_peer: Peer::empty(1),
            to_peer: Peer::empty(2),
            region_id: RegionId::new(1024, 1),
            timeout: Duration::from_millis(1000),
            trigger_reason: RegionMigrationTriggerReason::Manual,
        }
    }

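    /// Creates the table metadata for the test table, with `from_peer` as the
    /// region leader and `to_peer` as a follower.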
    async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
        let table_info =
            new_test_table_info(ctx.persistent_ctx.region_id.table_id(), vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(ctx.persistent_ctx.region_id),
            leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
            follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
            ..Default::default()
        }];
        ctx.table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                wal_options,
            )
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PusherNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_pusher_dropped() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        drop(rx);

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PushMessage { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_procedure_exceeded_deadline() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());

        let err = state
            .downgrade_region_with_retry(&mut ctx)
            .await
            .unwrap_err();
        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        });

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
    }

    #[tokio::test]
    async fn test_downgrade_region_failed() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(
                id,
                None,
                false,
                Some("test mocked".to_string()),
            ))
        });

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("test mocked"), "err: {err:?}");
    }

    #[tokio::test]
    async fn test_downgrade_region_with_retry_fast_path() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

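        // Reply with a mailbox timeout for the first attempt, then a successful
        // downgrade reply for the retry.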
        common_runtime::spawn_global(async move {
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_downgrade_region_reply(reply_id, Some(1), true, None)),
                )
                .await
                .unwrap();
        });

        state.downgrade_region_with_retry(&mut ctx).await.unwrap();
        assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1));
        assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());
    }

    #[tokio::test]
    async fn test_downgrade_region_with_retry_slow_path() {
        let state = DowngradeLeaderRegion {
            optimistic_retry: 3,
            retry_initial_interval: Duration::from_millis(100),
        };
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

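        // Reply with a mailbox timeout for each of the three attempts so the
        // retries are exhausted.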
        common_runtime::spawn_global(async move {
            for _ in 0..3 {
                let resp = rx.recv().await.unwrap().unwrap();
                let reply_id = resp.mailbox_message.unwrap().id;
                mailbox
                    .on_recv(
                        reply_id,
                        Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                    )
                    .await
                    .unwrap();
            }
        });

        ctx.volatile_ctx
            .set_leader_region_lease_deadline(Duration::from_secs(5));
        let expected_deadline = ctx.volatile_ctx.leader_region_lease_deadline.unwrap();
        let err = state
            .downgrade_region_with_retry(&mut ctx)
            .await
            .unwrap_err();
        assert_matches!(err, error::Error::DowngradeLeader { .. });
        assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, None);
        assert_eq!(
            ctx.volatile_ctx.leader_region_lease_deadline.unwrap(),
            expected_deadline
        )
    }

    #[tokio::test]
    async fn test_next_upgrade_candidate_state() {
        let mut state = Box::<DowngradeLeaderRegion>::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(id, Some(1), true, None))
        });

        let timer = Instant::now();
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let elapsed = timer.elapsed().as_secs();
        assert!(elapsed < REGION_LEASE_SECS / 2);
        assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1));
        assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());

        let _ = next
            .as_any()
            .downcast_ref::<UpgradeCandidateRegion>()
            .unwrap();
    }

    #[tokio::test]
    async fn test_downgrade_region_procedure_exceeded_deadline() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(id, None, true, None))
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }
}