meta_srv/procedure/region_migration/
flush_leader_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use api::v1::meta::MailboxMessage;
18use common_meta::instruction::{FlushErrorStrategy, FlushRegions, Instruction, InstructionReply};
19use common_procedure::{Context as ProcedureContext, Status};
20use common_telemetry::{info, warn};
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt};
23use tokio::time::Instant;
24
25use crate::error::{self, Error, Result};
26use crate::handler::HeartbeatMailbox;
27use crate::procedure::region_migration::update_metadata::UpdateMetadata;
28use crate::procedure::region_migration::{Context, State};
29use crate::service::mailbox::Channel;
30
/// Flushes the leader region before downgrading it.
///
/// This can minimize the time window where the region is not writable.
///
/// The state is a unit struct and carries no data of its own: the regions to
/// flush and the leader peer are read from the migration [`Context`]. Once the
/// flush attempt has finished, the procedure moves on to
/// [`UpdateMetadata::Downgrade`].
#[derive(Debug, Serialize, Deserialize)]
pub struct PreFlushRegion;
36
37#[async_trait::async_trait]
38#[typetag::serde]
39impl State for PreFlushRegion {
40    async fn next(
41        &mut self,
42        ctx: &mut Context,
43        _procedure_ctx: &ProcedureContext,
44    ) -> Result<(Box<dyn State>, Status)> {
45        let timer = Instant::now();
46        self.flush_region(ctx).await?;
47        ctx.update_flush_leader_region_elapsed(timer);
48        // We intentionally don't update `operations_elapsed` here to prevent
49        // the `next_operation_timeout` from being reduced by the flush operation.
50        // This ensures sufficient time for subsequent critical operations.
51
52        Ok((
53            Box::new(UpdateMetadata::Downgrade),
54            Status::executing(false),
55        ))
56    }
57
58    fn as_any(&self) -> &dyn Any {
59        self
60    }
61}
62
63impl PreFlushRegion {
64    /// Builds flush leader region instruction.
65    fn build_flush_leader_region_instruction(&self, ctx: &Context) -> Instruction {
66        let pc = &ctx.persistent_ctx;
67        Instruction::FlushRegions(FlushRegions::sync_batch(
68            pc.region_ids.clone(),
69            FlushErrorStrategy::TryAll,
70        ))
71    }
72
73    /// Tries to flush a leader region.
74    ///
75    /// Ignore:
76    /// - [PusherNotFound](error::Error::PusherNotFound), The datanode is unreachable.
77    /// - [PushMessage](error::Error::PushMessage), The receiver is dropped.
78    /// - Failed to flush region on the Datanode.
79    ///
80    /// Abort:
81    /// - [MailboxTimeout](error::Error::MailboxTimeout), Timeout.
82    /// - [MailboxReceiver](error::Error::MailboxReceiver), The sender is dropped without sending (impossible).
83    /// - [UnexpectedInstructionReply](error::Error::UnexpectedInstructionReply).
84    /// - [ExceededDeadline](error::Error::ExceededDeadline)
85    /// - Invalid JSON.
86    async fn flush_region(&self, ctx: &mut Context) -> Result<()> {
87        let operation_timeout =
88            ctx.next_operation_timeout()
89                .context(error::ExceededDeadlineSnafu {
90                    operation: "Flush leader region",
91                })?;
92        let flush_instruction = self.build_flush_leader_region_instruction(ctx);
93        let region_ids = &ctx.persistent_ctx.region_ids;
94        let leader = &ctx.persistent_ctx.from_peer;
95
96        let msg = MailboxMessage::json_message(
97            &format!("Flush leader region: {:?}", region_ids),
98            &format!("Metasrv@{}", ctx.server_addr()),
99            &format!("Datanode-{}@{}", leader.id, leader.addr),
100            common_time::util::current_time_millis(),
101            &flush_instruction,
102        )
103        .with_context(|_| error::SerializeToJsonSnafu {
104            input: flush_instruction.to_string(),
105        })?;
106
107        let ch = Channel::Datanode(leader.id);
108        let now = Instant::now();
109        let result = ctx.mailbox.send(&ch, msg, operation_timeout).await;
110
111        match result {
112            Ok(receiver) => match receiver.await {
113                Ok(msg) => {
114                    let reply = HeartbeatMailbox::json_reply(&msg)?;
115                    info!(
116                        "Received flush leader region reply: {:?}, region: {:?}, elapsed: {:?}",
117                        reply,
118                        region_ids,
119                        now.elapsed()
120                    );
121
122                    let reply_result = match reply {
123                        InstructionReply::FlushRegions(flush_reply) => {
124                            if flush_reply.results.len() != region_ids.len() {
125                                return error::UnexpectedInstructionReplySnafu {
126                                    mailbox_message: msg.to_string(),
127                                    reason: format!(
128                                        "expect {} region flush result, but got {}",
129                                        region_ids.len(),
130                                        flush_reply.results.len()
131                                    ),
132                                }
133                                .fail();
134                            }
135
136                            match flush_reply.overall_success {
137                                true => (true, None),
138                                false => (
139                                    false,
140                                    Some(
141                                        flush_reply
142                                            .results
143                                            .iter()
144                                            .filter_map(|(region_id, result)| match result {
145                                                Ok(_) => None,
146                                                Err(e) => Some(format!("{}: {}", region_id, e)),
147                                            })
148                                            .collect::<Vec<String>>()
149                                            .join("; "),
150                                    ),
151                                ),
152                            }
153                        }
154                        _ => {
155                            return error::UnexpectedInstructionReplySnafu {
156                                mailbox_message: msg.to_string(),
157                                reason: "expect flush region reply",
158                            }
159                            .fail();
160                        }
161                    };
162                    let (result, error) = reply_result;
163
164                    if let Some(error) = error {
165                        warn!(
166                            "Failed to flush leader regions {:?} on datanode {:?}, error: {}. Skip flush operation.",
167                            region_ids, leader, &error
168                        );
169                    } else if result {
170                        info!(
171                            "The flush leader regions {:?} on datanode {:?} is successful, elapsed: {:?}",
172                            region_ids,
173                            leader,
174                            now.elapsed()
175                        );
176                    }
177
178                    Ok(())
179                }
180                Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
181                    operation: "Flush leader regions",
182                }
183                .fail(),
184                Err(err) => Err(err),
185            },
186            Err(Error::PusherNotFound { .. }) => {
187                warn!(
188                    "Failed to flush leader regions({:?}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
189                    region_ids, leader
190                );
191                Ok(())
192            }
193            Err(err) => Err(err),
194        }
195    }
196}
197
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use store_api::storage::RegionId;

    use super::*;
    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_flush_region_reply_for_region, send_mock_reply,
    };

    /// Persistent context shared by all tests: `from_peer` is 1, `to_peer` is 2.
    fn new_persistent_context() -> PersistentContext {
        let region_id = RegionId::new(1024, 1);
        test_util::new_persistent_context(1, 2, region_id)
    }

    /// No heartbeat response receiver is registered for the leader, so the
    /// leader is unreachable and the flush operation must be skipped.
    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(new_persistent_context());
        let state = PreFlushRegion;

        let outcome = state.flush_region(&mut ctx).await;
        outcome.unwrap();
    }

    /// A reply of the wrong kind must abort with a non-retryable
    /// `UnexpectedInstructionReply` error.
    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        common_telemetry::init_default_ut_logging();
        let pctx = new_persistent_context();
        let leader_id = pctx.from_peer.id;
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(pctx);

        let mailbox_context = env.mailbox_context();
        let mailbox = mailbox_context.mailbox().clone();
        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_context
            .insert_heartbeat_response_receiver(Channel::Datanode(leader_id), reply_tx)
            .await;
        // Answer the flush instruction with a close-region reply.
        send_mock_reply(mailbox, reply_rx, |id| Ok(new_close_region_reply(id)));

        let state = PreFlushRegion;
        let err = state.flush_region(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    /// A mailbox timeout must surface as a non-retryable `ExceededDeadline` error.
    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let pctx = new_persistent_context();
        let leader_id = pctx.from_peer.id;
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(pctx);

        let mailbox_context = env.mailbox_context();
        let mailbox = mailbox_context.mailbox().clone();
        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_context
            .insert_heartbeat_response_receiver(Channel::Datanode(leader_id), reply_tx)
            .await;
        // Answer the flush instruction with a timeout error.
        send_mock_reply(mailbox, reply_rx, |id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        });

        let state = PreFlushRegion;
        let err = state.flush_region(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    /// A flush failure reported by the datanode is tolerated: the flush
    /// operation is skipped and no error is returned.
    #[tokio::test]
    async fn test_flush_region_failed() {
        common_telemetry::init_default_ut_logging();
        let pctx = new_persistent_context();
        let leader_id = pctx.from_peer.id;
        let flushed_region = pctx.region_ids[0];
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(pctx);

        let mailbox_context = env.mailbox_context();
        let mailbox = mailbox_context.mailbox().clone();
        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_context
            .insert_heartbeat_response_receiver(Channel::Datanode(leader_id), reply_tx)
            .await;
        // Answer with a failed flush result for the region.
        send_mock_reply(mailbox, reply_rx, move |id| {
            let failure = Some("test mocked".to_string());
            Ok(new_flush_region_reply_for_region(
                id,
                flushed_region,
                false,
                failure,
            ))
        });

        let state = PreFlushRegion;
        state.flush_region(&mut ctx).await.unwrap();
    }

    /// After a successful flush, `next` must yield `UpdateMetadata::Downgrade`.
    #[tokio::test]
    async fn test_next_update_metadata_downgrade_state() {
        common_telemetry::init_default_ut_logging();
        let pctx = new_persistent_context();
        let leader_id = pctx.from_peer.id;
        let flushed_region = pctx.region_ids[0];
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(pctx);

        let mailbox_context = env.mailbox_context();
        let mailbox = mailbox_context.mailbox().clone();
        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_context
            .insert_heartbeat_response_receiver(Channel::Datanode(leader_id), reply_tx)
            .await;
        // Answer with a successful flush result for the region.
        send_mock_reply(mailbox, reply_rx, move |id| {
            Ok(new_flush_region_reply_for_region(
                id,
                flushed_region,
                true,
                None,
            ))
        });

        let mut state = PreFlushRegion;
        let procedure_ctx = new_procedure_context();
        let (next_state, _status) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let downcast = next_state
            .as_any()
            .downcast_ref::<UpdateMetadata>()
            .unwrap();
        assert_matches!(downcast, UpdateMetadata::Downgrade);
    }
}