1use std::any::Any;
16use std::time::Duration;
17
18use api::v1::meta::MailboxMessage;
19use common_meta::ddl::utils::parse_region_wal_options;
20use common_meta::instruction::{Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply};
21use common_meta::lock_key::RemoteWalLock;
22use common_meta::wal_options_allocator::extract_topic_from_wal_options;
23use common_procedure::{Context as ProcedureContext, Status};
24use common_telemetry::{error, warn};
25use common_wal::options::WalOptions;
26use serde::{Deserialize, Serialize};
27use snafu::{ensure, OptionExt, ResultExt};
28use tokio::time::{sleep, Instant};
29
30use crate::error::{self, Result};
31use crate::handler::HeartbeatMailbox;
32use crate::procedure::region_migration::update_metadata::UpdateMetadata;
33use crate::procedure::region_migration::{Context, State};
34use crate::service::mailbox::Channel;
35
/// Region-migration state that asks the candidate datanode to upgrade its region.
///
/// The fields tune how the upgrade attempt is retried.
#[derive(Debug, Serialize, Deserialize)]
pub struct UpgradeCandidateRegion {
    /// Upper bound on the number of upgrade attempts made while the returned
    /// errors stay retryable.
    pub(crate) optimistic_retry: usize,
    /// Pause slept between two consecutive upgrade attempts.
    pub(crate) retry_initial_interval: Duration,
    /// When `true`, a reply reporting the candidate region as not ready is turned
    /// into a retryable error; when `false`, such a reply is accepted.
    pub(crate) require_ready: bool,
}
46
47impl Default for UpgradeCandidateRegion {
48 fn default() -> Self {
49 Self {
50 optimistic_retry: 3,
51 retry_initial_interval: Duration::from_millis(500),
52 require_ready: true,
53 }
54 }
55}
56
57#[async_trait::async_trait]
58#[typetag::serde]
59impl State for UpgradeCandidateRegion {
60 async fn next(
61 &mut self,
62 ctx: &mut Context,
63 procedure_ctx: &ProcedureContext,
64 ) -> Result<(Box<dyn State>, Status)> {
65 let now = Instant::now();
66
67 let region_wal_option = self.get_region_wal_option(ctx).await?;
68 let region_id = ctx.persistent_ctx.region_id;
69 if region_wal_option.is_none() {
70 warn!(
71 "Region {} wal options not found, during upgrade candidate region",
72 region_id
73 );
74 }
75
76 if self
77 .upgrade_region_with_retry(ctx, procedure_ctx, region_wal_option.as_ref())
78 .await
79 {
80 ctx.update_upgrade_candidate_region_elapsed(now);
81 Ok((Box::new(UpdateMetadata::Upgrade), Status::executing(false)))
82 } else {
83 ctx.update_upgrade_candidate_region_elapsed(now);
84 Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)))
85 }
86 }
87
88 fn as_any(&self) -> &dyn Any {
89 self
90 }
91}
92
93impl UpgradeCandidateRegion {
94 async fn get_region_wal_option(&self, ctx: &mut Context) -> Result<Option<WalOptions>> {
95 let region_id = ctx.persistent_ctx.region_id;
96 match ctx.get_from_peer_datanode_table_value().await {
97 Ok(datanode_table_value) => {
98 let region_wal_options =
99 parse_region_wal_options(&datanode_table_value.region_info.region_wal_options)
100 .context(error::ParseWalOptionsSnafu)?;
101 Ok(region_wal_options.get(®ion_id.region_number()).cloned())
102 }
103 Err(error::Error::DatanodeTableNotFound { datanode_id, .. }) => {
104 warn!(
105 "Datanode table not found, during upgrade candidate region, the target region might already been migrated, region_id: {}, datanode_id: {}",
106 region_id, datanode_id
107 );
108 Ok(None)
109 }
110 Err(e) => Err(e),
111 }
112 }
113
114 async fn build_upgrade_region_instruction(
116 &self,
117 ctx: &mut Context,
118 replay_timeout: Duration,
119 ) -> Result<Instruction> {
120 let pc = &ctx.persistent_ctx;
121 let region_id = pc.region_id;
122 let last_entry_id = ctx.volatile_ctx.leader_region_last_entry_id;
123 let metadata_last_entry_id = ctx.volatile_ctx.leader_region_metadata_last_entry_id;
124 let datanode_table_value = ctx.get_from_peer_datanode_table_value().await.ok();
126 let checkpoint = if let Some(topic) = datanode_table_value.as_ref().and_then(|v| {
127 extract_topic_from_wal_options(region_id, &v.region_info.region_wal_options)
128 }) {
129 ctx.fetch_replay_checkpoint(&topic).await.ok().flatten()
130 } else {
131 None
132 };
133
134 let upgrade_instruction = Instruction::UpgradeRegion(
135 UpgradeRegion {
136 region_id,
137 last_entry_id,
138 metadata_last_entry_id,
139 replay_timeout: Some(replay_timeout),
140 location_id: Some(ctx.persistent_ctx.from_peer.id),
141 replay_entry_id: None,
142 metadata_replay_entry_id: None,
143 }
144 .with_replay_entry_id(checkpoint.map(|c| c.entry_id))
145 .with_metadata_replay_entry_id(checkpoint.and_then(|c| c.metadata_entry_id)),
146 );
147
148 Ok(upgrade_instruction)
149 }
150
151 async fn upgrade_region(&self, ctx: &mut Context) -> Result<()> {
166 let operation_timeout =
167 ctx.next_operation_timeout()
168 .context(error::ExceededDeadlineSnafu {
169 operation: "Upgrade region",
170 })?;
171 let upgrade_instruction = self
172 .build_upgrade_region_instruction(ctx, operation_timeout)
173 .await?;
174
175 let pc = &ctx.persistent_ctx;
176 let region_id = pc.region_id;
177 let candidate = &pc.to_peer;
178
179 let msg = MailboxMessage::json_message(
180 &format!("Upgrade candidate region: {}", region_id),
181 &format!("Metasrv@{}", ctx.server_addr()),
182 &format!("Datanode-{}@{}", candidate.id, candidate.addr),
183 common_time::util::current_time_millis(),
184 &upgrade_instruction,
185 )
186 .with_context(|_| error::SerializeToJsonSnafu {
187 input: upgrade_instruction.to_string(),
188 })?;
189
190 let ch = Channel::Datanode(candidate.id);
191 let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
192
193 match receiver.await {
194 Ok(msg) => {
195 let reply = HeartbeatMailbox::json_reply(&msg)?;
196 let InstructionReply::UpgradeRegion(UpgradeRegionReply {
197 ready,
198 exists,
199 error,
200 }) = reply
201 else {
202 return error::UnexpectedInstructionReplySnafu {
203 mailbox_message: msg.to_string(),
204 reason: "Unexpected reply of the upgrade region instruction",
205 }
206 .fail();
207 };
208
209 if error.is_some() {
211 return error::RetryLaterSnafu {
212 reason: format!(
213 "Failed to upgrade the region {} on datanode {:?}, error: {:?}",
214 region_id, candidate, error
215 ),
216 }
217 .fail();
218 }
219
220 ensure!(
221 exists,
222 error::UnexpectedSnafu {
223 violated: format!(
224 "Candidate region {} doesn't exist on datanode {:?}",
225 region_id, candidate
226 )
227 }
228 );
229
230 if self.require_ready && !ready {
231 return error::RetryLaterSnafu {
232 reason: format!(
233 "Candidate region {} still replaying the wal on datanode {:?}",
234 region_id, candidate
235 ),
236 }
237 .fail();
238 }
239
240 Ok(())
241 }
242 Err(error::Error::MailboxTimeout { .. }) => {
243 let reason = format!(
244 "Mailbox received timeout for upgrade candidate region {region_id} on datanode {:?}",
245 candidate,
246 );
247 error::RetryLaterSnafu { reason }.fail()
248 }
249 Err(err) => Err(err),
250 }
251 }
252
253 async fn upgrade_region_with_retry(
257 &self,
258 ctx: &mut Context,
259 procedure_ctx: &ProcedureContext,
260 wal_options: Option<&WalOptions>,
261 ) -> bool {
262 let mut retry = 0;
263 let mut upgraded = false;
264
265 loop {
266 let timer = Instant::now();
267 let _guard = if let Some(WalOptions::Kafka(kafka_wal_options)) = wal_options {
269 Some(
270 procedure_ctx
271 .provider
272 .acquire_lock(
273 &(RemoteWalLock::Read(kafka_wal_options.topic.clone()).into()),
274 )
275 .await,
276 )
277 } else {
278 None
279 };
280 if let Err(err) = self.upgrade_region(ctx).await {
281 retry += 1;
282 ctx.update_operations_elapsed(timer);
283 if matches!(err, error::Error::ExceededDeadline { .. }) {
284 error!("Failed to upgrade region, exceeded deadline");
285 break;
286 } else if err.is_retryable() && retry < self.optimistic_retry {
287 error!("Failed to upgrade region, error: {err:?}, retry later");
288 sleep(self.retry_initial_interval).await;
289 } else {
290 error!("Failed to upgrade region, error: {err:?}");
291 break;
292 }
293 } else {
294 ctx.update_operations_elapsed(timer);
295 upgraded = true;
296 break;
297 }
298 }
299
300 upgraded
301 }
302}
303
304#[cfg(test)]
305mod tests {
306 use std::assert_matches::assert_matches;
307 use std::collections::HashMap;
308
309 use common_meta::key::table_route::TableRouteValue;
310 use common_meta::key::test_utils::new_test_table_info;
311 use common_meta::peer::Peer;
312 use common_meta::rpc::router::{Region, RegionRoute};
313 use store_api::storage::RegionId;
314
315 use super::*;
316 use crate::error::Error;
317 use crate::procedure::region_migration::manager::RegionMigrationTriggerReason;
318 use crate::procedure::region_migration::test_util::{new_procedure_context, TestingEnv};
319 use crate::procedure::region_migration::{ContextFactory, PersistentContext};
320 use crate::procedure::test_util::{
321 new_close_region_reply, new_upgrade_region_reply, send_mock_reply,
322 };
323
324 fn new_persistent_context() -> PersistentContext {
325 PersistentContext {
326 catalog: "greptime".into(),
327 schema: "public".into(),
328 from_peer: Peer::empty(1),
329 to_peer: Peer::empty(2),
330 region_id: RegionId::new(1024, 1),
331 timeout: Duration::from_millis(1000),
332 trigger_reason: RegionMigrationTriggerReason::Manual,
333 }
334 }
335
336 async fn prepare_table_metadata(ctx: &Context, wal_options: HashMap<u32, String>) {
337 let table_info =
338 new_test_table_info(ctx.persistent_ctx.region_id.table_id(), vec![1]).into();
339 let region_routes = vec![RegionRoute {
340 region: Region::new_test(ctx.persistent_ctx.region_id),
341 leader_peer: Some(ctx.persistent_ctx.from_peer.clone()),
342 follower_peers: vec![ctx.persistent_ctx.to_peer.clone()],
343 ..Default::default()
344 }];
345 ctx.table_metadata_manager
346 .create_table_metadata(
347 table_info,
348 TableRouteValue::physical(region_routes),
349 wal_options,
350 )
351 .await
352 .unwrap();
353 }
354
355 #[tokio::test]
356 async fn test_datanode_is_unreachable() {
357 let state = UpgradeCandidateRegion::default();
358 let persistent_context = new_persistent_context();
359 let env = TestingEnv::new();
360 let mut ctx = env.context_factory().new_context(persistent_context);
361 prepare_table_metadata(&ctx, HashMap::default()).await;
362 let err = state.upgrade_region(&mut ctx).await.unwrap_err();
363
364 assert_matches!(err, Error::PusherNotFound { .. });
365 assert!(!err.is_retryable());
366 }
367
368 #[tokio::test]
369 async fn test_pusher_dropped() {
370 let state = UpgradeCandidateRegion::default();
371 let persistent_context = new_persistent_context();
372 let to_peer_id = persistent_context.to_peer.id;
373
374 let mut env = TestingEnv::new();
375 let mut ctx = env.context_factory().new_context(persistent_context);
376 prepare_table_metadata(&ctx, HashMap::default()).await;
377 let mailbox_ctx = env.mailbox_context();
378
379 let (tx, rx) = tokio::sync::mpsc::channel(1);
380
381 mailbox_ctx
382 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
383 .await;
384
385 drop(rx);
386
387 let err = state.upgrade_region(&mut ctx).await.unwrap_err();
388
389 assert_matches!(err, Error::PushMessage { .. });
390 assert!(!err.is_retryable());
391 }
392
393 #[tokio::test]
394 async fn test_procedure_exceeded_deadline() {
395 let state = UpgradeCandidateRegion::default();
396 let persistent_context = new_persistent_context();
397 let env = TestingEnv::new();
398 let mut ctx = env.context_factory().new_context(persistent_context);
399 prepare_table_metadata(&ctx, HashMap::default()).await;
400 ctx.volatile_ctx.metrics.operations_elapsed =
401 ctx.persistent_ctx.timeout + Duration::from_secs(1);
402
403 let err = state.upgrade_region(&mut ctx).await.unwrap_err();
404
405 assert_matches!(err, Error::ExceededDeadline { .. });
406 assert!(!err.is_retryable());
407 }
408
409 #[tokio::test]
410 async fn test_unexpected_instruction_reply() {
411 let state = UpgradeCandidateRegion::default();
412 let persistent_context = new_persistent_context();
413 let to_peer_id = persistent_context.to_peer.id;
414
415 let mut env = TestingEnv::new();
416 let mut ctx = env.context_factory().new_context(persistent_context);
417 prepare_table_metadata(&ctx, HashMap::default()).await;
418 let mailbox_ctx = env.mailbox_context();
419 let mailbox = mailbox_ctx.mailbox().clone();
420
421 let (tx, rx) = tokio::sync::mpsc::channel(1);
422
423 mailbox_ctx
424 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
425 .await;
426
427 send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));
428
429 let err = state.upgrade_region(&mut ctx).await.unwrap_err();
430 assert_matches!(err, Error::UnexpectedInstructionReply { .. });
431 assert!(!err.is_retryable());
432 }
433
434 #[tokio::test]
435 async fn test_upgrade_region_failed() {
436 let state = UpgradeCandidateRegion::default();
437 let persistent_context = new_persistent_context();
438 let to_peer_id = persistent_context.to_peer.id;
439
440 let mut env = TestingEnv::new();
441 let mut ctx = env.context_factory().new_context(persistent_context);
442 prepare_table_metadata(&ctx, HashMap::default()).await;
443 let mailbox_ctx = env.mailbox_context();
444 let mailbox = mailbox_ctx.mailbox().clone();
445
446 let (tx, rx) = tokio::sync::mpsc::channel(1);
447
448 mailbox_ctx
449 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
450 .await;
451
452 send_mock_reply(mailbox, rx, |id| {
454 Ok(new_upgrade_region_reply(
455 id,
456 true,
457 true,
458 Some("test mocked".to_string()),
459 ))
460 });
461
462 let err = state.upgrade_region(&mut ctx).await.unwrap_err();
463
464 assert_matches!(err, Error::RetryLater { .. });
465 assert!(err.is_retryable());
466 assert!(format!("{err:?}").contains("test mocked"));
467 }
468
469 #[tokio::test]
470 async fn test_upgrade_region_not_found() {
471 let state = UpgradeCandidateRegion::default();
472 let persistent_context = new_persistent_context();
473 let to_peer_id = persistent_context.to_peer.id;
474
475 let mut env = TestingEnv::new();
476 let mut ctx = env.context_factory().new_context(persistent_context);
477 prepare_table_metadata(&ctx, HashMap::default()).await;
478 let mailbox_ctx = env.mailbox_context();
479 let mailbox = mailbox_ctx.mailbox().clone();
480
481 let (tx, rx) = tokio::sync::mpsc::channel(1);
482
483 mailbox_ctx
484 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
485 .await;
486
487 send_mock_reply(mailbox, rx, |id| {
488 Ok(new_upgrade_region_reply(id, true, false, None))
489 });
490
491 let err = state.upgrade_region(&mut ctx).await.unwrap_err();
492
493 assert_matches!(err, Error::Unexpected { .. });
494 assert!(!err.is_retryable());
495 assert!(err.to_string().contains("doesn't exist"));
496 }
497
498 #[tokio::test]
499 async fn test_upgrade_region_require_ready() {
500 let mut state = UpgradeCandidateRegion {
501 require_ready: true,
502 ..Default::default()
503 };
504
505 let persistent_context = new_persistent_context();
506 let to_peer_id = persistent_context.to_peer.id;
507
508 let mut env = TestingEnv::new();
509 let mut ctx = env.context_factory().new_context(persistent_context);
510 prepare_table_metadata(&ctx, HashMap::default()).await;
511 let mailbox_ctx = env.mailbox_context();
512 let mailbox = mailbox_ctx.mailbox().clone();
513
514 let (tx, rx) = tokio::sync::mpsc::channel(1);
515
516 mailbox_ctx
517 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
518 .await;
519
520 send_mock_reply(mailbox, rx, |id| {
521 Ok(new_upgrade_region_reply(id, false, true, None))
522 });
523
524 let err = state.upgrade_region(&mut ctx).await.unwrap_err();
525
526 assert_matches!(err, Error::RetryLater { .. });
527 assert!(err.is_retryable());
528 assert!(format!("{err:?}").contains("still replaying the wal"));
529
530 state.require_ready = false;
532
533 let mailbox = mailbox_ctx.mailbox().clone();
534 let (tx, rx) = tokio::sync::mpsc::channel(1);
535
536 mailbox_ctx
537 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
538 .await;
539
540 send_mock_reply(mailbox, rx, |id| {
541 Ok(new_upgrade_region_reply(id, false, true, None))
542 });
543
544 state.upgrade_region(&mut ctx).await.unwrap();
545 }
546
547 #[tokio::test]
548 async fn test_upgrade_region_with_retry_ok() {
549 let mut state = Box::<UpgradeCandidateRegion>::default();
550 state.retry_initial_interval = Duration::from_millis(100);
551 let persistent_context = new_persistent_context();
552 let to_peer_id = persistent_context.to_peer.id;
553
554 let mut env = TestingEnv::new();
555 let mut ctx = env.context_factory().new_context(persistent_context);
556 prepare_table_metadata(&ctx, HashMap::default()).await;
557 let mailbox_ctx = env.mailbox_context();
558 let mailbox = mailbox_ctx.mailbox().clone();
559
560 let (tx, mut rx) = tokio::sync::mpsc::channel(1);
561
562 mailbox_ctx
563 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
564 .await;
565
566 common_runtime::spawn_global(async move {
567 let resp = rx.recv().await.unwrap().unwrap();
568 let reply_id = resp.mailbox_message.unwrap().id;
569 mailbox
570 .on_recv(
571 reply_id,
572 Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
573 )
574 .await
575 .unwrap();
576
577 let resp = rx.recv().await.unwrap().unwrap();
579 let reply_id = resp.mailbox_message.unwrap().id;
580 mailbox
581 .on_recv(
582 reply_id,
583 Ok(new_upgrade_region_reply(reply_id, false, true, None)),
584 )
585 .await
586 .unwrap();
587
588 let resp = rx.recv().await.unwrap().unwrap();
590 let reply_id = resp.mailbox_message.unwrap().id;
591 mailbox
592 .on_recv(
593 reply_id,
594 Ok(new_upgrade_region_reply(reply_id, true, true, None)),
595 )
596 .await
597 .unwrap();
598 });
599
600 let procedure_ctx = new_procedure_context();
601 let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
602
603 let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
604
605 assert_matches!(update_metadata, UpdateMetadata::Upgrade);
606 }
607
608 #[tokio::test]
609 async fn test_upgrade_region_with_retry_failed() {
610 let mut state = Box::<UpgradeCandidateRegion>::default();
611 state.retry_initial_interval = Duration::from_millis(100);
612 let persistent_context = new_persistent_context();
613 let to_peer_id = persistent_context.to_peer.id;
614
615 let mut env = TestingEnv::new();
616 let mut ctx = env.context_factory().new_context(persistent_context);
617 prepare_table_metadata(&ctx, HashMap::default()).await;
618 let mailbox_ctx = env.mailbox_context();
619 let mailbox = mailbox_ctx.mailbox().clone();
620
621 let (tx, mut rx) = tokio::sync::mpsc::channel(1);
622
623 mailbox_ctx
624 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
625 .await;
626
627 common_runtime::spawn_global(async move {
628 let resp = rx.recv().await.unwrap().unwrap();
629 let reply_id = resp.mailbox_message.unwrap().id;
630 mailbox
631 .on_recv(
632 reply_id,
633 Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
634 )
635 .await
636 .unwrap();
637
638 let resp = rx.recv().await.unwrap().unwrap();
640 let reply_id = resp.mailbox_message.unwrap().id;
641 mailbox
642 .on_recv(
643 reply_id,
644 Ok(new_upgrade_region_reply(reply_id, false, true, None)),
645 )
646 .await
647 .unwrap();
648
649 let resp = rx.recv().await.unwrap().unwrap();
651 let reply_id = resp.mailbox_message.unwrap().id;
652 mailbox
653 .on_recv(
654 reply_id,
655 Ok(new_upgrade_region_reply(reply_id, false, false, None)),
656 )
657 .await
658 .unwrap();
659 });
660 let procedure_ctx = new_procedure_context();
661 let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
662
663 let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
664 assert_matches!(update_metadata, UpdateMetadata::Rollback);
665 }
666
667 #[tokio::test]
668 async fn test_upgrade_region_procedure_exceeded_deadline() {
669 let mut state = Box::<UpgradeCandidateRegion>::default();
670 state.retry_initial_interval = Duration::from_millis(100);
671 let persistent_context = new_persistent_context();
672 let to_peer_id = persistent_context.to_peer.id;
673
674 let mut env = TestingEnv::new();
675 let mut ctx = env.context_factory().new_context(persistent_context);
676 prepare_table_metadata(&ctx, HashMap::default()).await;
677 let mailbox_ctx = env.mailbox_context();
678 let mailbox = mailbox_ctx.mailbox().clone();
679 ctx.volatile_ctx.metrics.operations_elapsed =
680 ctx.persistent_ctx.timeout + Duration::from_secs(1);
681
682 let (tx, rx) = tokio::sync::mpsc::channel(1);
683 mailbox_ctx
684 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
685 .await;
686
687 send_mock_reply(mailbox, rx, |id| {
688 Ok(new_upgrade_region_reply(id, false, true, None))
689 });
690 let procedure_ctx = new_procedure_context();
691 let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
692 let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
693 assert_matches!(update_metadata, UpdateMetadata::Rollback);
694 }
695}