// meta_srv/procedure/region_migration/flush_leader_region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::any::Any;
16
17use api::v1::meta::MailboxMessage;
18use common_meta::instruction::{FlushRegions, Instruction, InstructionReply};
19use common_procedure::{Context as ProcedureContext, Status};
20use common_telemetry::{info, warn};
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt};
23use tokio::time::Instant;
24
25use crate::error::{self, Error, Result};
26use crate::handler::HeartbeatMailbox;
27use crate::procedure::region_migration::update_metadata::UpdateMetadata;
28use crate::procedure::region_migration::{Context, State};
29use crate::service::mailbox::Channel;
30
31/// Flushes the leader region before downgrading it.
32///
33/// This can minimize the time window where the region is not writable.
34#[derive(Debug, Serialize, Deserialize)]
35pub struct PreFlushRegion;
36
37#[async_trait::async_trait]
38#[typetag::serde]
39impl State for PreFlushRegion {
40    async fn next(
41        &mut self,
42        ctx: &mut Context,
43        _procedure_ctx: &ProcedureContext,
44    ) -> Result<(Box<dyn State>, Status)> {
45        let timer = Instant::now();
46        self.flush_region(ctx).await?;
47        ctx.update_flush_leader_region_elapsed(timer);
48        // We intentionally don't update `operations_elapsed` here to prevent
49        // the `next_operation_timeout` from being reduced by the flush operation.
50        // This ensures sufficient time for subsequent critical operations.
51
52        Ok((
53            Box::new(UpdateMetadata::Downgrade),
54            Status::executing(false),
55        ))
56    }
57
58    fn as_any(&self) -> &dyn Any {
59        self
60    }
61}
62
63impl PreFlushRegion {
64    /// Builds flush leader region instruction.
65    fn build_flush_leader_region_instruction(&self, ctx: &Context) -> Instruction {
66        let pc = &ctx.persistent_ctx;
67        let region_id = pc.region_id;
68        Instruction::FlushRegions(FlushRegions::sync_single(region_id))
69    }
70
71    /// Tries to flush a leader region.
72    ///
73    /// Ignore:
74    /// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable.
75    /// - [PushMessage](error::Error::PushMessage), The receiver is dropped.
76    /// - Failed to flush region on the Datanode.
77    ///
78    /// Abort:
79    /// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout.
80    /// - [MailboxReceiver](error::Error::MailboxReceiver), The sender is dropped without sending (impossible).
81    /// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply).
82    /// - [ExceededDeadline](error::Error::ExceededDeadline)
83    /// - Invalid JSON.
84    async fn flush_region(&self, ctx: &mut Context) -> Result<()> {
85        let operation_timeout =
86            ctx.next_operation_timeout()
87                .context(error::ExceededDeadlineSnafu {
88                    operation: "Flush leader region",
89                })?;
90        let flush_instruction = self.build_flush_leader_region_instruction(ctx);
91        let region_id = ctx.persistent_ctx.region_id;
92        let leader = &ctx.persistent_ctx.from_peer;
93
94        let msg = MailboxMessage::json_message(
95            &format!("Flush leader region: {}", region_id),
96            &format!("Metasrv@{}", ctx.server_addr()),
97            &format!("Datanode-{}@{}", leader.id, leader.addr),
98            common_time::util::current_time_millis(),
99            &flush_instruction,
100        )
101        .with_context(|_| error::SerializeToJsonSnafu {
102            input: flush_instruction.to_string(),
103        })?;
104
105        let ch = Channel::Datanode(leader.id);
106        let now = Instant::now();
107        let result = ctx.mailbox.send(&ch, msg, operation_timeout).await;
108
109        match result {
110            Ok(receiver) => match receiver.await {
111                Ok(msg) => {
112                    let reply = HeartbeatMailbox::json_reply(&msg)?;
113                    info!(
114                        "Received flush leader region reply: {:?}, region: {}, elapsed: {:?}",
115                        reply,
116                        region_id,
117                        now.elapsed()
118                    );
119
120                    let reply_result = match reply {
121                        InstructionReply::FlushRegions(flush_reply) => {
122                            if flush_reply.results.len() != 1 {
123                                return error::UnexpectedInstructionReplySnafu {
124                                    mailbox_message: msg.to_string(),
125                                    reason: "expect single region flush result",
126                                }
127                                .fail();
128                            }
129                            let (reply_region_id, result) = &flush_reply.results[0];
130                            if *reply_region_id != region_id {
131                                return error::UnexpectedInstructionReplySnafu {
132                                    mailbox_message: msg.to_string(),
133                                    reason: "flush reply region ID mismatch",
134                                }
135                                .fail();
136                            }
137                            match result {
138                                Ok(()) => (true, None),
139                                Err(err) => (false, Some(err.clone())),
140                            }
141                        }
142                        _ => {
143                            return error::UnexpectedInstructionReplySnafu {
144                                mailbox_message: msg.to_string(),
145                                reason: "expect flush region reply",
146                            }
147                            .fail();
148                        }
149                    };
150                    let (result, error) = reply_result;
151
152                    if error.is_some() {
153                        warn!(
154                            "Failed to flush leader region {} on datanode {:?}, error: {:?}. Skip flush operation.",
155                            region_id, leader, error
156                        );
157                    } else if result {
158                        info!(
159                            "The flush leader region {} on datanode {:?} is successful, elapsed: {:?}",
160                            region_id,
161                            leader,
162                            now.elapsed()
163                        );
164                    }
165
166                    Ok(())
167                }
168                Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
169                    operation: "Flush leader region",
170                }
171                .fail(),
172                Err(err) => Err(err),
173            },
174            Err(Error::PusherNotFound { .. }) => {
175                warn!(
176                    "Failed to flush leader region({}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
177                    region_id, leader
178                );
179                Ok(())
180            }
181            Err(err) => Err(err),
182        }
183    }
184}
185
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use store_api::storage::RegionId;

    use super::*;
    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_flush_region_reply_for_region, send_mock_reply,
    };

    /// Persistent context migrating region `1024:1` from peer 1 to peer 2.
    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
    }

    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let state = PreFlushRegion;
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        // Should be ok, if leader region is unreachable. it will skip flush operation.
        state.flush_region(&mut ctx).await.unwrap();
    }

    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        common_telemetry::init_default_ut_logging();
        let state = PreFlushRegion;
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;
        // Sends an incorrect reply.
        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));
        let err = state.flush_region(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_flush_reply_region_id_mismatch() {
        common_telemetry::init_default_ut_logging();
        let state = PreFlushRegion;
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;
        // A region the procedure never asked to flush.
        let other_region_id = RegionId::new(1024, 2);
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;
        // Replies with a successful flush, but for a different region.
        send_mock_reply(mailbox, rx, move |id| {
            Ok(new_flush_region_reply_for_region(
                id,
                other_region_id,
                true,
                None,
            ))
        });
        let err = state.flush_region(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let state = PreFlushRegion;
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;
        // Sends a timeout error.
        send_mock_reply(mailbox, rx, |id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        });

        let err = state.flush_region(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_flush_region_failed() {
        common_telemetry::init_default_ut_logging();
        let state = PreFlushRegion;
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;
        let region_id = persistent_context.region_id;
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;
        send_mock_reply(mailbox, rx, move |id| {
            Ok(new_flush_region_reply_for_region(
                id,
                region_id,
                false,
                Some("test mocked".to_string()),
            ))
        });
        // Should be ok, if flush leader region failed. it will skip flush operation.
        state.flush_region(&mut ctx).await.unwrap();
    }

    #[tokio::test]
    async fn test_next_update_metadata_downgrade_state() {
        common_telemetry::init_default_ut_logging();
        let mut state = PreFlushRegion;
        // from_peer: 1
        // to_peer: 2
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;
        let region_id = persistent_context.region_id;
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;
        send_mock_reply(mailbox, rx, move |id| {
            Ok(new_flush_region_reply_for_region(id, region_id, true, None))
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Downgrade);
    }
}