meta_srv/procedure/region_migration/upgrade_candidate_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::time::Duration;
17
18use api::v1::meta::MailboxMessage;
19use common_meta::instruction::{Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply};
20use common_procedure::{Context as ProcedureContext, Status};
21use common_telemetry::error;
22use serde::{Deserialize, Serialize};
23use snafu::{ensure, OptionExt, ResultExt};
24use tokio::time::{sleep, Instant};
25
26use crate::error::{self, Result};
27use crate::handler::HeartbeatMailbox;
28use crate::procedure::region_migration::update_metadata::UpdateMetadata;
29use crate::procedure::region_migration::{Context, State};
30use crate::service::mailbox::Channel;
31
32#[derive(Debug, Serialize, Deserialize)]
33pub struct UpgradeCandidateRegion {
34    // The optimistic retry times.
35    pub(crate) optimistic_retry: usize,
36    // The retry initial interval.
37    pub(crate) retry_initial_interval: Duration,
38    // If it's true it requires the candidate region MUST replay the WAL to the latest entry id.
39    // Otherwise, it will rollback to the old leader region.
40    pub(crate) require_ready: bool,
41}
42
43impl Default for UpgradeCandidateRegion {
44    fn default() -> Self {
45        Self {
46            optimistic_retry: 3,
47            retry_initial_interval: Duration::from_millis(500),
48            require_ready: true,
49        }
50    }
51}
52
53#[async_trait::async_trait]
54#[typetag::serde]
55impl State for UpgradeCandidateRegion {
56    async fn next(
57        &mut self,
58        ctx: &mut Context,
59        _procedure_ctx: &ProcedureContext,
60    ) -> Result<(Box<dyn State>, Status)> {
61        let now = Instant::now();
62        if self.upgrade_region_with_retry(ctx).await {
63            ctx.update_upgrade_candidate_region_elapsed(now);
64            Ok((Box::new(UpdateMetadata::Upgrade), Status::executing(false)))
65        } else {
66            ctx.update_upgrade_candidate_region_elapsed(now);
67            Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)))
68        }
69    }
70
71    fn as_any(&self) -> &dyn Any {
72        self
73    }
74}
75
76impl UpgradeCandidateRegion {
77    /// Builds upgrade region instruction.
78    fn build_upgrade_region_instruction(
79        &self,
80        ctx: &Context,
81        replay_timeout: Duration,
82    ) -> Instruction {
83        let pc = &ctx.persistent_ctx;
84        let region_id = pc.region_id;
85        let last_entry_id = ctx.volatile_ctx.leader_region_last_entry_id;
86        let metadata_last_entry_id = ctx.volatile_ctx.leader_region_metadata_last_entry_id;
87
88        Instruction::UpgradeRegion(UpgradeRegion {
89            region_id,
90            last_entry_id,
91            metadata_last_entry_id,
92            replay_timeout: Some(replay_timeout),
93            location_id: Some(ctx.persistent_ctx.from_peer.id),
94        })
95    }
96
97    /// Tries to upgrade a candidate region.
98    ///
99    /// Retry:
100    /// - If `require_ready` is true, but the candidate region returns `ready` is false.
101    /// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout.
102    ///
103    /// Abort:
104    /// - The candidate region doesn't exist.
105    /// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable.
106    /// - [PushMessage](error::Error::PushMessage), The receiver is dropped.
107    /// - [MailboxReceiver](error::Error::MailboxReceiver), The sender is dropped without sending (impossible).
108    /// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply) (impossible).
109    /// - [ExceededDeadline](error::Error::ExceededDeadline)
110    /// - Invalid JSON (impossible).
111    async fn upgrade_region(&self, ctx: &Context) -> Result<()> {
112        let pc = &ctx.persistent_ctx;
113        let region_id = pc.region_id;
114        let candidate = &pc.to_peer;
115        let operation_timeout =
116            ctx.next_operation_timeout()
117                .context(error::ExceededDeadlineSnafu {
118                    operation: "Upgrade region",
119                })?;
120        let upgrade_instruction = self.build_upgrade_region_instruction(ctx, operation_timeout);
121
122        let msg = MailboxMessage::json_message(
123            &format!("Upgrade candidate region: {}", region_id),
124            &format!("Metasrv@{}", ctx.server_addr()),
125            &format!("Datanode-{}@{}", candidate.id, candidate.addr),
126            common_time::util::current_time_millis(),
127            &upgrade_instruction,
128        )
129        .with_context(|_| error::SerializeToJsonSnafu {
130            input: upgrade_instruction.to_string(),
131        })?;
132
133        let ch = Channel::Datanode(candidate.id);
134        let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
135
136        match receiver.await {
137            Ok(msg) => {
138                let reply = HeartbeatMailbox::json_reply(&msg)?;
139                let InstructionReply::UpgradeRegion(UpgradeRegionReply {
140                    ready,
141                    exists,
142                    error,
143                }) = reply
144                else {
145                    return error::UnexpectedInstructionReplySnafu {
146                        mailbox_message: msg.to_string(),
147                        reason: "Unexpected reply of the upgrade region instruction",
148                    }
149                    .fail();
150                };
151
152                // Notes: The order of handling is important.
153                if error.is_some() {
154                    return error::RetryLaterSnafu {
155                        reason: format!(
156                            "Failed to upgrade the region {} on datanode {:?}, error: {:?}",
157                            region_id, candidate, error
158                        ),
159                    }
160                    .fail();
161                }
162
163                ensure!(
164                    exists,
165                    error::UnexpectedSnafu {
166                        violated: format!(
167                            "Candidate region {} doesn't exist on datanode {:?}",
168                            region_id, candidate
169                        )
170                    }
171                );
172
173                if self.require_ready && !ready {
174                    return error::RetryLaterSnafu {
175                        reason: format!(
176                            "Candidate region {} still replaying the wal on datanode {:?}",
177                            region_id, candidate
178                        ),
179                    }
180                    .fail();
181                }
182
183                Ok(())
184            }
185            Err(error::Error::MailboxTimeout { .. }) => {
186                let reason = format!(
187                        "Mailbox received timeout for upgrade candidate region {region_id} on datanode {:?}", 
188                        candidate,
189                    );
190                error::RetryLaterSnafu { reason }.fail()
191            }
192            Err(err) => Err(err),
193        }
194    }
195
196    /// Upgrades a candidate region.
197    ///
198    /// Returns true if the candidate region is upgraded successfully.
199    async fn upgrade_region_with_retry(&self, ctx: &mut Context) -> bool {
200        let mut retry = 0;
201        let mut upgraded = false;
202
203        loop {
204            let timer = Instant::now();
205            if let Err(err) = self.upgrade_region(ctx).await {
206                retry += 1;
207                ctx.update_operations_elapsed(timer);
208                if matches!(err, error::Error::ExceededDeadline { .. }) {
209                    error!("Failed to upgrade region, exceeded deadline");
210                    break;
211                } else if err.is_retryable() && retry < self.optimistic_retry {
212                    error!("Failed to upgrade region, error: {err:?}, retry later");
213                    sleep(self.retry_initial_interval).await;
214                } else {
215                    error!("Failed to upgrade region, error: {err:?}");
216                    break;
217                }
218            } else {
219                ctx.update_operations_elapsed(timer);
220                upgraded = true;
221                break;
222            }
223        }
224
225        upgraded
226    }
227}
228
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_meta::peer::Peer;
    use store_api::storage::RegionId;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::test_util::{new_procedure_context, TestingEnv};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_upgrade_region_reply, send_mock_reply,
    };

    /// Migrates region (1024, 1) from datanode 1 to datanode 2 with a 1s timeout.
    fn new_persistent_context() -> PersistentContext {
        PersistentContext {
            catalog: "greptime".into(),
            schema: "public".into(),
            from_peer: Peer::empty(1),
            to_peer: Peer::empty(2),
            region_id: RegionId::new(1024, 1),
            timeout: Duration::from_millis(1000),
        }
    }

    /// Default state with retries spaced 100ms apart so the retry tests stay fast.
    fn new_fast_retry_state() -> Box<UpgradeCandidateRegion> {
        Box::new(UpgradeCandidateRegion {
            retry_initial_interval: Duration::from_millis(100),
            ..Default::default()
        })
    }

    /// No heartbeat receiver is registered for the candidate, so pushing fails.
    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let env = TestingEnv::new();
        let ctx = env.context_factory().new_context(new_persistent_context());
        let state = UpgradeCandidateRegion::default();

        let err = state.upgrade_region(&ctx).await.unwrap_err();

        assert_matches!(err, Error::PusherNotFound { .. });
        assert!(!err.is_retryable());
    }

    /// The candidate's receiver is dropped before the instruction is pushed.
    #[tokio::test]
    async fn test_pusher_dropped() {
        let persistent_ctx = new_persistent_context();
        let candidate_id = persistent_ctx.to_peer.id;
        let state = UpgradeCandidateRegion::default();

        let mut env = TestingEnv::new();
        let ctx = env.context_factory().new_context(persistent_ctx);
        let mailbox_ctx = env.mailbox_context();

        let (sender, receiver) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(candidate_id), sender)
            .await;
        drop(receiver);

        let err = state.upgrade_region(&ctx).await.unwrap_err();

        assert_matches!(err, Error::PushMessage { .. });
        assert!(!err.is_retryable());
    }

    /// The whole time budget has already been consumed before the attempt.
    #[tokio::test]
    async fn test_procedure_exceeded_deadline() {
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(new_persistent_context());
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);
        let state = UpgradeCandidateRegion::default();

        let err = state.upgrade_region(&ctx).await.unwrap_err();

        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    /// The datanode answers with a close-region reply instead of an upgrade-region one.
    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let persistent_ctx = new_persistent_context();
        let candidate_id = persistent_ctx.to_peer.id;
        let state = UpgradeCandidateRegion::default();

        let mut env = TestingEnv::new();
        let ctx = env.context_factory().new_context(persistent_ctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (sender, receiver) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(candidate_id), sender)
            .await;

        send_mock_reply(mailbox, receiver, |id| Ok(new_close_region_reply(id)));

        let err = state.upgrade_region(&ctx).await.unwrap_err();
        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    /// A reply that carries an error is turned into a retryable error.
    #[tokio::test]
    async fn test_upgrade_region_failed() {
        let persistent_ctx = new_persistent_context();
        let candidate_id = persistent_ctx.to_peer.id;
        let state = UpgradeCandidateRegion::default();

        let mut env = TestingEnv::new();
        let ctx = env.context_factory().new_context(persistent_ctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (sender, receiver) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(candidate_id), sender)
            .await;

        send_mock_reply(mailbox, receiver, |id| {
            Ok(new_upgrade_region_reply(
                id,
                true,
                true,
                Some("test mocked".to_string()),
            ))
        });

        let err = state.upgrade_region(&ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("test mocked"));
    }

    /// The candidate reports that the region doesn't exist: not retryable.
    #[tokio::test]
    async fn test_upgrade_region_not_found() {
        let persistent_ctx = new_persistent_context();
        let candidate_id = persistent_ctx.to_peer.id;
        let state = UpgradeCandidateRegion::default();

        let mut env = TestingEnv::new();
        let ctx = env.context_factory().new_context(persistent_ctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (sender, receiver) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(candidate_id), sender)
            .await;

        send_mock_reply(mailbox, receiver, |id| {
            Ok(new_upgrade_region_reply(id, true, false, None))
        });

        let err = state.upgrade_region(&ctx).await.unwrap_err();

        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
        assert!(err.to_string().contains("doesn't exist"));
    }

    /// A not-ready reply is retryable when readiness is required and accepted otherwise.
    #[tokio::test]
    async fn test_upgrade_region_require_ready() {
        let mut state = UpgradeCandidateRegion {
            require_ready: true,
            ..Default::default()
        };

        let persistent_ctx = new_persistent_context();
        let candidate_id = persistent_ctx.to_peer.id;

        let mut env = TestingEnv::new();
        let ctx = env.context_factory().new_context(persistent_ctx);
        let mailbox_ctx = env.mailbox_context();

        // Phase 1: readiness is required.
        let mailbox = mailbox_ctx.mailbox().clone();
        let (sender, receiver) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(candidate_id), sender)
            .await;
        send_mock_reply(mailbox, receiver, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });

        let err = state.upgrade_region(&ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("still replaying the wal"));

        // Phase 2: the same not-ready reply succeeds once readiness isn't required.
        state.require_ready = false;

        let mailbox = mailbox_ctx.mailbox().clone();
        let (sender, receiver) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(candidate_id), sender)
            .await;
        send_mock_reply(mailbox, receiver, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });

        state.upgrade_region(&ctx).await.unwrap();
    }

    /// Timeout, then not ready, then ready: the third attempt upgrades the region.
    #[tokio::test]
    async fn test_upgrade_region_with_retry_ok() {
        let mut state = new_fast_retry_state();
        let persistent_ctx = new_persistent_context();
        let candidate_id = persistent_ctx.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_ctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(candidate_id), sender)
            .await;

        common_runtime::spawn_global(async move {
            // Attempt 1: mailbox timeout.
            let pushed = receiver.recv().await.unwrap().unwrap();
            let reply_id = pushed.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            // Attempt 2: region exists but is not ready.
            let pushed = receiver.recv().await.unwrap().unwrap();
            let reply_id = pushed.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, true, None)),
                )
                .await
                .unwrap();

            // Attempt 3: region is ready.
            let pushed = receiver.recv().await.unwrap().unwrap();
            let reply_id = pushed.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, true, true, None)),
                )
                .await
                .unwrap();
        });

        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();

        assert_matches!(update_metadata, UpdateMetadata::Upgrade);
    }

    /// Timeout, then not ready, then missing region: the procedure rolls back.
    #[tokio::test]
    async fn test_upgrade_region_with_retry_failed() {
        let mut state = new_fast_retry_state();
        let persistent_ctx = new_persistent_context();
        let candidate_id = persistent_ctx.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_ctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(candidate_id), sender)
            .await;

        common_runtime::spawn_global(async move {
            // Attempt 1: mailbox timeout.
            let pushed = receiver.recv().await.unwrap().unwrap();
            let reply_id = pushed.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            // Attempt 2: region exists but is not ready.
            let pushed = receiver.recv().await.unwrap().unwrap();
            let reply_id = pushed.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, true, None)),
                )
                .await
                .unwrap();

            // Attempt 3: region doesn't exist.
            let pushed = receiver.recv().await.unwrap().unwrap();
            let reply_id = pushed.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, false, None)),
                )
                .await
                .unwrap();
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }

    /// With the budget already spent, `next` rolls back without retrying.
    #[tokio::test]
    async fn test_upgrade_region_procedure_exceeded_deadline() {
        let mut state = new_fast_retry_state();
        let persistent_ctx = new_persistent_context();
        let candidate_id = persistent_ctx.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_ctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let (sender, receiver) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(candidate_id), sender)
            .await;

        send_mock_reply(mailbox, receiver, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }
}