meta_srv/procedure/region_migration/
flush_leader_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use api::v1::meta::MailboxMessage;
18use common_meta::instruction::{FlushRegions, Instruction, InstructionReply};
19use common_procedure::{Context as ProcedureContext, Status};
20use common_telemetry::{info, warn};
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt};
23use tokio::time::Instant;
24
25use crate::error::{self, Error, Result};
26use crate::handler::HeartbeatMailbox;
27use crate::procedure::region_migration::update_metadata::UpdateMetadata;
28use crate::procedure::region_migration::{Context, State};
29use crate::service::mailbox::Channel;
30
31/// Flushes the leader region before downgrading it.
32///
33/// This can minimize the time window where the region is not writable.
34#[derive(Debug, Serialize, Deserialize)]
35pub struct PreFlushRegion;
36
37#[async_trait::async_trait]
38#[typetag::serde]
39impl State for PreFlushRegion {
40    async fn next(
41        &mut self,
42        ctx: &mut Context,
43        _procedure_ctx: &ProcedureContext,
44    ) -> Result<(Box<dyn State>, Status)> {
45        let timer = Instant::now();
46        self.flush_region(ctx).await?;
47        ctx.update_flush_leader_region_elapsed(timer);
48        // We intentionally don't update `operations_elapsed` here to prevent
49        // the `next_operation_timeout` from being reduced by the flush operation.
50        // This ensures sufficient time for subsequent critical operations.
51
52        Ok((
53            Box::new(UpdateMetadata::Downgrade),
54            Status::executing(false),
55        ))
56    }
57
58    fn as_any(&self) -> &dyn Any {
59        self
60    }
61}
62
63impl PreFlushRegion {
64    /// Builds flush leader region instruction.
65    fn build_flush_leader_region_instruction(&self, ctx: &Context) -> Instruction {
66        let pc = &ctx.persistent_ctx;
67        let region_id = pc.region_id;
68        Instruction::FlushRegions(FlushRegions::sync_single(region_id))
69    }
70
71    /// Tries to flush a leader region.
72    ///
73    /// Ignore:
74    /// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable.
75    /// - [PushMessage](error::Error::PushMessage), The receiver is dropped.
76    /// - Failed to flush region on the Datanode.
77    ///
78    /// Abort:
79    /// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout.
80    /// - [MailboxReceiver](error::Error::MailboxReceiver), The sender is dropped without sending (impossible).
81    /// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply).
82    /// - [ExceededDeadline](error::Error::ExceededDeadline)
83    /// - Invalid JSON.
84    async fn flush_region(&self, ctx: &mut Context) -> Result<()> {
85        let operation_timeout =
86            ctx.next_operation_timeout()
87                .context(error::ExceededDeadlineSnafu {
88                    operation: "Flush leader region",
89                })?;
90        let flush_instruction = self.build_flush_leader_region_instruction(ctx);
91        let region_id = ctx.persistent_ctx.region_id;
92        let leader = &ctx.persistent_ctx.from_peer;
93
94        let msg = MailboxMessage::json_message(
95            &format!("Flush leader region: {}", region_id),
96            &format!("Metasrv@{}", ctx.server_addr()),
97            &format!("Datanode-{}@{}", leader.id, leader.addr),
98            common_time::util::current_time_millis(),
99            &flush_instruction,
100        )
101        .with_context(|_| error::SerializeToJsonSnafu {
102            input: flush_instruction.to_string(),
103        })?;
104
105        let ch = Channel::Datanode(leader.id);
106        let now = Instant::now();
107        let result = ctx.mailbox.send(&ch, msg, operation_timeout).await;
108
109        match result {
110            Ok(receiver) => match receiver.await {
111                Ok(msg) => {
112                    let reply = HeartbeatMailbox::json_reply(&msg)?;
113                    info!(
114                        "Received flush leader region reply: {:?}, region: {}, elapsed: {:?}",
115                        reply,
116                        region_id,
117                        now.elapsed()
118                    );
119
120                    let reply_result = match reply {
121                        InstructionReply::FlushRegions(flush_reply) => {
122                            if flush_reply.results.len() != 1 {
123                                return error::UnexpectedInstructionReplySnafu {
124                                    mailbox_message: msg.to_string(),
125                                    reason: "expect single region flush result",
126                                }
127                                .fail();
128                            }
129                            let (reply_region_id, result) = &flush_reply.results[0];
130                            if *reply_region_id != region_id {
131                                return error::UnexpectedInstructionReplySnafu {
132                                    mailbox_message: msg.to_string(),
133                                    reason: "flush reply region ID mismatch",
134                                }
135                                .fail();
136                            }
137                            match result {
138                                Ok(()) => (true, None),
139                                Err(err) => (false, Some(err.clone())),
140                            }
141                        }
142                        _ => {
143                            return error::UnexpectedInstructionReplySnafu {
144                                mailbox_message: msg.to_string(),
145                                reason: "expect flush region reply",
146                            }
147                            .fail();
148                        }
149                    };
150                    let (result, error) = reply_result;
151
152                    if error.is_some() {
153                        warn!(
154                            "Failed to flush leader region {} on datanode {:?}, error: {:?}. Skip flush operation.",
155                            region_id, leader, error
156                        );
157                    } else if result {
158                        info!(
159                            "The flush leader region {} on datanode {:?} is successful, elapsed: {:?}",
160                            region_id,
161                            leader,
162                            now.elapsed()
163                        );
164                    }
165
166                    Ok(())
167                }
168                Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
169                    operation: "Flush leader region",
170                }
171                .fail(),
172                Err(err) => Err(err),
173            },
174            Err(Error::PusherNotFound { .. }) => {
175                warn!(
176                    "Failed to flush leader region({}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
177                    region_id,
178                    leader
179                );
180                Ok(())
181            }
182            Err(err) => Err(err),
183        }
184    }
185}
186
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use store_api::storage::RegionId;

    use super::*;
    use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_flush_region_reply_for_region, send_mock_reply,
    };

    /// Persistent context shared by all tests: from_peer is 1, to_peer is 2 and
    /// the migrated region is `RegionId::new(1024, 1)`.
    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
    }

    /// Creates a testing environment and a migration context whose leader
    /// datanode (from_peer) has a registered heartbeat receiver that is answered
    /// with `mock_reply`.
    ///
    /// The environment is returned together with the context so that it stays
    /// alive for the whole test.
    async fn new_context_with_mock_reply(
        mock_reply: impl Fn(u64) -> Result<MailboxMessage> + Send + 'static,
    ) -> (TestingEnv, Context) {
        let persistent_context = new_persistent_context();
        let leader_id = persistent_context.from_peer.id;
        let mut env = TestingEnv::new();
        let ctx = env.context_factory().new_context(persistent_context);

        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(leader_id), tx)
            .await;
        send_mock_reply(mailbox, rx, mock_reply);

        (env, ctx)
    }

    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        // No heartbeat receiver is registered for the leader datanode here.
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(new_persistent_context());

        // Should be ok, if leader region is unreachable. it will skip flush operation.
        PreFlushRegion.flush_region(&mut ctx).await.unwrap();
    }

    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        common_telemetry::init_default_ut_logging();
        // Sends an incorrect reply.
        let (_env, mut ctx) =
            new_context_with_mock_reply(|id| Ok(new_close_region_reply(id))).await;

        let err = PreFlushRegion.flush_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        // Sends a timeout error.
        let (_env, mut ctx) = new_context_with_mock_reply(|id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        })
        .await;

        let err = PreFlushRegion.flush_region(&mut ctx).await.unwrap_err();

        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_flush_region_failed() {
        common_telemetry::init_default_ut_logging();
        let region_id = new_persistent_context().region_id;
        let (_env, mut ctx) = new_context_with_mock_reply(move |id| {
            Ok(new_flush_region_reply_for_region(
                id,
                region_id,
                false,
                Some("test mocked".to_string()),
            ))
        })
        .await;

        // Should be ok, if flush leader region failed. it will skip flush operation.
        PreFlushRegion.flush_region(&mut ctx).await.unwrap();
    }

    #[tokio::test]
    async fn test_next_update_metadata_downgrade_state() {
        common_telemetry::init_default_ut_logging();
        let region_id = new_persistent_context().region_id;
        let (_env, mut ctx) = new_context_with_mock_reply(move |id| {
            Ok(new_flush_region_reply_for_region(id, region_id, true, None))
        })
        .await;
        let procedure_ctx = new_procedure_context();

        let mut state = PreFlushRegion;
        let (next_state, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        // A successful flush must lead to the downgrade step.
        let update_metadata = next_state
            .as_any()
            .downcast_ref::<UpdateMetadata>()
            .unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Downgrade);
    }
}