1use std::any::Any;
16use std::time::Duration;
17
18use api::v1::meta::MailboxMessage;
19use common_meta::ddl::utils::parse_region_wal_options;
20use common_meta::instruction::{
21 Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply, UpgradeRegionsReply,
22};
23use common_meta::lock_key::RemoteWalLock;
24use common_meta::wal_options_allocator::extract_topic_from_wal_options;
25use common_procedure::{Context as ProcedureContext, Status};
26use common_telemetry::{error, warn};
27use common_wal::options::WalOptions;
28use serde::{Deserialize, Serialize};
29use snafu::{OptionExt, ResultExt, ensure};
30use tokio::time::{Instant, sleep};
31
32use crate::error::{self, Result};
33use crate::handler::HeartbeatMailbox;
34use crate::procedure::region_migration::update_metadata::UpdateMetadata;
35use crate::procedure::region_migration::{Context, State};
36use crate::service::mailbox::Channel;
37
/// Region migration state that asks the candidate (destination) datanode to
/// upgrade the candidate region, then moves to `UpdateMetadata::Upgrade` on
/// success or `UpdateMetadata::Rollback` on failure.
#[derive(Debug, Serialize, Deserialize)]
pub struct UpgradeCandidateRegion {
    /// Upper bound on the number of upgrade attempts while failures are retryable.
    pub(crate) optimistic_retry: usize,
    /// Delay slept between attempts after a retryable failure.
    /// NOTE(review): despite the name, the same interval is used for every retry
    /// (no backoff growth).
    pub(crate) retry_initial_interval: Duration,
    /// If `true`, a reply reporting the candidate as not ready (still replaying the
    /// WAL) is treated as a retryable failure; if `false`, such a reply is accepted.
    pub(crate) require_ready: bool,
}
48
49impl Default for UpgradeCandidateRegion {
50 fn default() -> Self {
51 Self {
52 optimistic_retry: 3,
53 retry_initial_interval: Duration::from_millis(500),
54 require_ready: true,
55 }
56 }
57}
58
59#[async_trait::async_trait]
60#[typetag::serde]
61impl State for UpgradeCandidateRegion {
62 async fn next(
63 &mut self,
64 ctx: &mut Context,
65 procedure_ctx: &ProcedureContext,
66 ) -> Result<(Box<dyn State>, Status)> {
67 let now = Instant::now();
68
69 let region_wal_option = self.get_region_wal_option(ctx).await?;
70 let region_id = ctx.persistent_ctx.region_id;
71 if region_wal_option.is_none() {
72 warn!(
73 "Region {} wal options not found, during upgrade candidate region",
74 region_id
75 );
76 }
77
78 if self
79 .upgrade_region_with_retry(ctx, procedure_ctx, region_wal_option.as_ref())
80 .await
81 {
82 ctx.update_upgrade_candidate_region_elapsed(now);
83 Ok((Box::new(UpdateMetadata::Upgrade), Status::executing(false)))
84 } else {
85 ctx.update_upgrade_candidate_region_elapsed(now);
86 Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)))
87 }
88 }
89
90 fn as_any(&self) -> &dyn Any {
91 self
92 }
93}
94
95impl UpgradeCandidateRegion {
96 async fn get_region_wal_option(&self, ctx: &mut Context) -> Result<Option<WalOptions>> {
97 let region_id = ctx.persistent_ctx.region_id;
98 match ctx.get_from_peer_datanode_table_value().await {
99 Ok(datanode_table_value) => {
100 let region_wal_options =
101 parse_region_wal_options(&datanode_table_value.region_info.region_wal_options)
102 .context(error::ParseWalOptionsSnafu)?;
103 Ok(region_wal_options.get(®ion_id.region_number()).cloned())
104 }
105 Err(error::Error::DatanodeTableNotFound { datanode_id, .. }) => {
106 warn!(
107 "Datanode table not found, during upgrade candidate region, the target region might already been migrated, region_id: {}, datanode_id: {}",
108 region_id, datanode_id
109 );
110 Ok(None)
111 }
112 Err(e) => Err(e),
113 }
114 }
115
116 async fn build_upgrade_region_instruction(
118 &self,
119 ctx: &mut Context,
120 replay_timeout: Duration,
121 ) -> Result<Instruction> {
122 let pc = &ctx.persistent_ctx;
123 let region_id = pc.region_id;
124 let last_entry_id = ctx.volatile_ctx.leader_region_last_entry_id;
125 let metadata_last_entry_id = ctx.volatile_ctx.leader_region_metadata_last_entry_id;
126 let datanode_table_value = ctx.get_from_peer_datanode_table_value().await.ok();
128 let checkpoint = if let Some(topic) = datanode_table_value.as_ref().and_then(|v| {
129 extract_topic_from_wal_options(region_id, &v.region_info.region_wal_options)
130 }) {
131 ctx.fetch_replay_checkpoint(&topic).await.ok().flatten()
132 } else {
133 None
134 };
135
136 let upgrade_instruction = Instruction::UpgradeRegions(vec![
137 UpgradeRegion {
138 region_id,
139 last_entry_id,
140 metadata_last_entry_id,
141 replay_timeout,
142 location_id: Some(ctx.persistent_ctx.from_peer.id),
143 replay_entry_id: None,
144 metadata_replay_entry_id: None,
145 }
146 .with_replay_entry_id(checkpoint.map(|c| c.entry_id))
147 .with_metadata_replay_entry_id(checkpoint.and_then(|c| c.metadata_entry_id)),
148 ]);
149
150 Ok(upgrade_instruction)
151 }
152
153 async fn upgrade_region(&self, ctx: &mut Context) -> Result<()> {
168 let operation_timeout =
169 ctx.next_operation_timeout()
170 .context(error::ExceededDeadlineSnafu {
171 operation: "Upgrade region",
172 })?;
173 let upgrade_instruction = self
174 .build_upgrade_region_instruction(ctx, operation_timeout)
175 .await?;
176
177 let pc = &ctx.persistent_ctx;
178 let region_id = pc.region_id;
179 let candidate = &pc.to_peer;
180
181 let msg = MailboxMessage::json_message(
182 &format!("Upgrade candidate region: {}", region_id),
183 &format!("Metasrv@{}", ctx.server_addr()),
184 &format!("Datanode-{}@{}", candidate.id, candidate.addr),
185 common_time::util::current_time_millis(),
186 &upgrade_instruction,
187 )
188 .with_context(|_| error::SerializeToJsonSnafu {
189 input: upgrade_instruction.to_string(),
190 })?;
191
192 let ch = Channel::Datanode(candidate.id);
193 let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
194
195 match receiver.await {
196 Ok(msg) => {
197 let reply = HeartbeatMailbox::json_reply(&msg)?;
198 let InstructionReply::UpgradeRegions(UpgradeRegionsReply { replies }) = reply
199 else {
200 return error::UnexpectedInstructionReplySnafu {
201 mailbox_message: msg.to_string(),
202 reason: "Unexpected reply of the upgrade region instruction",
203 }
204 .fail();
205 };
206 let UpgradeRegionReply {
208 ready,
209 exists,
210 error,
211 ..
212 } = &replies[0];
213
214 if error.is_some() {
216 return error::RetryLaterSnafu {
217 reason: format!(
218 "Failed to upgrade the region {} on datanode {:?}, error: {:?}",
219 region_id, candidate, error
220 ),
221 }
222 .fail();
223 }
224
225 ensure!(
226 exists,
227 error::UnexpectedSnafu {
228 violated: format!(
229 "Candidate region {} doesn't exist on datanode {:?}",
230 region_id, candidate
231 )
232 }
233 );
234
235 if self.require_ready && !ready {
236 return error::RetryLaterSnafu {
237 reason: format!(
238 "Candidate region {} still replaying the wal on datanode {:?}",
239 region_id, candidate
240 ),
241 }
242 .fail();
243 }
244
245 Ok(())
246 }
247 Err(error::Error::MailboxTimeout { .. }) => {
248 let reason = format!(
249 "Mailbox received timeout for upgrade candidate region {region_id} on datanode {:?}",
250 candidate,
251 );
252 error::RetryLaterSnafu { reason }.fail()
253 }
254 Err(err) => Err(err),
255 }
256 }
257
258 async fn upgrade_region_with_retry(
262 &self,
263 ctx: &mut Context,
264 procedure_ctx: &ProcedureContext,
265 wal_options: Option<&WalOptions>,
266 ) -> bool {
267 let mut retry = 0;
268 let mut upgraded = false;
269
270 loop {
271 let timer = Instant::now();
272 let _guard = if let Some(WalOptions::Kafka(kafka_wal_options)) = wal_options {
274 Some(
275 procedure_ctx
276 .provider
277 .acquire_lock(
278 &(RemoteWalLock::Read(kafka_wal_options.topic.clone()).into()),
279 )
280 .await,
281 )
282 } else {
283 None
284 };
285 if let Err(err) = self.upgrade_region(ctx).await {
286 retry += 1;
287 ctx.update_operations_elapsed(timer);
288 if matches!(err, error::Error::ExceededDeadline { .. }) {
289 error!("Failed to upgrade region, exceeded deadline");
290 break;
291 } else if err.is_retryable() && retry < self.optimistic_retry {
292 error!("Failed to upgrade region, error: {err:?}, retry later");
293 sleep(self.retry_initial_interval).await;
294 } else {
295 error!("Failed to upgrade region, error: {err:?}");
296 break;
297 }
298 } else {
299 ctx.update_operations_elapsed(timer);
300 upgraded = true;
301 break;
302 }
303 }
304
305 upgraded
306 }
307}
308
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::manager::RegionMigrationTriggerReason;
    use crate::procedure::region_migration::test_util::{TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_upgrade_region_reply, send_mock_reply,
    };

    /// Persistent context migrating region `RegionId::new(1024, 1)` from peer 1 to
    /// peer 2, with a 1s procedure timeout.
    fn new_persistent_context() -> PersistentContext {
        PersistentContext {
            catalog: "greptime".into(),
            schema: "public".into(),
            from_peer: Peer::empty(1),
            to_peer: Peer::empty(2),
            region_id: RegionId::new(1024, 1),
            timeout: Duration::from_millis(1000),
            trigger_reason: RegionMigrationTriggerReason::Manual,
        }
    }

    /// Creates table metadata with a single region route led by `from_peer` and
    /// followed by `to_peer`, stored together with `wal_options`.
    async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
        let table_info =
            new_test_table_info(ctx.persistent_ctx.region_id.table_id(), vec![1]).into();
        let region_routes = vec![RegionRoute {
            region: Region::new_test(ctx.persistent_ctx.region_id),
            leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
            follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
            ..Default::default()
        }];
        ctx.table_metadata_manager
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                wal_options,
            )
            .await
            .unwrap();
    }

    // No heartbeat response receiver is registered for the candidate datanode, so
    // sending the instruction fails with a non-retryable `PusherNotFound`.
    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PusherNotFound { .. });
        assert!(!err.is_retryable());
    }

    // A receiver is registered but its other end is dropped, so pushing the message
    // fails with a non-retryable `PushMessage` error.
    #[tokio::test]
    async fn test_pusher_dropped() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        drop(rx);

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::PushMessage { .. });
        assert!(!err.is_retryable());
    }

    // Elapsed operation time already exceeds the procedure timeout, so the upgrade
    // fails with `ExceededDeadline` before anything is sent.
    #[tokio::test]
    async fn test_procedure_exceeded_deadline() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    // The datanode answers with a close-region reply instead of an upgrade-regions
    // reply, which is rejected as `UnexpectedInstructionReply`.
    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    // The reply carries an error string, which surfaces as a retryable `RetryLater`
    // error containing that string.
    #[tokio::test]
    async fn test_upgrade_region_failed() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            // Argument order appears to be (id, ready, exists, error), judging from the
            // assertions in the tests of this module.
            Ok(new_upgrade_region_reply(
                id,
                true,
                true,
                Some("test mocked".to_string()),
            ))
        });

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("test mocked"));
    }

    // The reply reports the candidate region as not existing, which is a
    // non-retryable `Unexpected` error.
    #[tokio::test]
    async fn test_upgrade_region_not_found() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, true, false, None))
        });

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
        assert!(err.to_string().contains("doesn't exist"));
    }

    // A not-ready reply is a retryable error while `require_ready` is set, and is
    // accepted once `require_ready` is cleared.
    #[tokio::test]
    async fn test_upgrade_region_require_ready() {
        let mut state = UpgradeCandidateRegion {
            require_ready: true,
            ..Default::default()
        };

        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });

        let err = state.upgrade_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("still replaying the wal"));

        state.require_ready = false;

        // Register a fresh receiver and mock reply for the second attempt.
        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });

        state.upgrade_region(&mut ctx).await.unwrap();
    }

    // Two retryable failures (mailbox timeout, then not ready) followed by a ready
    // reply: the state transitions to `UpdateMetadata::Upgrade`.
    #[tokio::test]
    async fn test_upgrade_region_with_retry_ok() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            // Attempt 1: mailbox timeout (retryable).
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            // Attempt 2: region exists but is not ready (retryable).
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, true, None)),
                )
                .await
                .unwrap();

            // Attempt 3: region exists and is ready (success).
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, true, true, None)),
                )
                .await
                .unwrap();
        });

        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();

        assert_matches!(update_metadata, UpdateMetadata::Upgrade);
    }

    // Two retryable failures followed by a non-retryable "region doesn't exist"
    // reply: the state transitions to `UpdateMetadata::Rollback`.
    #[tokio::test]
    async fn test_upgrade_region_with_retry_failed() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            // Attempt 1: mailbox timeout (retryable).
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            // Attempt 2: region exists but is not ready (retryable).
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, true, None)),
                )
                .await
                .unwrap();

            // Attempt 3: region doesn't exist (non-retryable).
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, false, None)),
                )
                .await
                .unwrap();
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }

    // The deadline is already exceeded when `next` runs, so no retry happens and the
    // state transitions to `UpdateMetadata::Rollback`.
    #[tokio::test]
    async fn test_upgrade_region_procedure_exceeded_deadline() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        prepare_table_metadata(&ctx, HashMap::default()).await;
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }
}