meta_srv/procedure/region_migration/
flush_leader_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use common_procedure::{Context as ProcedureContext, Status};
18use serde::{Deserialize, Serialize};
19use snafu::OptionExt;
20use tokio::time::Instant;
21
22use crate::error::{self, Result};
23use crate::procedure::region_migration::update_metadata::UpdateMetadata;
24use crate::procedure::region_migration::{Context, State};
25use crate::procedure::utils;
26
27/// Flushes the leader region before downgrading it.
28///
29/// This can minimize the time window where the region is not writable.
30#[derive(Debug, Serialize, Deserialize)]
31pub struct PreFlushRegion;
32
33#[async_trait::async_trait]
34#[typetag::serde]
35impl State for PreFlushRegion {
36    async fn next(
37        &mut self,
38        ctx: &mut Context,
39        _procedure_ctx: &ProcedureContext,
40    ) -> Result<(Box<dyn State>, Status)> {
41        let timer = Instant::now();
42        self.flush_region(ctx).await?;
43        ctx.update_flush_leader_region_elapsed(timer);
44        // We intentionally don't update `operations_elapsed` here to prevent
45        // the `next_operation_timeout` from being reduced by the flush operation.
46        // This ensures sufficient time for subsequent critical operations.
47
48        Ok((
49            Box::new(UpdateMetadata::Downgrade),
50            Status::executing(false),
51        ))
52    }
53
54    fn as_any(&self) -> &dyn Any {
55        self
56    }
57}
58
59impl PreFlushRegion {
60    /// Tries to flush a leader region.
61    ///
62    /// Ignore:
63    /// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable.
64    /// - [PushMessage](error::Error::PushMessage), The receiver is dropped.
65    /// - Failed to flush region on the Datanode.
66    ///
67    /// Abort:
68    /// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout.
69    /// - [MailboxReceiver](error::Error::MailboxReceiver), The sender is dropped without sending (impossible).
70    /// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply).
71    /// - [ExceededDeadline](error::Error::ExceededDeadline)
72    /// - Invalid JSON.
73    async fn flush_region(&self, ctx: &mut Context) -> Result<()> {
74        let operation_timeout =
75            ctx.next_operation_timeout()
76                .context(error::ExceededDeadlineSnafu {
77                    operation: "Flush leader region",
78                })?;
79        let region_ids = &ctx.persistent_ctx.region_ids;
80        let leader = &ctx.persistent_ctx.from_peer;
81
82        utils::flush_region(
83            &ctx.mailbox,
84            &ctx.server_addr,
85            region_ids,
86            leader,
87            operation_timeout,
88            utils::ErrorStrategy::Ignore,
89        )
90        .await
91    }
92}
93
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use store_api::storage::RegionId;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_flush_region_reply_for_region, send_mock_reply,
    };
    use crate::service::mailbox::Channel;

    /// Builds the persistent context shared by all tests:
    /// `from_peer` is datanode 1, `to_peer` is datanode 2.
    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
    }

    /// No heartbeat channel is registered for the leader, so the flush
    /// request cannot be delivered; the flush must be skipped silently.
    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(new_persistent_context());

        let flush_state = PreFlushRegion;
        flush_state.flush_region(&mut ctx).await.unwrap();
    }

    /// A reply of the wrong kind must abort with a non-retryable error.
    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        common_telemetry::init_default_ut_logging();
        let flush_state = PreFlushRegion;
        let pctx = new_persistent_context();
        let leader_id = pctx.from_peer.id;
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(pctx);

        // Register a heartbeat channel for the leader datanode.
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(leader_id), reply_tx)
            .await;

        // Answer the flush instruction with a close-region reply.
        send_mock_reply(mailbox, reply_rx, |id| Ok(new_close_region_reply(id)));

        let err = flush_state.flush_region(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    /// A mailbox timeout must surface as a non-retryable `ExceededDeadline`.
    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let flush_state = PreFlushRegion;
        let pctx = new_persistent_context();
        let leader_id = pctx.from_peer.id;
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(pctx);

        // Register a heartbeat channel for the leader datanode.
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(leader_id), reply_tx)
            .await;

        // Answer the flush instruction with a timeout error.
        send_mock_reply(mailbox, reply_rx, |id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        });

        let err = flush_state.flush_region(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    /// A failed flush reported by the datanode is tolerated: the flush is
    /// skipped and no error is returned.
    #[tokio::test]
    async fn test_flush_region_failed() {
        common_telemetry::init_default_ut_logging();
        let flush_state = PreFlushRegion;
        let pctx = new_persistent_context();
        let leader_id = pctx.from_peer.id;
        let target_region = pctx.region_ids[0];
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(pctx);

        // Register a heartbeat channel for the leader datanode.
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(leader_id), reply_tx)
            .await;

        // Answer with an unsuccessful flush reply for the migrating region.
        send_mock_reply(mailbox, reply_rx, move |id| {
            let mocked_error = Some("test mocked".to_string());
            Ok(new_flush_region_reply_for_region(
                id,
                target_region,
                false,
                mocked_error,
            ))
        });

        flush_state.flush_region(&mut ctx).await.unwrap();
    }

    /// After a successful flush, `next` must transition to
    /// `UpdateMetadata::Downgrade`.
    #[tokio::test]
    async fn test_next_update_metadata_downgrade_state() {
        common_telemetry::init_default_ut_logging();
        let mut flush_state = PreFlushRegion;
        let pctx = new_persistent_context();
        let leader_id = pctx.from_peer.id;
        let target_region = pctx.region_ids[0];
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(pctx);

        // Register a heartbeat channel for the leader datanode.
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(leader_id), reply_tx)
            .await;

        // Answer with a successful flush reply for the migrating region.
        send_mock_reply(mailbox, reply_rx, move |id| {
            Ok(new_flush_region_reply_for_region(
                id,
                target_region,
                true,
                None,
            ))
        });

        let procedure_ctx = new_procedure_context();
        let (next_state, _) = flush_state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let update_metadata = next_state
            .as_any()
            .downcast_ref::<UpdateMetadata>()
            .unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Downgrade);
    }
}