meta_srv/procedure/region_migration/
upgrade_candidate_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::time::Duration;
17
18use api::v1::meta::MailboxMessage;
19use common_meta::instruction::{Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply};
20use common_procedure::{Context as ProcedureContext, Status};
21use common_telemetry::error;
22use serde::{Deserialize, Serialize};
23use snafu::{ensure, OptionExt, ResultExt};
24use tokio::time::{sleep, Instant};
25
26use crate::error::{self, Result};
27use crate::handler::HeartbeatMailbox;
28use crate::procedure::region_migration::update_metadata::UpdateMetadata;
29use crate::procedure::region_migration::{Context, State};
30use crate::service::mailbox::Channel;
31
/// Region-migration state that asks the candidate datanode (`to_peer`) to
/// upgrade its copy of the region, retrying a bounded number of times.
#[derive(Debug, Serialize, Deserialize)]
pub struct UpgradeCandidateRegion {
    /// The optimistic retry times.
    ///
    /// Acts as the upper bound on the total number of upgrade attempts:
    /// retrying stops once the number of failed attempts reaches this value.
    pub(crate) optimistic_retry: usize,
    /// The retry initial interval.
    ///
    /// This is the time slept between two consecutive attempts; the retry
    /// loop in this file uses it as a fixed interval.
    pub(crate) retry_initial_interval: Duration,
    /// If it's true it requires the candidate region MUST replay the WAL to the latest entry id:
    /// a reply with `ready == false` is then treated as a retryable failure, and
    /// the procedure rolls back to the old leader region once the attempts are exhausted.
    /// If it's false, a reply with `ready == false` is accepted as success.
    pub(crate) require_ready: bool,
}
42
43impl Default for UpgradeCandidateRegion {
44    fn default() -> Self {
45        Self {
46            optimistic_retry: 3,
47            retry_initial_interval: Duration::from_millis(500),
48            require_ready: true,
49        }
50    }
51}
52
53#[async_trait::async_trait]
54#[typetag::serde]
55impl State for UpgradeCandidateRegion {
56    async fn next(
57        &mut self,
58        ctx: &mut Context,
59        _procedure_ctx: &ProcedureContext,
60    ) -> Result<(Box<dyn State>, Status)> {
61        let now = Instant::now();
62        if self.upgrade_region_with_retry(ctx).await {
63            ctx.update_upgrade_candidate_region_elapsed(now);
64            Ok((Box::new(UpdateMetadata::Upgrade), Status::executing(false)))
65        } else {
66            ctx.update_upgrade_candidate_region_elapsed(now);
67            Ok((Box::new(UpdateMetadata::Rollback), Status::executing(false)))
68        }
69    }
70
71    fn as_any(&self) -> &dyn Any {
72        self
73    }
74}
75
76impl UpgradeCandidateRegion {
77    /// Builds upgrade region instruction.
78    fn build_upgrade_region_instruction(
79        &self,
80        ctx: &Context,
81        replay_timeout: Duration,
82    ) -> Instruction {
83        let pc = &ctx.persistent_ctx;
84        let region_id = pc.region_id;
85        let last_entry_id = ctx.volatile_ctx.leader_region_last_entry_id;
86        let metadata_last_entry_id = ctx.volatile_ctx.leader_region_metadata_last_entry_id;
87
88        Instruction::UpgradeRegion(UpgradeRegion {
89            region_id,
90            last_entry_id,
91            metadata_last_entry_id,
92            replay_timeout: Some(replay_timeout),
93            location_id: Some(ctx.persistent_ctx.from_peer.id),
94        })
95    }
96
97    /// Tries to upgrade a candidate region.
98    ///
99    /// Retry:
100    /// - If `require_ready` is true, but the candidate region returns `ready` is false.
101    /// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout.
102    ///
103    /// Abort:
104    /// - The candidate region doesn't exist.
105    /// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable.
106    /// - [PushMessage](error::Error::PushMessage), The receiver is dropped.
107    /// - [MailboxReceiver](error::Error::MailboxReceiver), The sender is dropped without sending (impossible).
108    /// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply) (impossible).
109    /// - [ExceededDeadline](error::Error::ExceededDeadline)
110    /// - Invalid JSON (impossible).
111    async fn upgrade_region(&self, ctx: &Context) -> Result<()> {
112        let pc = &ctx.persistent_ctx;
113        let region_id = pc.region_id;
114        let candidate = &pc.to_peer;
115        let operation_timeout =
116            ctx.next_operation_timeout()
117                .context(error::ExceededDeadlineSnafu {
118                    operation: "Upgrade region",
119                })?;
120        let upgrade_instruction = self.build_upgrade_region_instruction(ctx, operation_timeout);
121
122        let msg = MailboxMessage::json_message(
123            &format!("Upgrade candidate region: {}", region_id),
124            &format!("Metasrv@{}", ctx.server_addr()),
125            &format!("Datanode-{}@{}", candidate.id, candidate.addr),
126            common_time::util::current_time_millis(),
127            &upgrade_instruction,
128        )
129        .with_context(|_| error::SerializeToJsonSnafu {
130            input: upgrade_instruction.to_string(),
131        })?;
132
133        let ch = Channel::Datanode(candidate.id);
134        let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
135
136        match receiver.await {
137            Ok(msg) => {
138                let reply = HeartbeatMailbox::json_reply(&msg)?;
139                let InstructionReply::UpgradeRegion(UpgradeRegionReply {
140                    ready,
141                    exists,
142                    error,
143                }) = reply
144                else {
145                    return error::UnexpectedInstructionReplySnafu {
146                        mailbox_message: msg.to_string(),
147                        reason: "Unexpected reply of the upgrade region instruction",
148                    }
149                    .fail();
150                };
151
152                // Notes: The order of handling is important.
153                if error.is_some() {
154                    return error::RetryLaterSnafu {
155                        reason: format!(
156                            "Failed to upgrade the region {} on datanode {:?}, error: {:?}",
157                            region_id, candidate, error
158                        ),
159                    }
160                    .fail();
161                }
162
163                ensure!(
164                    exists,
165                    error::UnexpectedSnafu {
166                        violated: format!(
167                            "Candidate region {} doesn't exist on datanode {:?}",
168                            region_id, candidate
169                        )
170                    }
171                );
172
173                if self.require_ready && !ready {
174                    return error::RetryLaterSnafu {
175                        reason: format!(
176                            "Candidate region {} still replaying the wal on datanode {:?}",
177                            region_id, candidate
178                        ),
179                    }
180                    .fail();
181                }
182
183                Ok(())
184            }
185            Err(error::Error::MailboxTimeout { .. }) => {
186                let reason = format!(
187                        "Mailbox received timeout for upgrade candidate region {region_id} on datanode {:?}", 
188                        candidate,
189                    );
190                error::RetryLaterSnafu { reason }.fail()
191            }
192            Err(err) => Err(err),
193        }
194    }
195
196    /// Upgrades a candidate region.
197    ///
198    /// Returns true if the candidate region is upgraded successfully.
199    async fn upgrade_region_with_retry(&self, ctx: &mut Context) -> bool {
200        let mut retry = 0;
201        let mut upgraded = false;
202
203        loop {
204            let timer = Instant::now();
205            if let Err(err) = self.upgrade_region(ctx).await {
206                retry += 1;
207                ctx.update_operations_elapsed(timer);
208                if matches!(err, error::Error::ExceededDeadline { .. }) {
209                    error!("Failed to upgrade region, exceeded deadline");
210                    break;
211                } else if err.is_retryable() && retry < self.optimistic_retry {
212                    error!("Failed to upgrade region, error: {err:?}, retry later");
213                    sleep(self.retry_initial_interval).await;
214                } else {
215                    error!("Failed to upgrade region, error: {err:?}");
216                    break;
217                }
218            } else {
219                ctx.update_operations_elapsed(timer);
220                upgraded = true;
221                break;
222            }
223        }
224
225        upgraded
226    }
227}
228
229#[cfg(test)]
230mod tests {
231    use std::assert_matches::assert_matches;
232
233    use common_meta::peer::Peer;
234    use store_api::storage::RegionId;
235
236    use super::*;
237    use crate::error::Error;
238    use crate::procedure::region_migration::manager::RegionMigrationTriggerReason;
239    use crate::procedure::region_migration::test_util::{new_procedure_context, TestingEnv};
240    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
241    use crate::procedure::test_util::{
242        new_close_region_reply, new_upgrade_region_reply, send_mock_reply,
243    };
244
245    fn new_persistent_context() -> PersistentContext {
246        PersistentContext {
247            catalog: "greptime".into(),
248            schema: "public".into(),
249            from_peer: Peer::empty(1),
250            to_peer: Peer::empty(2),
251            region_id: RegionId::new(1024, 1),
252            timeout: Duration::from_millis(1000),
253            trigger_reason: RegionMigrationTriggerReason::Manual,
254        }
255    }
256
257    #[tokio::test]
258    async fn test_datanode_is_unreachable() {
259        let state = UpgradeCandidateRegion::default();
260        let persistent_context = new_persistent_context();
261        let env = TestingEnv::new();
262        let ctx = env.context_factory().new_context(persistent_context);
263
264        let err = state.upgrade_region(&ctx).await.unwrap_err();
265
266        assert_matches!(err, Error::PusherNotFound { .. });
267        assert!(!err.is_retryable());
268    }
269
    /// A heartbeat response sender is registered for the candidate datanode,
    /// but its receiving half is dropped before the request is sent; the
    /// request must fail with a non-retryable `PushMessage`.
    #[tokio::test]
    async fn test_pusher_dropped() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        // Register the sending half for the candidate datanode's channel.
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // Dropping the receiving half makes the push fail.
        drop(rx);

        let err = state.upgrade_region(&ctx).await.unwrap_err();

        assert_matches!(err, Error::PushMessage { .. });
        assert!(!err.is_retryable());
    }
293
294    #[tokio::test]
295    async fn test_procedure_exceeded_deadline() {
296        let state = UpgradeCandidateRegion::default();
297        let persistent_context = new_persistent_context();
298        let env = TestingEnv::new();
299        let mut ctx = env.context_factory().new_context(persistent_context);
300        ctx.volatile_ctx.metrics.operations_elapsed =
301            ctx.persistent_ctx.timeout + Duration::from_secs(1);
302
303        let err = state.upgrade_region(&ctx).await.unwrap_err();
304
305        assert_matches!(err, Error::ExceededDeadline { .. });
306        assert!(!err.is_retryable());
307    }
308
    /// The mocked datanode answers with a close-region reply instead of an
    /// upgrade-region reply; the request must fail with a non-retryable
    /// `UnexpectedInstructionReply`.
    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        // Register the sending half for the candidate datanode's channel.
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // Reply with the wrong kind of instruction reply.
        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));

        let err = state.upgrade_region(&ctx).await.unwrap_err();
        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }
332
    /// The mocked datanode replies with an error message; the request must
    /// fail with a retryable `RetryLater` whose debug output contains that
    /// message.
    #[tokio::test]
    async fn test_upgrade_region_failed() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        // Register the sending half for the candidate datanode's channel.
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // A reply contains an error.
        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(
                id,
                true,
                true,
                Some("test mocked".to_string()),
            ))
        });

        let err = state.upgrade_region(&ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("test mocked"));
    }
366
    /// The mocked datanode replies that the candidate region does not exist;
    /// the request must fail with a non-retryable `Unexpected` error.
    #[tokio::test]
    async fn test_upgrade_region_not_found() {
        let state = UpgradeCandidateRegion::default();
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        // Register the sending half for the candidate datanode's channel.
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // A reply without an error, for which the "doesn't exist" failure is expected.
        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, true, false, None))
        });

        let err = state.upgrade_region(&ctx).await.unwrap_err();

        assert_matches!(err, Error::Unexpected { .. });
        assert!(!err.is_retryable());
        assert!(err.to_string().contains("doesn't exist"));
    }
394
    /// Sends the same "not ready" reply twice: with `require_ready == true`
    /// the request fails with a retryable `RetryLater`, with
    /// `require_ready == false` it succeeds.
    #[tokio::test]
    async fn test_upgrade_region_require_ready() {
        let mut state = UpgradeCandidateRegion {
            require_ready: true,
            ..Default::default()
        };

        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        // Register the sending half for the candidate datanode's channel.
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // A reply without an error, for which the "still replaying" failure is expected.
        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });

        let err = state.upgrade_region(&ctx).await.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("still replaying the wal"));

        // Sets the `require_ready` to false.
        state.require_ready = false;

        // Register a fresh channel for the second round.
        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // The same reply is now accepted.
        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });

        state.upgrade_region(&ctx).await.unwrap();
    }
442
    /// Drives `next` through three attempts: a mailbox timeout, a "not ready"
    /// reply and finally a successful reply. The state must move on to
    /// `UpdateMetadata::Upgrade`.
    #[tokio::test]
    async fn test_upgrade_region_with_retry_ok() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        // Shorten the pause between attempts.
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        // Register the sending half for the candidate datanode's channel.
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // Mocked datanode: answers each request in turn.
        common_runtime::spawn_global(async move {
            // First attempt: report a mailbox timeout.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            // retry: 1
            // Second attempt: same reply that yields "still replaying" in
            // `test_upgrade_region_require_ready`.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, true, None)),
                )
                .await
                .unwrap();

            // retry: 2
            // Third attempt: a reply for which the upgrade is expected to succeed.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, true, true, None)),
                )
                .await
                .unwrap();
        });

        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();

        assert_matches!(update_metadata, UpdateMetadata::Upgrade);
    }
502
    /// Drives `next` through three failing attempts: a mailbox timeout, a
    /// "not ready" reply and a final failing reply. The state must fall back to
    /// `UpdateMetadata::Rollback`.
    #[tokio::test]
    async fn test_upgrade_region_with_retry_failed() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        // Shorten the pause between attempts.
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);

        // Register the sending half for the candidate datanode's channel.
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // Mocked datanode: answers each request in turn.
        common_runtime::spawn_global(async move {
            // First attempt: report a mailbox timeout.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Err(error::MailboxTimeoutSnafu { id: reply_id }.build()),
                )
                .await
                .unwrap();

            // retry: 1
            // Second attempt: same reply that yields "still replaying" in
            // `test_upgrade_region_require_ready`.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, true, None)),
                )
                .await
                .unwrap();

            // retry: 2
            // Third attempt: another failing reply; the default attempt limit of 3 is reached.
            let resp = rx.recv().await.unwrap().unwrap();
            let reply_id = resp.mailbox_message.unwrap().id;
            mailbox
                .on_recv(
                    reply_id,
                    Ok(new_upgrade_region_reply(reply_id, false, false, None)),
                )
                .await
                .unwrap();
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }
560
    /// With the recorded operations time already beyond the procedure
    /// timeout, `next` must end in `UpdateMetadata::Rollback`.
    #[tokio::test]
    async fn test_upgrade_region_procedure_exceeded_deadline() {
        let mut state = Box::<UpgradeCandidateRegion>::default();
        state.retry_initial_interval = Duration::from_millis(100);
        let persistent_context = new_persistent_context();
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        // Pretend that earlier operations consumed more than the whole timeout.
        ctx.volatile_ctx.metrics.operations_elapsed =
            ctx.persistent_ctx.timeout + Duration::from_secs(1);

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        // Register the sending half for the candidate datanode's channel.
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // A mocked reply is prepared in case a request is sent.
        send_mock_reply(mailbox, rx, |id| {
            Ok(new_upgrade_region_reply(id, false, true, None))
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Rollback);
    }
588}