use std::any::Any;
use std::time::Duration;

use api::v1::meta::MailboxMessage;
use common_error::ext::BoxedError;
use common_meta::distributed_time_constants::REGION_LEASE_SECS;
use common_meta::instruction::{
    DowngradeRegion, DowngradeRegionReply, Instruction, InstructionReply,
};
use common_procedure::{Context as ProcedureContext, Status};
use common_telemetry::{error, info, warn};
use common_time::util::current_time_millis;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use tokio::time::{sleep, Instant};

use crate::error::{self, Result};
use crate::handler::HeartbeatMailbox;
use crate::lease::find_datanode_lease_value;
use crate::procedure::region_migration::update_metadata::UpdateMetadata;
use crate::procedure::region_migration::upgrade_candidate_region::UpgradeCandidateRegion;
use crate::procedure::region_migration::{Context, State};
use crate::service::mailbox::Channel;

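/// [`DowngradeLeaderRegion`] is the region-migration state that asks the leader datanode
/// (the `from_peer`) to downgrade its leader region before the candidate region is upgraded.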
#[derive(Debug, Serialize, Deserialize)]
pub struct DowngradeLeaderRegion {
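    /// The maximum number of optimistic downgrade attempts for retryable errors.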
    optimistic_retry: usize,
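    /// The initial interval to sleep before retrying.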
    retry_initial_interval: Duration,
}

impl Default for DowngradeLeaderRegion {
    fn default() -> Self {
        Self {
            optimistic_retry: 3,
            retry_initial_interval: Duration::from_millis(500),
        }
    }
}

#[async_trait::async_trait]
#[typetag::serde]
impl State for DowngradeLeaderRegion {
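    /// Sets the leader region lease deadline, downgrades the leader region with retries,
    /// and then yields [`UpgradeCandidateRegion`]. If the procedure deadline is exceeded,
    /// it yields [`UpdateMetadata::Rollback`] instead; for other errors it waits out the
    /// remaining leader region lease before continuing.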
    async fn next(
        &mut self,
        ctx: &mut Context,
        _procedure_ctx: &ProcedureContext,
    ) -> Result<(Box<dyn State>, Status)> {
        let now = Instant::now();
        ctx.volatile_ctx
            .set_leader_region_lease_deadline(Duration::from_secs(REGION_LEASE_SECS));

        match self.downgrade_region_with_retry(ctx).await {
            Ok(_) => {
                info!(
                    "Downgraded region leader successfully, region: {}",
                    ctx.persistent_ctx.region_id
                );
            }
            Err(error::Error::ExceededDeadline { .. }) => {
                info!(
                    "Downgrade region leader exceeded deadline, region: {}",
                    ctx.persistent_ctx.region_id
                );
                return Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)));
            }
            Err(err) => {
                error!(err; "Non-retryable error occurred, region: {}", ctx.persistent_ctx.region_id);
                if let Some(deadline) = ctx.volatile_ctx.leader_region_lease_deadline.as_ref() {
                    info!(
                        "Running into the downgrade region leader slow path, region: {}, sleep until {:?}",
                        ctx.persistent_ctx.region_id, deadline
                    );
                    tokio::time::sleep_until(*deadline).await;
                } else {
                    warn!(
                        "Leader region lease deadline is not set, region: {}",
                        ctx.persistent_ctx.region_id
                    );
                }
            }
        }
        ctx.update_downgrade_leader_region_elapsed(now);

        Ok((
            Box::new(UpgradeCandidateRegion::default()),
            Status::executing(false),
        ))
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl DowngradeLeaderRegion {
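    /// Builds the [`DowngradeRegion`] instruction sent to the leader datanode,
    /// carrying the flush timeout for the downgrade.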
    fn build_downgrade_region_instruction(
        &self,
        ctx: &Context,
        flush_timeout: Duration,
    ) -> Instruction {
        let pc = &ctx.persistent_ctx;
        let region_id = pc.region_id;
        Instruction::DowngradeRegion(DowngradeRegion {
            region_id,
            flush_timeout: Some(flush_timeout),
        })
    }

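    /// Sends the downgrade instruction to the leader datanode and waits for its reply.
    ///
    /// Records the returned `last_entry_id`s in the volatile context on success; a mailbox
    /// timeout or an error reported by the datanode is converted into a retryable error.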
    async fn downgrade_region(&self, ctx: &mut Context) -> Result<()> {
        let region_id = ctx.persistent_ctx.region_id;
        let operation_timeout =
            ctx.next_operation_timeout()
                .context(error::ExceededDeadlineSnafu {
                    operation: "Downgrade region",
                })?;
        let downgrade_instruction = self.build_downgrade_region_instruction(ctx, operation_timeout);

        let leader = &ctx.persistent_ctx.from_peer;
        let msg = MailboxMessage::json_message(
            &format!("Downgrade leader region: {}", region_id),
            &format!("Metasrv@{}", ctx.server_addr()),
            &format!("Datanode-{}@{}", leader.id, leader.addr),
            common_time::util::current_time_millis(),
            &downgrade_instruction,
        )
        .with_context(|_| error::SerializeToJsonSnafu {
            input: downgrade_instruction.to_string(),
        })?;

        let ch = Channel::Datanode(leader.id);
        let now = Instant::now();
        let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;

        match receiver.await {
            Ok(msg) => {
                let reply = HeartbeatMailbox::json_reply(&msg)?;
                info!(
                    "Received downgrade region reply: {:?}, region: {}, elapsed: {:?}",
                    reply,
                    region_id,
                    now.elapsed()
                );
                let InstructionReply::DowngradeRegion(DowngradeRegionReply {
                    last_entry_id,
                    metadata_last_entry_id,
                    exists,
                    error,
                }) = reply
                else {
                    return error::UnexpectedInstructionReplySnafu {
                        mailbox_message: msg.to_string(),
                        reason: "expect downgrade region reply",
                    }
                    .fail();
                };

                if error.is_some() {
                    return error::RetryLaterSnafu {
                        reason: format!(
                            "Failed to downgrade the region {} on datanode {:?}, error: {:?}, elapsed: {:?}",
                            region_id, leader, error, now.elapsed()
                        ),
                    }
                    .fail();
                }

                if !exists {
                    warn!(
                        "Trying to downgrade the region {} on datanode {:?}, but the region doesn't exist, elapsed: {:?}",
                        region_id, leader, now.elapsed()
                    );
                } else {
                    info!(
                        "Region {} leader is downgraded on datanode {:?}, last_entry_id: {:?}, metadata_last_entry_id: {:?}, elapsed: {:?}",
                        region_id,
                        leader,
                        last_entry_id,
                        metadata_last_entry_id,
                        now.elapsed()
                    );
                }

                if let Some(last_entry_id) = last_entry_id {
                    ctx.volatile_ctx.set_last_entry_id(last_entry_id);
                }

                if let Some(metadata_last_entry_id) = metadata_last_entry_id {
                    ctx.volatile_ctx
                        .set_metadata_last_entry_id(metadata_last_entry_id);
                }

                Ok(())
            }
            Err(error::Error::MailboxTimeout { .. }) => {
                let reason = format!(
                    "Mailbox timed out while downgrading leader region {region_id} on datanode {:?}, elapsed: {:?}",
                    leader,
                    now.elapsed()
                );
                error::RetryLaterSnafu { reason }.fail()
            }
            Err(err) => Err(err),
        }
    }

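    /// Updates the leader region lease deadline according to the datanode's last heartbeat:
    /// resets it to `None` when the datanode has been disconnected for longer than the region
    /// lease period, otherwise shortens it by the time elapsed since the last connection.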
    async fn update_leader_region_lease_deadline(&self, ctx: &mut Context) {
        let leader = &ctx.persistent_ctx.from_peer;

        let last_connection_at = match find_datanode_lease_value(leader.id, &ctx.in_memory).await {
            Ok(lease_value) => lease_value.map(|lease_value| lease_value.timestamp_millis),
            Err(err) => {
                error!(err; "Failed to find datanode lease value for datanode {} during region migration, region: {}", leader, ctx.persistent_ctx.region_id);
                return;
            }
        };

        if let Some(last_connection_at) = last_connection_at {
            let now = current_time_millis();
            let elapsed = now - last_connection_at;
            let region_lease = Duration::from_secs(REGION_LEASE_SECS);

            if elapsed >= (REGION_LEASE_SECS * 1000) as i64 {
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                info!(
                    "Datanode {}({}) has been disconnected for longer than the region lease period ({:?}), reset leader region lease deadline to None, region: {}",
                    leader,
                    last_connection_at,
                    region_lease,
                    ctx.persistent_ctx.region_id
                );
            } else if elapsed > 0 {
                let lease_timeout =
                    region_lease - Duration::from_millis((now - last_connection_at) as u64);
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                ctx.volatile_ctx
                    .set_leader_region_lease_deadline(lease_timeout);
                info!(
                    "Datanode {}({}) was last connected {} ms ago, updated leader region lease deadline to {:?}, region: {}",
                    leader, last_connection_at, elapsed, ctx.volatile_ctx.leader_region_lease_deadline, ctx.persistent_ctx.region_id
                );
            } else {
                warn!(
                    "Datanode {} has an invalid last connection timestamp: {} (which is after the current time: {}), region: {}",
                    leader, last_connection_at, now, ctx.persistent_ctx.region_id
                )
            }
        } else {
            warn!(
                "Failed to find the last connection time for datanode {}, unable to update the region lease deadline, region: {}",
                leader, ctx.persistent_ctx.region_id
            )
        }
    }

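    /// Downgrades the leader region, making at most `optimistic_retry` attempts when the
    /// error is retryable.
    ///
    /// Exceeded deadlines and unreachable datanodes (`PusherNotFound`) abort the loop
    /// immediately; other failures are wrapped as a `DowngradeLeader` error. On success the
    /// leader region lease deadline is reset.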
    async fn downgrade_region_with_retry(&self, ctx: &mut Context) -> Result<()> {
        let mut retry = 0;

        loop {
            let timer = Instant::now();
            if let Err(err) = self.downgrade_region(ctx).await {
                ctx.update_operations_elapsed(timer);
                retry += 1;
                if matches!(err, error::Error::ExceededDeadline { .. }) {
                    error!(err; "Failed to downgrade region leader, region: {}, exceeded deadline", ctx.persistent_ctx.region_id);
                    return Err(err);
                } else if matches!(err, error::Error::PusherNotFound { .. }) {
                    error!(err; "Failed to downgrade region leader, region: {}, datanode ({}) is unreachable (PusherNotFound)", ctx.persistent_ctx.region_id, ctx.persistent_ctx.from_peer.id);
                    self.update_leader_region_lease_deadline(ctx).await;
                    return Err(err);
                } else if err.is_retryable() && retry < self.optimistic_retry {
                    error!(err; "Failed to downgrade region leader, region: {}, retry later", ctx.persistent_ctx.region_id);
                    sleep(self.retry_initial_interval).await;
                } else {
                    return Err(BoxedError::new(err)).context(error::DowngradeLeaderSnafu {
                        region_id: ctx.persistent_ctx.region_id,
                    })?;
                }
            } else {
                ctx.update_operations_elapsed(timer);
                ctx.volatile_ctx.reset_leader_region_lease_deadline();
                break;
            }
        }

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;
    use tokio::time::Instant;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::test_util::{new_procedure_context, TestingEnv};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_downgrade_region_reply, send_mock_reply,
    };

    fn new_persistent_context() -> PersistentContext {
        PersistentContext {
            catalog: "greptime".into(),
            schema: "public".into(),
            from_peer: Peer::empty(1),
            to_peer: Peer::empty(2),
            region_id: RegionId::new(1024, 1),
            timeout: Duration::from_millis(1000),
        }
    }

    async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
        let table_info =
            new_test_table_info(ctx.persistent_ctx.region_id.table_id(), vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(ctx.persistent_ctx.region_id),
            leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
            follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
            ..Default::default()
        }];
        ctx.table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                wal_options,
            )
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PusherNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_pusher_dropped() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        drop(rx);

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PushMessage { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_procedure_exceeded_deadline() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());

        let err = state
            .downgrade_region_with_retry(&mut ctx)
            .await
            .unwrap_err();
        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        });

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
    }

    #[tokio::test]
    async fn test_downgrade_region_failed() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(
                id,
                None,
                false,
                Some("test mocked".to_string()),
            ))
        });

        let err = state.downgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("test mocked"), "err: {err:?}");
    }

    #[tokio::test]
    async fn test_downgrade_region_with_retry_fast_path() {
        let state = DowngradeLeaderRegion::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_downgrade_region_reply(reply_id, Some(1), true, None)),
                )
                .await
                .unwrap();
        });

        state.downgrade_region_with_retry(&mut ctx).await.unwrap();
        assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1));
        assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());
    }

    #[tokio::test]
    async fn test_downgrade_region_with_retry_slow_path() {
        let state = DowngradeLeaderRegion {
            optimistic_retry: 3,
            retry_initial_interval: Duration::from_millis(100),
        };
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            for _ in 0..3 {
                let resp = rx.recv().await.unwrap().unwrap();
                let reply_id = resp.mailbox_message.unwrap().id;
                mailbox
                    .on_recv(
                        reply_id,
                        Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                    )
                    .await
                    .unwrap();
            }
        });

        ctx.volatile_ctx
            .set_leader_region_lease_deadline(Duration::from_secs(5));
        let expected_deadline = ctx.volatile_ctx.leader_region_lease_deadline.unwrap();
        let err = state
            .downgrade_region_with_retry(&mut ctx)
            .await
            .unwrap_err();
        assert_matches!(err, error::Error::DowngradeLeader { .. });
        assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, None);
        assert_eq!(
            ctx.volatile_ctx.leader_region_lease_deadline.unwrap(),
            expected_deadline
        )
    }

    #[tokio::test]
    async fn test_next_upgrade_candidate_state() {
        let mut state = Box::<DowngradeLeaderRegion>::default();
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(id, Some(1), true, None))
        });

        let timer = Instant::now();
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let elapsed = timer.elapsed().as_secs();
        assert!(elapsed < REGION_LEASE_SECS / 2);
        assert_eq!(ctx.volatile_ctx.leader_region_last_entry_id, Some(1));
        assert!(ctx.volatile_ctx.leader_region_lease_deadline.is_none());

        let _ = next
            .as_any()
            .downcast_ref::<UpgradeCandidateRegion>()
            .unwrap();
    }

    #[tokio::test]
    async fn test_downgrade_region_procedure_exceeded_deadline() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_downgrade_region_reply(id, None, true, None))
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }
}