1use std::any::Any;
16use std::time::Duration;
17
18use api::v1::meta::MailboxMessage;
19use common_meta::instruction::{Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply};
20use common_procedure::{Context as ProcedureContext, Status};
21use common_telemetry::error;
22use serde::{Deserialize, Serialize};
23use snafu::{ensure, OptionExt, ResultExt};
24use tokio::time::{sleep, Instant};
25
26use crate::error::{self, Result};
27use crate::handler::HeartbeatMailbox;
28use crate::procedure::region_migration::update_metadata::UpdateMetadata;
29use crate::procedure::region_migration::{Context, State};
30use crate::service::mailbox::Channel;
31
32#[derive(Debug, Serialize, Deserialize)]
33pub struct UpgradeCandidateRegion {
34 pub(crate) optimistic_retry: usize,
36 pub(crate) retry_initial_interval: Duration,
38 pub(crate) require_ready: bool,
41}
42
43impl Default for UpgradeCandidateRegion {
44 fn default() -> Self {
45 Self {
46 optimistic_retry: 3,
47 retry_initial_interval: Duration::from_millis(500),
48 require_ready: true,
49 }
50 }
51}
52
53#[async_trait::async_trait]
54#[typetag::serde]
55impl State for UpgradeCandidateRegion {
56 async fn next(
57 &mut self,
58 ctx: &mut Context,
59 _procedure_ctx: &ProcedureContext,
60 ) -> Result<(Box<dyn State>, Status)> {
61 let now = Instant::now();
62 if self.upgrade_region_with_retry(ctx).await {
63 ctx.update_upgrade_candidate_region_elapsed(now);
64 Ok((Box::new(UpdateMetadata::Upgrade), Status::executing(false)))
65 } else {
66 ctx.update_upgrade_candidate_region_elapsed(now);
67 Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)))
68 }
69 }
70
71 fn as_any(&self) -> &dyn Any {
72 self
73 }
74}
75
76impl UpgradeCandidateRegion {
77 fn build_upgrade_region_instruction(
79 &self,
80 ctx: &Context,
81 replay_timeout: Duration,
82 ) -> Instruction {
83 let pc = &ctx.persistent_ctx;
84 let region_id = pc.region_id;
85 let last_entry_id = ctx.volatile_ctx.leader_region_last_entry_id;
86 let metadata_last_entry_id = ctx.volatile_ctx.leader_region_metadata_last_entry_id;
87
88 Instruction::UpgradeRegion(UpgradeRegion {
89 region_id,
90 last_entry_id,
91 metadata_last_entry_id,
92 replay_timeout: Some(replay_timeout),
93 location_id: Some(ctx.persistent_ctx.from_peer.id),
94 })
95 }
96
97 async fn upgrade_region(&self, ctx: &Context) -> Result<()> {
112 let pc = &ctx.persistent_ctx;
113 let region_id = pc.region_id;
114 let candidate = &pc.to_peer;
115 let operation_timeout =
116 ctx.next_operation_timeout()
117 .context(error::ExceededDeadlineSnafu {
118 operation: "Upgrade region",
119 })?;
120 let upgrade_instruction = self.build_upgrade_region_instruction(ctx, operation_timeout);
121
122 let msg = MailboxMessage::json_message(
123 &format!("Upgrade candidate region: {}", region_id),
124 &format!("Metasrv@{}", ctx.server_addr()),
125 &format!("Datanode-{}@{}", candidate.id, candidate.addr),
126 common_time::util::current_time_millis(),
127 &upgrade_instruction,
128 )
129 .with_context(|_| error::SerializeToJsonSnafu {
130 input: upgrade_instruction.to_string(),
131 })?;
132
133 let ch = Channel::Datanode(candidate.id);
134 let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
135
136 match receiver.await {
137 Ok(msg) => {
138 let reply = HeartbeatMailbox::json_reply(&msg)?;
139 let InstructionReply::UpgradeRegion(UpgradeRegionReply {
140 ready,
141 exists,
142 error,
143 }) = reply
144 else {
145 return error::UnexpectedInstructionReplySnafu {
146 mailbox_message: msg.to_string(),
147 reason: "Unexpected reply of the upgrade region instruction",
148 }
149 .fail();
150 };
151
152 if error.is_some() {
154 return error::RetryLaterSnafu {
155 reason: format!(
156 "Failed to upgrade the region {} on datanode {:?}, error: {:?}",
157 region_id, candidate, error
158 ),
159 }
160 .fail();
161 }
162
163 ensure!(
164 exists,
165 error::UnexpectedSnafu {
166 violated: format!(
167 "Candidate region {} doesn't exist on datanode {:?}",
168 region_id, candidate
169 )
170 }
171 );
172
173 if self.require_ready && !ready {
174 return error::RetryLaterSnafu {
175 reason: format!(
176 "Candidate region {} still replaying the wal on datanode {:?}",
177 region_id, candidate
178 ),
179 }
180 .fail();
181 }
182
183 Ok(())
184 }
185 Err(error::Error::MailboxTimeout { .. }) => {
186 let reason = format!(
187 "Mailbox received timeout for upgrade candidate region {region_id} on datanode {:?}",
188 candidate,
189 );
190 error::RetryLaterSnafu { reason }.fail()
191 }
192 Err(err) => Err(err),
193 }
194 }
195
196 async fn upgrade_region_with_retry(&self, ctx: &mut Context) -> bool {
200 let mut retry = 0;
201 let mut upgraded = false;
202
203 loop {
204 let timer = Instant::now();
205 if let Err(err) = self.upgrade_region(ctx).await {
206 retry += 1;
207 ctx.update_operations_elapsed(timer);
208 if matches!(err, error::Error::ExceededDeadline { .. }) {
209 error!("Failed to upgrade region, exceeded deadline");
210 break;
211 } else if err.is_retryable() && retry < self.optimistic_retry {
212 error!("Failed to upgrade region, error: {err:?}, retry later");
213 sleep(self.retry_initial_interval).await;
214 } else {
215 error!("Failed to upgrade region, error: {err:?}");
216 break;
217 }
218 } else {
219 ctx.update_operations_elapsed(timer);
220 upgraded = true;
221 break;
222 }
223 }
224
225 upgraded
226 }
227}
228
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_meta::peer::Peer;
    use store_api::storage::RegionId;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::test_util::{new_procedure_context, TestingEnv};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_upgrade_region_reply, send_mock_reply,
    };

    /// Persistent context shared by the tests: region `(1024, 1)` migrating
    /// from peer 1 to peer 2 with a one-second timeout.
    fn new_persistent_context() -> PersistentContext {
        PersistentContext {
            catalog: "greptime".into(),
            schema: "public".into(),
            from_peer: Peer::empty(1),
            to_peer: Peer::empty(2),
            region_id: RegionId::new(1024, 1),
            timeout: Duration::from_secs(1),
        }
    }

    /// Without a registered heartbeat receiver for the candidate, the upgrade
    /// fails with a non-retryable `PusherNotFound`.
    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let env = TestingEnv::new();
        let ctx = env.context_factory().new_context(new_persistent_context());
        let state = UpgradeCandidateRegion::default();

        let err = state.upgrade_region(&ctx).await.unwrap_err();

        assert_matches!(err, Error::PusherNotFound { .. });
        assert!(!err.is_retryable());
    }

    /// When the receiving half of the registered channel has been dropped, the
    /// upgrade fails with a non-retryable `PushMessage`.
    #[tokio::test]
    async fn test_pusher_dropped() {
        let pc = new_persistent_context();
        let candidate_id = pc.to_peer.id;
        let state = UpgradeCandidateRegion::default();

        let mut env = TestingEnv::new();
        let ctx = env.context_factory().new_context(pc);
        let mailbox_ctx = env.mailbox_context();

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(candidate_id), tx)
            .await;
        drop(rx);

        let err = state.upgrade_region(&ctx).await.unwrap_err();

        assert_matches!(err, Error::PushMessage { .. });
        assert!(!err.is_retryable());
    }

    /// With the elapsed operation time already beyond the procedure timeout,
    /// the upgrade fails with a non-retryable `ExceededDeadline`.
    #[tokio::test]
    async fn test_procedure_exceeded_deadline() {
        let state = UpgradeCandidateRegion::default();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(new_persistent_context());
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let err = state.upgrade_region(&ctx).await.unwrap_err();

        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    /// A close-region reply to an upgrade instruction is rejected with a
    /// non-retryable `UnexpectedInstructionReply`.
    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let pc = new_persistent_context();
        let candidate_id = pc.to_peer.id;
        let state = UpgradeCandidateRegion::default();

        let mut env = TestingEnv::new();
        let ctx = env.context_factory().new_context(pc);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(candidate_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));

        let err = state.upgrade_region(&ctx).await.unwrap_err();

        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    /// A reply that carries an error message yields a retryable `RetryLater`
    /// whose debug output contains that message.
    #[tokio::test]
    async fn test_upgrade_region_failed() {
        let pc = new_persistent_context();
        let candidate_id = pc.to_peer.id;
        let state = UpgradeCandidateRegion::default();

        let mut env = TestingEnv::new();
        let ctx = env.context_factory().new_context(pc);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(candidate_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            let mocked_error = Some("test mocked".to_string());
            Ok(new_upgrade_region_reply(id, true, true, mocked_error))
        });

        let err = state.upgrade_region(&ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("test mocked"));
    }

    /// A reply stating that the region does not exist yields a non-retryable
    /// `Unexpected` error.
    #[tokio::test]
    async fn test_upgrade_region_not_found() {
        let pc = new_persistent_context();
        let candidate_id = pc.to_peer.id;
        let state = UpgradeCandidateRegion::default();

        let mut env = TestingEnv::new();
        let ctx = env.context_factory().new_context(pc);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(candidate_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, true, false, None))
        });

        let err = state.upgrade_region(&ctx).await.unwrap_err();

        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
        assert!(err.to_string().contains("doesn't exist"));
    }

    /// A not-ready reply is a retryable failure while `require_ready` is set
    /// and a success once the flag is cleared.
    #[tokio::test]
    async fn test_upgrade_region_require_ready() {
        let mut state = UpgradeCandidateRegion::default();
        state.require_ready = true;

        let pc = new_persistent_context();
        let candidate_id = pc.to_peer.id;

        let mut env = TestingEnv::new();
        let ctx = env.context_factory().new_context(pc);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(candidate_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });

        let err = state.upgrade_region(&ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("still replaying the wal"));

        // Second round: same reply, but readiness is no longer required.
        state.require_ready = false;

        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(candidate_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });

        state.upgrade_region(&ctx).await.unwrap();
    }

    /// Mailbox timeout, then a not-ready reply, then a ready reply: the state
    /// transitions to `UpdateMetadata::Upgrade`.
    #[tokio::test]
    async fn test_upgrade_region_with_retry_ok() {
        let mut state = Box::new(UpgradeCandidateRegion {
            retry_initial_interval: Duration::from_millis(100),
            ..Default::default()
        });
        let pc = new_persistent_context();
        let candidate_id = pc.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(pc);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(candidate_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            // First attempt: the mailbox reports a timeout.
            let pushed = rx.recv().await.unwrap().unwrap();
            let id = pushed.mailbox_message.unwrap().id;
            let timed_out = Err(error::MailboxTimeoutSnafu { id }.build());
            mailbox.on_recv(id, timed_out).await.unwrap();

            // Second attempt: the region exists but is not ready.
            let pushed = rx.recv().await.unwrap().unwrap();
            let id = pushed.mailbox_message.unwrap().id;
            let not_ready = Ok(new_upgrade_region_reply(id, false, true, None));
            mailbox.on_recv(id, not_ready).await.unwrap();

            // Third attempt: the region exists and is ready.
            let pushed = rx.recv().await.unwrap().unwrap();
            let id = pushed.mailbox_message.unwrap().id;
            let ready = Ok(new_upgrade_region_reply(id, true, true, None));
            mailbox.on_recv(id, ready).await.unwrap();
        });

        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();

        assert_matches!(update_metadata, UpdateMetadata::Upgrade);
    }

    /// Mailbox timeout, then a not-ready reply, then a reply saying the region
    /// does not exist: the state transitions to `UpdateMetadata::Rollback`.
    #[tokio::test]
    async fn test_upgrade_region_with_retry_failed() {
        let mut state = Box::new(UpgradeCandidateRegion {
            retry_initial_interval: Duration::from_millis(100),
            ..Default::default()
        });
        let pc = new_persistent_context();
        let candidate_id = pc.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(pc);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(candidate_id), tx)
            .await;

        common_runtime::spawn_global(async move {
            // First attempt: the mailbox reports a timeout.
            let pushed = rx.recv().await.unwrap().unwrap();
            let id = pushed.mailbox_message.unwrap().id;
            let timed_out = Err(error::MailboxTimeoutSnafu { id }.build());
            mailbox.on_recv(id, timed_out).await.unwrap();

            // Second attempt: the region exists but is not ready.
            let pushed = rx.recv().await.unwrap().unwrap();
            let id = pushed.mailbox_message.unwrap().id;
            let not_ready = Ok(new_upgrade_region_reply(id, false, true, None));
            mailbox.on_recv(id, not_ready).await.unwrap();

            // Third attempt: the region is reported as missing.
            let pushed = rx.recv().await.unwrap().unwrap();
            let id = pushed.mailbox_message.unwrap().id;
            let missing = Ok(new_upgrade_region_reply(id, false, false, None));
            mailbox.on_recv(id, missing).await.unwrap();
        });

        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();

        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }

    /// With the elapsed operation time already beyond the procedure timeout,
    /// the state transitions to `UpdateMetadata::Rollback`.
    #[tokio::test]
    async fn test_upgrade_region_procedure_exceeded_deadline() {
        let mut state = Box::new(UpgradeCandidateRegion {
            retry_initial_interval: Duration::from_millis(100),
            ..Default::default()
        });
        let pc = new_persistent_context();
        let candidate_id = pc.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(pc);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(candidate_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });

        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();

        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }
}