meta_srv/procedure/region_migration/
flush_leader_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use api::v1::meta::MailboxMessage;
18use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
19use common_procedure::{Context as ProcedureContext, Status};
20use common_telemetry::{info, warn};
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt};
23use tokio::time::Instant;
24
25use crate::error::{self, Error, Result};
26use crate::handler::HeartbeatMailbox;
27use crate::procedure::region_migration::update_metadata::UpdateMetadata;
28use crate::procedure::region_migration::{Context, State};
29use crate::service::mailbox::Channel;
30
31/// Flushes the leader region before downgrading it.
32///
33/// This can minimize the time window where the region is not writable.
34#[derive(Debug, Serialize, Deserialize)]
35pub struct PreFlushRegion;
36
37#[async_trait::async_trait]
38#[typetag::serde]
39impl State for PreFlushRegion {
40    async fn next(
41        &mut self,
42        ctx: &mut Context,
43        _procedure_ctx: &ProcedureContext,
44    ) -> Result<(Box<dyn State>, Status)> {
45        let timer = Instant::now();
46        self.flush_region(ctx).await?;
47        ctx.update_flush_leader_region_elapsed(timer);
48        // We intentionally don't update `operations_elapsed` here to prevent
49        // the `next_operation_timeout` from being reduced by the flush operation.
50        // This ensures sufficient time for subsequent critical operations.
51
52        Ok((
53            Box::new(UpdateMetadata::Downgrade),
54            Status::executing(false),
55        ))
56    }
57
58    fn as_any(&self) -> &dyn Any {
59        self
60    }
61}
62
63impl PreFlushRegion {
64    /// Builds flush leader region instruction.
65    fn build_flush_leader_region_instruction(&self, ctx: &Context) -> Instruction {
66        let pc = &ctx.persistent_ctx;
67        let region_id = pc.region_id;
68        Instruction::FlushRegion(region_id)
69    }
70
71    /// Tries to flush a leader region.
72    ///
73    /// Ignore:
74    /// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable.
75    /// - [PushMessage](error::Error::PushMessage), The receiver is dropped.
76    /// - Failed to flush region on the Datanode.
77    ///
78    /// Abort:
79    /// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout.
80    /// - [MailboxReceiver](error::Error::MailboxReceiver), The sender is dropped without sending (impossible).
81    /// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply).
82    /// - [ExceededDeadline](error::Error::ExceededDeadline)
83    /// - Invalid JSON.
84    async fn flush_region(&self, ctx: &mut Context) -> Result<()> {
85        let operation_timeout =
86            ctx.next_operation_timeout()
87                .context(error::ExceededDeadlineSnafu {
88                    operation: "Flush leader region",
89                })?;
90        let flush_instruction = self.build_flush_leader_region_instruction(ctx);
91        let region_id = ctx.persistent_ctx.region_id;
92        let leader = &ctx.persistent_ctx.from_peer;
93
94        let msg = MailboxMessage::json_message(
95            &format!("Flush leader region: {}", region_id),
96            &format!("Metasrv@{}", ctx.server_addr()),
97            &format!("Datanode-{}@{}", leader.id, leader.addr),
98            common_time::util::current_time_millis(),
99            &flush_instruction,
100        )
101        .with_context(|_| error::SerializeToJsonSnafu {
102            input: flush_instruction.to_string(),
103        })?;
104
105        let ch = Channel::Datanode(leader.id);
106        let now = Instant::now();
107        let result = ctx.mailbox.send(&ch, msg, operation_timeout).await;
108
109        match result {
110            Ok(receiver) => match receiver.await {
111                Ok(msg) => {
112                    let reply = HeartbeatMailbox::json_reply(&msg)?;
113                    info!(
114                        "Received flush leader region reply: {:?}, region: {}, elapsed: {:?}",
115                        reply,
116                        region_id,
117                        now.elapsed()
118                    );
119
120                    let InstructionReply::FlushRegion(SimpleReply { result, error }) = reply else {
121                        return error::UnexpectedInstructionReplySnafu {
122                            mailbox_message: msg.to_string(),
123                            reason: "expect flush region reply",
124                        }
125                        .fail();
126                    };
127
128                    if error.is_some() {
129                        warn!(
130                            "Failed to flush leader region {} on datanode {:?}, error: {:?}. Skip flush operation.",
131                            region_id, leader, error
132                        );
133                    } else if result {
134                        info!(
135                            "The flush leader region {} on datanode {:?} is successful, elapsed: {:?}",
136                            region_id,
137                            leader,
138                            now.elapsed()
139                        );
140                    }
141
142                    Ok(())
143                }
144                Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
145                    operation: "Flush leader region",
146                }
147                .fail(),
148                Err(err) => Err(err),
149            },
150            Err(Error::PusherNotFound { .. }) => {
151                warn!(
152                    "Failed to flush leader region({}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
153                    region_id,
154                    leader
155                );
156                Ok(())
157            }
158            Err(err) => Err(err),
159        }
160    }
161}
162
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use store_api::storage::RegionId;

    use super::*;
    use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_flush_region_reply, send_mock_reply,
    };

    /// Persistent context shared by all tests: from_peer is 1, to_peer is 2.
    fn new_persistent_context() -> PersistentContext {
        let region_id = RegionId::new(1024, 1);
        test_util::new_persistent_context(1, 2, region_id)
    }

    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        // No heartbeat response receiver is registered for the leader datanode.
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(new_persistent_context());
        let state = PreFlushRegion;

        // An unreachable leader is not an error: the flush is simply skipped.
        state.flush_region(&mut ctx).await.unwrap();
    }

    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        common_telemetry::init_default_ut_logging();
        let pc = new_persistent_context();
        let leader_id = pc.from_peer.id;
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(pc);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        let leader_channel = Channel::Datanode(leader_id);
        mailbox_ctx
            .insert_heartbeat_response_receiver(leader_channel, tx)
            .await;

        // Reply with a close-region reply instead of the expected flush-region reply.
        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));

        let state = PreFlushRegion;
        let err = state.flush_region(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let pc = new_persistent_context();
        let leader_id = pc.from_peer.id;
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(pc);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        let leader_channel = Channel::Datanode(leader_id);
        mailbox_ctx
            .insert_heartbeat_response_receiver(leader_channel, tx)
            .await;

        // Reply with a mailbox timeout error.
        send_mock_reply(mailbox, rx, |id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        });

        let state = PreFlushRegion;
        let err = state.flush_region(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_flush_region_failed() {
        common_telemetry::init_default_ut_logging();
        let pc = new_persistent_context();
        let leader_id = pc.from_peer.id;
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(pc);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        let leader_channel = Channel::Datanode(leader_id);
        mailbox_ctx
            .insert_heartbeat_response_receiver(leader_channel, tx)
            .await;

        // Reply with a flush-region reply that reports a failure.
        send_mock_reply(mailbox, rx, |id| {
            let mocked_error = Some("test mocked".to_string());
            Ok(new_flush_region_reply(id, false, mocked_error))
        });

        // A failed flush on the datanode is not an error: the flush is skipped.
        let state = PreFlushRegion;
        state.flush_region(&mut ctx).await.unwrap();
    }

    #[tokio::test]
    async fn test_next_update_metadata_downgrade_state() {
        common_telemetry::init_default_ut_logging();
        let pc = new_persistent_context();
        let leader_id = pc.from_peer.id;
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(pc);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        let leader_channel = Channel::Datanode(leader_id);
        mailbox_ctx
            .insert_heartbeat_response_receiver(leader_channel, tx)
            .await;

        // Reply with a successful flush-region reply.
        send_mock_reply(mailbox, rx, |id| Ok(new_flush_region_reply(id, true, None)));

        let mut state = PreFlushRegion;
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        // The state following a flush must be the downgrade metadata update.
        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Downgrade);
    }
}