// meta_srv/procedure/region_migration/flush_leader_region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::any::Any;
16
17use common_procedure::{Context as ProcedureContext, Status};
18use serde::{Deserialize, Serialize};
19use snafu::OptionExt;
20use store_api::region_request::RegionFlushReason;
21use tokio::time::Instant;
22
23use crate::error::{self, Result};
24use crate::procedure::region_migration::update_metadata::UpdateMetadata;
25use crate::procedure::region_migration::{Context, State};
26use crate::procedure::utils;
27
28/// Flushes the leader region before downgrading it.
29///
30/// This can minimize the time window where the region is not writable.
31#[derive(Debug, Serialize, Deserialize)]
32pub struct PreFlushRegion;
33
34#[async_trait::async_trait]
35#[typetag::serde]
36impl State for PreFlushRegion {
37    async fn next(
38        &mut self,
39        ctx: &mut Context,
40        _procedure_ctx: &ProcedureContext,
41    ) -> Result<(Box<dyn State>, Status)> {
42        let timer = Instant::now();
43        self.flush_region(ctx).await?;
44        ctx.update_flush_leader_region_elapsed(timer);
45        // We intentionally don't update `operations_elapsed` here to prevent
46        // the `next_operation_timeout` from being reduced by the flush operation.
47        // This ensures sufficient time for subsequent critical operations.
48
49        Ok((
50            Box::new(UpdateMetadata::Downgrade),
51            Status::executing(false),
52        ))
53    }
54
55    fn as_any(&self) -> &dyn Any {
56        self
57    }
58}
59
60impl PreFlushRegion {
61    /// Tries to flush a leader region.
62    ///
63    /// Ignore:
64    /// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable.
65    /// - [PushMessage](error::Error::PushMessage), The receiver is dropped.
66    /// - Failed to flush region on the Datanode.
67    ///
68    /// Abort:
69    /// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout.
70    /// - [MailboxReceiver](error::Error::MailboxReceiver), The sender is dropped without sending (impossible).
71    /// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply).
72    /// - [ExceededDeadline](error::Error::ExceededDeadline)
73    /// - Invalid JSON.
74    async fn flush_region(&self, ctx: &mut Context) -> Result<()> {
75        let operation_timeout =
76            ctx.next_operation_timeout()
77                .context(error::ExceededDeadlineSnafu {
78                    operation: "Flush leader region",
79                })?;
80        let region_ids = &ctx.persistent_ctx.region_ids;
81        let leader = &ctx.persistent_ctx.from_peer;
82
83        utils::flush_region(
84            &ctx.mailbox,
85            &ctx.server_addr,
86            region_ids,
87            leader,
88            operation_timeout,
89            utils::ErrorStrategy::Ignore,
90            Some(RegionFlushReason::RegionMigration),
91        )
92        .await
93    }
94}
95
#[cfg(test)]
mod tests {
    use std::assert_matches;

    use store_api::storage::RegionId;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_flush_region_reply_for_region, send_mock_reply,
    };
    use crate::service::mailbox::Channel;

    /// Builds a persistent context that migrates `RegionId::new(1024, 1)`
    /// from peer 1 to peer 2.
    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
    }

    /// No heartbeat receiver is registered for the leader, so the flush is skipped.
    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let state = PreFlushRegion;
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        // Should be ok: if the leader region is unreachable, the flush operation is skipped.
        state.flush_region(&mut ctx).await.unwrap();
    }

    /// A reply of the wrong instruction type aborts with a non-retryable error.
    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        common_telemetry::init_default_ut_logging();
        let state = PreFlushRegion;
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;
        // Sends an incorrect reply (a close-region reply instead of a flush reply).
        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));
        let err = state.flush_region(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    /// A mailbox timeout reply is surfaced as a non-retryable `ExceededDeadline`.
    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let state = PreFlushRegion;
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;
        // Sends a timeout error.
        send_mock_reply(mailbox, rx, |id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        });

        let err = state.flush_region(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    /// A failed flush reported by the datanode is ignored.
    #[tokio::test]
    async fn test_flush_region_failed() {
        common_telemetry::init_default_ut_logging();
        let state = PreFlushRegion;
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;
        let region_id = persistent_context.region_ids[0];
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;
        send_mock_reply(mailbox, rx, move |id| {
            Ok(new_flush_region_reply_for_region(
                id,
                region_id,
                false,
                Some("test mocked".to_string()),
            ))
        });
        // Should be ok: if flushing the leader region fails, the flush operation is skipped.
        state.flush_region(&mut ctx).await.unwrap();
    }

    /// A successful flush transitions to `UpdateMetadata::Downgrade`.
    #[tokio::test]
    async fn test_next_update_metadata_downgrade_state() {
        common_telemetry::init_default_ut_logging();
        let mut state = PreFlushRegion;
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;
        let region_id = persistent_context.region_ids[0];
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;
        send_mock_reply(mailbox, rx, move |id| {
            Ok(new_flush_region_reply_for_region(id, region_id, true, None))
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Downgrade);
    }
}