use std::any::Any;
use std::time::Duration;

use api::v1::meta::MailboxMessage;
use common_error::ext::BoxedError;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::instruction::{
    DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply,
};
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{error, info, warn};
use common_time::util::current_time_millis;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use tokio::time::{Instant, sleep};

use crate::discovery::utils::find_datanode_lease_value;
use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::upgrade_candidate_region::UpgradeCandidateRegion;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;

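/// Downgrades the leader region on the `from_peer` datanode so that the
/// candidate region can be upgraded to leader.
///
/// A minimal sketch of constructing the state with custom retry settings, as
/// the slow-path test below does (the values are illustrative, not defaults):
///
/// ```ignore
/// let state = DowngradeLeaderRegion {
///     optimistic_retry: 3,
///     retry_initial_interval: Duration::from_millis(100),
/// };
/// ```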
#[derive(Debug, Serialize, Deserialize)]
pub struct DowngradeLeaderRegion {
    // Maximum number of optimistic retry attempts before giving up.
    optimistic_retry: usize,
    // Initial interval to sleep between retry attempts.
    retry_initial_interval: Duration,
}

impl Default for DowngradeLeaderRegion {
    fn default() -> Self {
        Self {
            optimistic_retry: 3,
            retry_initial_interval: Duration::from_millis(500),
        }
    }
}

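// One step of the region migration procedure: downgrade the old leader, then
// hand off to the next state. On success (or after the slow path waits out the
// old leader's lease following a non-retryable error) the next state is
// `UpgradeCandidateRegion`; if the procedure deadline was exceeded, the next
// state is `UpdateMetadata::Rollback`.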
#[async_trait::async_trait]
#[typetag::serde]
impl State for DowngradeLeaderRegion {
    async fn next(
        &mut self,
        ctx: &mut Context,
        _procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)> {
        let now = Instant::now();
        // The old leader's lease expires one full region lease from now; the
        // slow path below may wait until this deadline.
        ctx.volatile_ctx
            .set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS));

        match self.downgrade_region_with_retry(ctx).await {
            Ok(_) => {
                info!(
                    "Downgraded leader region successfully, region: {}",
                    ctx.persistent_ctx.region_id
                );
            }
            Err(error::Error::ExceededDeadline { .. }) => {
                info!(
                    "Downgrade region leader exceeded deadline, region: {}",
                    ctx.persistent_ctx.region_id
                );
                // Rolls back the metadata update when the procedure deadline is exceeded.
                return Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)));
            }
            Err(err) => {
                error!(err; "Non-retryable error occurred, region: {}", ctx.persistent_ctx.region_id);
                if let Some(deadline) = ctx.volatile_ctx.leader_region_lease_deadline.as_ref() {
                    info!(
                        "Running into the downgrade region leader slow path, region: {}, sleep until {:?}",
                        ctx.persistent_ctx.region_id, deadline
                    );
                    tokio::time::sleep_until(*deadline).await;
                } else {
                    warn!(
                        "Leader region lease deadline is not set, region: {}",
                        ctx.persistent_ctx.region_id
                    );
                }
            }
        }
        ctx.update_downgrade_leader_region_elapsed(now);

        Ok((
            Box::new(UpgradeCandidateRegion::default()),
            Status::executing(false),
        ))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl DowngradeLeaderRegion {
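    /// Builds the [DowngradeRegion] instruction sent to the old leader,
    /// carrying the flush timeout budgeted for this operation.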
    fn build_downgrade_region_instruction(
        &self,
        ctx: &Context,
        flush_timeout: Duration,
    ) -> Instruction {
        let pc = &ctx.persistent_ctx;
        let region_id = pc.region_id;
        Instruction::DowngradeRegion(DowngradeRegion {
            region_id,
            flush_timeout: Some(flush_timeout),
        })
    }

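    /// Sends a [DowngradeRegion] instruction to the old leader and waits for
    /// the reply within the remaining operation timeout.
    ///
    /// Retryable: the datanode reports a downgrade error, or the mailbox times
    /// out (both surface as [error::Error::RetryLater]). On success, the
    /// replied `last_entry_id`/`metadata_last_entry_id` are recorded in the
    /// volatile context.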
    async fn downgrade_region(&self, ctx: &mut Context) -> Result<()> {
        let region_id = ctx.persistent_ctx.region_id;
        let operation_timeout =
            ctx.next_operation_timeout()
                .context(error::ExceededDeadlineSnafu {
                    operation: "Downgrade region",
                })?;
        let downgrade_instruction = self.build_downgrade_region_instruction(ctx, operation_timeout);

        let leader = &ctx.persistent_ctx.from_peer;
        let msg = MailboxMessage::json_message(
            &format!("Downgrade leader region: {}", region_id),
            &format!("Metasrv@{}", ctx.server_addr()),
            &format!("Datanode-{}@{}", leader.id, leader.addr),
            current_time_millis(),
            &downgrade_instruction,
        )
        .with_context(|_| error::SerializeToJsonSnafu {
            input: downgrade_instruction.to_string(),
        })?;

        let ch = Channel::Datanode(leader.id);
        let now = Instant::now();
        let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;

        match receiver.await {
            Ok(msg) => {
                let reply = HeartbeatMailbox::json_reply(&msg)?;
                info!(
                    "Received downgrade region reply: {:?}, region: {}, elapsed: {:?}",
                    reply,
                    region_id,
                    now.elapsed()
                );
                let InstructionReply::DowngradeRegion(DowngradeRegionReply {
                    last_entry_id,
                    metadata_last_entry_id,
                    exists,
                    error,
                }) = reply
                else {
                    return error::UnexpectedInstructionReplySnafu {
                        mailbox_message: msg.to_string(),
                        reason: "expect downgrade region reply",
                    }
                    .fail();
                };

                if error.is_some() {
                    return error::RetryLaterSnafu {
                        reason: format!(
                            "Failed to downgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
                            region_id, leader, error, now.elapsed()
                        ),
                    }
                    .fail();
                }

                if !exists {
                    warn!(
                        "Trying to downgrade the region {} on datanode {:?}, but the region doesn't exist, elapsed: {:?}",
                        region_id,
                        leader,
                        now.elapsed()
                    );
                } else {
                    info!(
                        "Region {} leader is downgraded on datanode {:?}, last_entry_id: {:?}, metadata_last_entry_id: {:?}, elapsed: {:?}",
                        region_id,
                        leader,
                        last_entry_id,
                        metadata_last_entry_id,
                        now.elapsed()
                    );
                }

                if let Some(last_entry_id) = last_entry_id {
                    ctx.volatile_ctx.set_last_entry_id(last_entry_id);
                }

                if let Some(metadata_last_entry_id) = metadata_last_entry_id {
                    ctx.volatile_ctx
                        .set_metadata_last_entry_id(metadata_last_entry_id);
                }

                Ok(())
            }
            Err(error::Error::MailboxTimeout { .. }) => {
                let reason = format!(
                    "Mailbox timed out waiting for the downgrade reply of leader region {region_id} on datanode {:?}, elapsed: {:?}",
                    leader,
                    now.elapsed()
                );
                error::RetryLaterSnafu { reason }.fail()
            }
            Err(err) => Err(err),
        }
    }

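    /// Recomputes the leader region lease deadline from the `from_peer`
    /// datanode's last heartbeat: clears the deadline if the datanode has been
    /// disconnected for a full lease period (the lease has already expired),
    /// otherwise keeps only the remaining portion of the lease.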
    async fn update_leader_region_lease_deadline(&self, ctx: &mut Context) {
        let leader = &ctx.persistent_ctx.from_peer;

        let last_connection_at = match find_datanode_lease_value(&ctx.in_memory, leader.id).await {
            Ok(lease_value) => lease_value.map(|lease_value| lease_value.timestamp_millis),
            Err(err) => {
                error!(err; "Failed to find datanode lease value for datanode {} during region migration, region: {}", leader, ctx.persistent_ctx.region_id);
                return;
            }
        };

        if let Some(last_connection_at) = last_connection_at {
            let now = current_time_millis();
            let elapsed = now - last_connection_at;
            let region_lease = Duration::from_secs(REGION_LEASE_SECS);

            if elapsed >= (REGION_LEASE_SECS * 1000) as i64 {
                // The datanode has been disconnected for at least a full lease
                // period, so the lease has already expired and there is nothing to wait for.
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                info!(
                    "Datanode {}({}) has been disconnected for longer than the region lease period ({:?}), reset leader region lease deadline to None, region: {}",
                    leader, last_connection_at, region_lease, ctx.persistent_ctx.region_id
                );
            } else if elapsed > 0 {
                // Only the remainder of the lease needs to be waited out.
                let lease_timeout = region_lease - Duration::from_millis(elapsed as u64);
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                ctx.volatile_ctx
                    .set_leader_region_lease_deadline(lease_timeout);
                info!(
                    "Datanode {}({}) last connected {}ms ago, updated leader region lease deadline to {:?}, region: {}",
                    leader,
                    last_connection_at,
                    elapsed,
                    ctx.volatile_ctx.leader_region_lease_deadline,
                    ctx.persistent_ctx.region_id
                );
            } else {
                warn!(
                    "Datanode {} has invalid last connection timestamp: {} (which is after current time: {}), region: {}",
                    leader, last_connection_at, now, ctx.persistent_ctx.region_id
                )
            }
        } else {
            warn!(
                "Failed to find last connection time for datanode {}, unable to update region lease deadline, region: {}",
                leader, ctx.persistent_ctx.region_id
            )
        }
    }

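    /// Retries [DowngradeLeaderRegion::downgrade_region] up to
    /// `optimistic_retry` times for retryable errors.
    ///
    /// Aborts immediately on [error::Error::ExceededDeadline] and on
    /// [error::Error::PusherNotFound] (the datanode is unreachable; the lease
    /// deadline is recomputed first). On success, the lease deadline is reset
    /// so the caller can take the fast path.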
    async fn downgrade_region_with_retry(&self, ctx: &mut Context) -> Result<()> {
        let mut retry = 0;

        loop {
            let timer = Instant::now();
            if let Err(err) = self.downgrade_region(ctx).await {
                ctx.update_operations_elapsed(timer);
                retry += 1;
                if matches!(err, error::Error::ExceededDeadline { .. }) {
                    error!(err; "Failed to downgrade region leader, region: {}, exceeded deadline", ctx.persistent_ctx.region_id);
                    return Err(err);
                } else if matches!(err, error::Error::PusherNotFound { .. }) {
                    error!(err; "Failed to downgrade region leader, region: {}, datanode({}) is unreachable(PusherNotFound)", ctx.persistent_ctx.region_id, ctx.persistent_ctx.from_peer.id);
                    // The datanode is unreachable; recompute the lease deadline
                    // from its last known heartbeat before bailing out.
                    self.update_leader_region_lease_deadline(ctx).await;
                    return Err(err);
                } else if err.is_retryable() && retry < self.optimistic_retry {
                    error!(err; "Failed to downgrade region leader, region: {}, retry later", ctx.persistent_ctx.region_id);
                    sleep(self.retry_initial_interval).await;
                } else {
                    return Err(BoxedError::new(err)).context(error::DowngradeLeaderSnafu {
                        region_id: ctx.persistent_ctx.region_id,
                    })?;
                }
            } else {
                ctx.update_operations_elapsed(timer);
                // The old leader stopped serving writes; no need to wait out its lease.
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                break;
            }
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;
    use tokio::time::Instant;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::manager::RegionMigrationTriggerReason;
    use crate::procedure::region_migration::test_util::{TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_downgrade_region_reply, send_mock_reply,
    };

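    /// A [PersistentContext] fixture: migrating region 1 of table 1024 from
    /// peer 1 to peer 2 with a 1s operation timeout.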
    fn new_persistent_context() -> PersistentContext {
        PersistentContext {
            catalog: "greptime".into(),
            schema: "public".into(),
            from_peer: Peer::empty(1),
            to_peer: Peer::empty(2),
            region_id: RegionId::new(1024, 1),
            timeout: Duration::from_millis(1000),
            trigger_reason: RegionMigrationTriggerReason::Manual,
        }
    }

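    /// Creates table metadata with a single region whose leader is `from_peer`
    /// and whose follower is `to_peer`.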
    async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
        let table_info =
            new_test_table_info(ctx.persistent_ctx.region_id.table_id(), vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(ctx.persistent_ctx.region_id),
            leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
            follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
            ..Default::default()
        }];
        ctx.table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                wal_options,
            )
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PusherNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_pusher_dropped() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        // Drop the receiver to simulate the pusher being dropped mid-send.
        drop(rx);

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PushMessage { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_procedure_exceeded_deadline() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        // Exhaust the procedure timeout so the next operation exceeds the deadline.
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());

        let err = state
            .downgrade_region_with_retry(&mut ctx)
            .await
            .unwrap_err();
        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        // Reply with a close-region reply to hit the unexpected-reply branch.
        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        });

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
    }

    #[tokio::test]
    async fn test_downgrade_region_failed() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(
                id,
                None,
                false,
                Some("test mocked".to_string()),
            ))
        });

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("test mocked"), "err: {err:?}",);
    }

    #[tokio::test]
    async fn test_downgrade_region_with_retry_fast_path() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            // First reply: a mailbox timeout, which is retryable.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            // Second reply: a successful downgrade with last_entry_id = 1.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_downgrade_region_reply(reply_id, Some(1), true, None)),
                )
                .await
                .unwrap();
        });

        state.downgrade_region_with_retry(&mut ctx).await.unwrap();
        assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1));
        assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());
    }

    #[tokio::test]
    async fn test_downgrade_region_with_retry_slow_path() {
        let state = DowngradeLeaderRegion {
            optimistic_retry: 3,
            retry_initial_interval: Duration::from_millis(100),
        };
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            // Time out every one of the three optimistic retries.
            for _ in 0..3 {
                let resp = rx.recv().await.unwrap().unwrap();
                let reply_id = resp.mailbox_message.unwrap().id;
                mailbox
                    .on_recv(
                        reply_id,
                        Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                    )
                    .await
                    .unwrap();
            }
        });

        ctx.volatile_ctx
            .set_leader_region_lease_deadline(Duration::from_secs(5));
        let expected_deadline = ctx.volatile_ctx.leader_region_lease_deadline.unwrap();
        let err = state
            .downgrade_region_with_retry(&mut ctx)
            .await
            .unwrap_err();
        assert_matches!(err, error::Error::DowngradeLeader { .. });
        assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, None);
        // The lease deadline must be left untouched on failure.
        assert_eq!(
            ctx.volatile_ctx.leader_region_lease_deadline.unwrap(),
            expected_deadline
        )
    }

    #[tokio::test]
    async fn test_next_upgrade_candidate_state() {
        let mut state = Box::<DowngradeLeaderRegion>::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(id, Some(1), true, None))
        });

        let timer = Instant::now();
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let elapsed = timer.elapsed().as_secs();
        // The fast path must not wait out the region lease.
        assert!(elapsed < REGION_LEASE_SECS / 2);
        assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1));
        assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());

        let _ = next
            .as_any()
            .downcast_ref::<UpgradeCandidateRegion>()
            .unwrap();
    }

    #[tokio::test]
    async fn test_downgrade_region_procedure_exceeded_deadline() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        // Exhaust the procedure timeout so the next operation exceeds the deadline.
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(id, None, true, None))
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }
}