1use std::any::Any;
16use std::time::Duration;
17
18use api::v1::meta::MailboxMessage;
19use common_meta::instruction::{Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply};
20use common_procedure::{Context as ProcedureContext, Status};
21use common_telemetry::error;
22use serde::{Deserialize, Serialize};
23use snafu::{ensure, OptionExt, ResultExt};
24use tokio::time::{sleep, Instant};
25
26use crate::error::{self, Result};
27use crate::handler::HeartbeatMailbox;
28use crate::procedure::region_migration::update_metadata::UpdateMetadata;
29use crate::procedure::region_migration::{Context, State};
30use crate::service::mailbox::Channel;
31
/// Region-migration state that asks the candidate datanode to upgrade its copy of the region.
///
/// The fields tune the retry loop in `upgrade_region_with_retry` and the reply check in
/// `upgrade_region`.
#[derive(Debug, Serialize, Deserialize)]
pub struct UpgradeCandidateRegion {
    /// Cap on upgrade attempts: the retry loop gives up once the number of failed attempts
    /// reaches this value.
    pub(crate) optimistic_retry: usize,
    /// Pause between a retryable failure and the next attempt. The same interval is used
    /// before every retry.
    pub(crate) retry_initial_interval: Duration,
    /// When `true`, a reply whose `ready` flag is `false` is reported as a retryable error;
    /// when `false`, the `ready` flag is not checked.
    pub(crate) require_ready: bool,
}
42
43impl Default for UpgradeCandidateRegion {
44 fn default() -> Self {
45 Self {
46 optimistic_retry: 3,
47 retry_initial_interval: Duration::from_millis(500),
48 require_ready: true,
49 }
50 }
51}
52
53#[async_trait::async_trait]
54#[typetag::serde]
55impl State for UpgradeCandidateRegion {
56 async fn next(
57 &mut self,
58 ctx: &mut Context,
59 _procedure_ctx: &ProcedureContext,
60 ) -> Result<(Box<dyn State>, Status)> {
61 let now = Instant::now();
62 if self.upgrade_region_with_retry(ctx).await {
63 ctx.update_upgrade_candidate_region_elapsed(now);
64 Ok((Box::new(UpdateMetadata::Upgrade), Status::executing(false)))
65 } else {
66 ctx.update_upgrade_candidate_region_elapsed(now);
67 Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)))
68 }
69 }
70
71 fn as_any(&self) -> &dyn Any {
72 self
73 }
74}
75
76impl UpgradeCandidateRegion {
77 fn build_upgrade_region_instruction(
79 &self,
80 ctx: &Context,
81 replay_timeout: Duration,
82 ) -> Instruction {
83 let pc = &ctx.persistent_ctx;
84 let region_id = pc.region_id;
85 let last_entry_id = ctx.volatile_ctx.leader_region_last_entry_id;
86 let metadata_last_entry_id = ctx.volatile_ctx.leader_region_metadata_last_entry_id;
87
88 Instruction::UpgradeRegion(UpgradeRegion {
89 region_id,
90 last_entry_id,
91 metadata_last_entry_id,
92 replay_timeout: Some(replay_timeout),
93 location_id: Some(ctx.persistent_ctx.from_peer.id),
94 })
95 }
96
97 async fn upgrade_region(&self, ctx: &Context) -> Result<()> {
112 let pc = &ctx.persistent_ctx;
113 let region_id = pc.region_id;
114 let candidate = &pc.to_peer;
115 let operation_timeout =
116 ctx.next_operation_timeout()
117 .context(error::ExceededDeadlineSnafu {
118 operation: "Upgrade region",
119 })?;
120 let upgrade_instruction = self.build_upgrade_region_instruction(ctx, operation_timeout);
121
122 let msg = MailboxMessage::json_message(
123 &format!("Upgrade candidate region: {}", region_id),
124 &format!("Metasrv@{}", ctx.server_addr()),
125 &format!("Datanode-{}@{}", candidate.id, candidate.addr),
126 common_time::util::current_time_millis(),
127 &upgrade_instruction,
128 )
129 .with_context(|_| error::SerializeToJsonSnafu {
130 input: upgrade_instruction.to_string(),
131 })?;
132
133 let ch = Channel::Datanode(candidate.id);
134 let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
135
136 match receiver.await {
137 Ok(msg) => {
138 let reply = HeartbeatMailbox::json_reply(&msg)?;
139 let InstructionReply::UpgradeRegion(UpgradeRegionReply {
140 ready,
141 exists,
142 error,
143 }) = reply
144 else {
145 return error::UnexpectedInstructionReplySnafu {
146 mailbox_message: msg.to_string(),
147 reason: "Unexpected reply of the upgrade region instruction",
148 }
149 .fail();
150 };
151
152 if error.is_some() {
154 return error::RetryLaterSnafu {
155 reason: format!(
156 "Failed to upgrade the region {} on datanode {:?}, error: {:?}",
157 region_id, candidate, error
158 ),
159 }
160 .fail();
161 }
162
163 ensure!(
164 exists,
165 error::UnexpectedSnafu {
166 violated: format!(
167 "Candidate region {} doesn't exist on datanode {:?}",
168 region_id, candidate
169 )
170 }
171 );
172
173 if self.require_ready && !ready {
174 return error::RetryLaterSnafu {
175 reason: format!(
176 "Candidate region {} still replaying the wal on datanode {:?}",
177 region_id, candidate
178 ),
179 }
180 .fail();
181 }
182
183 Ok(())
184 }
185 Err(error::Error::MailboxTimeout { .. }) => {
186 let reason = format!(
187 "Mailbox received timeout for upgrade candidate region {region_id} on datanode {:?}",
188 candidate,
189 );
190 error::RetryLaterSnafu { reason }.fail()
191 }
192 Err(err) => Err(err),
193 }
194 }
195
196 async fn upgrade_region_with_retry(&self, ctx: &mut Context) -> bool {
200 let mut retry = 0;
201 let mut upgraded = false;
202
203 loop {
204 let timer = Instant::now();
205 if let Err(err) = self.upgrade_region(ctx).await {
206 retry += 1;
207 ctx.update_operations_elapsed(timer);
208 if matches!(err, error::Error::ExceededDeadline { .. }) {
209 error!("Failed to upgrade region, exceeded deadline");
210 break;
211 } else if err.is_retryable() && retry < self.optimistic_retry {
212 error!("Failed to upgrade region, error: {err:?}, retry later");
213 sleep(self.retry_initial_interval).await;
214 } else {
215 error!("Failed to upgrade region, error: {err:?}");
216 break;
217 }
218 } else {
219 ctx.update_operations_elapsed(timer);
220 upgraded = true;
221 break;
222 }
223 }
224
225 upgraded
226 }
227}
228
// Unit tests for `UpgradeCandidateRegion`: single-attempt behaviour of `upgrade_region`
// and the state transitions produced by `next` through the retry loop.
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_meta::peer::Peer;
    use store_api::storage::RegionId;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::manager::RegionMigrationTriggerReason;
    use crate::procedure::region_migration::test_util::{new_procedure_context, TestingEnv};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_upgrade_region_reply, send_mock_reply,
    };

    // Shared fixture: migrate region (1024, 1) from peer 1 to peer 2 with a 1000 ms timeout.
    fn new_persistent_context() -> PersistentContext {
        PersistentContext {
            catalog: "greptime".into(),
            schema: "public".into(),
            from_peer: Peer::empty(1),
            to_peer: Peer::empty(2),
            region_id: RegionId::new(1024, 1),
            timeout: Duration::from_millis(1000),
            trigger_reason: RegionMigrationTriggerReason::Manual,
        }
    }

    // No heartbeat response receiver is registered for the candidate datanode; the send is
    // expected to fail with a non-retryable `PusherNotFound`.
    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let ctx = env.context_factory().new_context(persistent_context);

        let err = state.upgrade_region(&ctx).await.unwrap_err();

        assert_matches!(err, Error::PusherNotFound { .. });
        assert!(!err.is_retryable());
    }

    // A receiver is registered but its receiving half is dropped before the send; the
    // expected error is a non-retryable `PushMessage`.
    #[tokio::test]
    async fn test_pusher_dropped() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        drop(rx);

        let err = state.upgrade_region(&ctx).await.unwrap_err();

        assert_matches!(err, Error::PushMessage { .. });
        assert!(!err.is_retryable());
    }

    // The recorded operations-elapsed time already exceeds the procedure timeout, so a
    // single attempt is expected to fail with a non-retryable `ExceededDeadline`.
    #[tokio::test]
    async fn test_procedure_exceeded_deadline() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let err = state.upgrade_region(&ctx).await.unwrap_err();

        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    // The datanode answers with a close-region reply instead of an upgrade-region reply;
    // the expected error is a non-retryable `UnexpectedInstructionReply`.
    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));

        let err = state.upgrade_region(&ctx).await.unwrap_err();
        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    // The upgrade reply carries an error message; the expected error is a retryable
    // `RetryLater` whose debug output includes that message.
    #[tokio::test]
    async fn test_upgrade_region_failed() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            // The mocked reply contains an error.
            Ok(new_upgrade_region_reply(
                id,
                true,
                true,
                Some("test mocked".to_string()),
            ))
        });

        let err = state.upgrade_region(&ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("test mocked"));
    }

    // The upgrade reply reports that the candidate region does not exist; the expected
    // error is a non-retryable `Unexpected`.
    #[tokio::test]
    async fn test_upgrade_region_not_found() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, true, false, None))
        });

        let err = state.upgrade_region(&ctx).await.unwrap_err();

        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
        assert!(err.to_string().contains("doesn't exist"));
    }

    // The same not-ready reply is sent twice: with `require_ready` set it yields a
    // retryable `RetryLater`; with `require_ready` cleared the attempt succeeds.
    #[tokio::test]
    async fn test_upgrade_region_require_ready() {
        let mut state = UpgradeCandidateRegion {
            require_ready: true,
            ..Default::default()
        };

        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });

        let err = state.upgrade_region(&ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("still replaying the wal"));

        state.require_ready = false;
        // Second round: a fresh receiver is registered and the same not-ready reply is sent.

        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });

        state.upgrade_region(&ctx).await.unwrap();
    }

    // Two retryable failures (mailbox timeout, then not ready) followed by a ready reply:
    // `next` is expected to transition to `UpdateMetadata::Upgrade`.
    #[tokio::test]
    async fn test_upgrade_region_with_retry_ok() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            // First attempt: answered with a mailbox timeout.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            let resp = rx.recv().await.unwrap().unwrap();
            // Second attempt: the region exists but is not ready.
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, true, None)),
                )
                .await
                .unwrap();

            let resp = rx.recv().await.unwrap().unwrap();
            // Third attempt: the region exists and is ready.
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, true, true, None)),
                )
                .await
                .unwrap();
        });

        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();

        assert_matches!(update_metadata, UpdateMetadata::Upgrade);
    }

    // Two retryable failures followed by a reply saying the region does not exist:
    // `next` is expected to transition to `UpdateMetadata::Rollback`.
    #[tokio::test]
    async fn test_upgrade_region_with_retry_failed() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            // First attempt: answered with a mailbox timeout.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            let resp = rx.recv().await.unwrap().unwrap();
            // Second attempt: the region exists but is not ready.
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, true, None)),
                )
                .await
                .unwrap();

            let resp = rx.recv().await.unwrap().unwrap();
            // Third attempt: the region is reported as not existing.
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, false, None)),
                )
                .await
                .unwrap();
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }

    // The operations-elapsed time already exceeds the procedure timeout before `next` runs;
    // the expected transition is `UpdateMetadata::Rollback`.
    #[tokio::test]
    async fn test_upgrade_region_procedure_exceeded_deadline() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }
}