meta_srv/procedure/region_migration/
flush_leader_region.rs1use std::any::Any;
16
17use api::v1::meta::MailboxMessage;
18use common_meta::instruction::{Instruction, InstructionReply, SimpleReply};
19use common_procedure::{Context as ProcedureContext, Status};
20use common_telemetry::{info, warn};
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt};
23use tokio::time::Instant;
24
25use crate::error::{self, Error, Result};
26use crate::handler::HeartbeatMailbox;
27use crate::procedure::region_migration::update_metadata::UpdateMetadata;
28use crate::procedure::region_migration::{Context, State};
29use crate::service::mailbox::Channel;
30
31#[derive(Debug, Serialize, Deserialize)]
35pub struct PreFlushRegion;
36
37#[async_trait::async_trait]
38#[typetag::serde]
39impl State for PreFlushRegion {
40 async fn next(
41 &mut self,
42 ctx: &mut Context,
43 _procedure_ctx: &ProcedureContext,
44 ) -> Result<(Box<dyn State>, Status)> {
45 let timer = Instant::now();
46 self.flush_region(ctx).await?;
47 ctx.update_flush_leader_region_elapsed(timer);
48 Ok((
53 Box::new(UpdateMetadata::Downgrade),
54 Status::executing(false),
55 ))
56 }
57
58 fn as_any(&self) -> &dyn Any {
59 self
60 }
61}
62
63impl PreFlushRegion {
64 fn build_flush_leader_region_instruction(&self, ctx: &Context) -> Instruction {
66 let pc = &ctx.persistent_ctx;
67 let region_id = pc.region_id;
68 Instruction::FlushRegion(region_id)
69 }
70
71 async fn flush_region(&self, ctx: &mut Context) -> Result<()> {
85 let operation_timeout =
86 ctx.next_operation_timeout()
87 .context(error::ExceededDeadlineSnafu {
88 operation: "Flush leader region",
89 })?;
90 let flush_instruction = self.build_flush_leader_region_instruction(ctx);
91 let region_id = ctx.persistent_ctx.region_id;
92 let leader = &ctx.persistent_ctx.from_peer;
93
94 let msg = MailboxMessage::json_message(
95 &format!("Flush leader region: {}", region_id),
96 &format!("Metasrv@{}", ctx.server_addr()),
97 &format!("Datanode-{}@{}", leader.id, leader.addr),
98 common_time::util::current_time_millis(),
99 &flush_instruction,
100 )
101 .with_context(|_| error::SerializeToJsonSnafu {
102 input: flush_instruction.to_string(),
103 })?;
104
105 let ch = Channel::Datanode(leader.id);
106 let now = Instant::now();
107 let result = ctx.mailbox.send(&ch, msg, operation_timeout).await;
108
109 match result {
110 Ok(receiver) => match receiver.await {
111 Ok(msg) => {
112 let reply = HeartbeatMailbox::json_reply(&msg)?;
113 info!(
114 "Received flush leader region reply: {:?}, region: {}, elapsed: {:?}",
115 reply,
116 region_id,
117 now.elapsed()
118 );
119
120 let InstructionReply::FlushRegion(SimpleReply { result, error }) = reply else {
121 return error::UnexpectedInstructionReplySnafu {
122 mailbox_message: msg.to_string(),
123 reason: "expect flush region reply",
124 }
125 .fail();
126 };
127
128 if error.is_some() {
129 warn!(
130 "Failed to flush leader region {} on datanode {:?}, error: {:?}. Skip flush operation.",
131 region_id, leader, error
132 );
133 } else if result {
134 info!(
135 "The flush leader region {} on datanode {:?} is successful, elapsed: {:?}",
136 region_id,
137 leader,
138 now.elapsed()
139 );
140 }
141
142 Ok(())
143 }
144 Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
145 operation: "Flush leader region",
146 }
147 .fail(),
148 Err(err) => Err(err),
149 },
150 Err(Error::PusherNotFound { .. }) => {
151 warn!(
152 "Failed to flush leader region({}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
153 region_id,
154 leader
155 );
156 Ok(())
157 }
158 Err(err) => Err(err),
159 }
160 }
161}
162
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use store_api::storage::RegionId;

    use super::*;
    use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_flush_region_reply, send_mock_reply,
    };

    /// Persistent context for region `(1024, 1)` built with peer ids `1` and `2`.
    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
    }

    /// No heartbeat response receiver is registered for the leader here, so the
    /// flush is expected to be skipped rather than fail.
    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let state = PreFlushRegion;
        let persistent_ctx = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_ctx);

        state.flush_region(&mut ctx).await.unwrap();
    }

    /// A close-region reply to a flush instruction must yield a non-retryable
    /// `UnexpectedInstructionReply` error.
    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        common_telemetry::init_default_ut_logging();
        let state = PreFlushRegion;
        let persistent_ctx = new_persistent_context();
        let leader_id = persistent_ctx.from_peer.id;
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_ctx);

        let mailbox_context = env.mailbox_context();
        let mailbox = mailbox_context.mailbox().clone();
        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_context
            .insert_heartbeat_response_receiver(Channel::Datanode(leader_id), reply_tx)
            .await;
        send_mock_reply(mailbox, reply_rx, |id| Ok(new_close_region_reply(id)));

        let err = state.flush_region(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    /// A mailbox timeout while waiting for the reply must be reported as a
    /// non-retryable `ExceededDeadline` error.
    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let state = PreFlushRegion;
        let persistent_ctx = new_persistent_context();
        let leader_id = persistent_ctx.from_peer.id;
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_ctx);

        let mailbox_context = env.mailbox_context();
        let mailbox = mailbox_context.mailbox().clone();
        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_context
            .insert_heartbeat_response_receiver(Channel::Datanode(leader_id), reply_tx)
            .await;
        send_mock_reply(mailbox, reply_rx, |id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        });

        let err = state.flush_region(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    /// A flush reply that carries an error is tolerated: the flush is skipped
    /// and `flush_region` still succeeds.
    #[tokio::test]
    async fn test_flush_region_failed() {
        common_telemetry::init_default_ut_logging();
        let state = PreFlushRegion;
        let persistent_ctx = new_persistent_context();
        let leader_id = persistent_ctx.from_peer.id;
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_ctx);

        let mailbox_context = env.mailbox_context();
        let mailbox = mailbox_context.mailbox().clone();
        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_context
            .insert_heartbeat_response_receiver(Channel::Datanode(leader_id), reply_tx)
            .await;
        send_mock_reply(mailbox, reply_rx, |id| {
            Ok(new_flush_region_reply(
                id,
                false,
                Some("test mocked".to_string()),
            ))
        });

        state.flush_region(&mut ctx).await.unwrap();
    }

    /// After a successful flush, the procedure must advance to
    /// `UpdateMetadata::Downgrade`.
    #[tokio::test]
    async fn test_next_update_metadata_downgrade_state() {
        common_telemetry::init_default_ut_logging();
        let mut state = PreFlushRegion;
        let persistent_ctx = new_persistent_context();
        let leader_id = persistent_ctx.from_peer.id;
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_ctx);

        let mailbox_context = env.mailbox_context();
        let mailbox = mailbox_context.mailbox().clone();
        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_context
            .insert_heartbeat_response_receiver(Channel::Datanode(leader_id), reply_tx)
            .await;
        send_mock_reply(mailbox, reply_rx, |id| Ok(new_flush_region_reply(id, true, None)));

        let procedure_ctx = new_procedure_context();
        let (next_state, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let downgrade = next_state
            .as_any()
            .downcast_ref::<UpdateMetadata>()
            .unwrap();
        assert_matches!(downgrade, UpdateMetadata::Downgrade);
    }
}