meta_srv/procedure/region_migration/
flush_leader_region.rs1use std::any::Any;
16
17use api::v1::meta::MailboxMessage;
18use common_meta::instruction::{FlushRegions, Instruction, InstructionReply};
19use common_procedure::{Context as ProcedureContext, Status};
20use common_telemetry::{info, warn};
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt};
23use tokio::time::Instant;
24
25use crate::error::{self, Error, Result};
26use crate::handler::HeartbeatMailbox;
27use crate::procedure::region_migration::update_metadata::UpdateMetadata;
28use crate::procedure::region_migration::{Context, State};
29use crate::service::mailbox::Channel;
30
/// Region-migration procedure state that asks the current leader datanode
/// (`persistent_ctx.from_peer`) to flush the migrating region, then moves the
/// procedure on to `UpdateMetadata::Downgrade`.
///
/// The flush is best-effort: an unreachable datanode or a flush failure
/// reported by the datanode is logged and skipped rather than failing the
/// procedure.
#[derive(Debug, Serialize, Deserialize)]
pub struct PreFlushRegion;
36
37#[async_trait::async_trait]
38#[typetag::serde]
39impl State for PreFlushRegion {
40 async fn next(
41 &mut self,
42 ctx: &mut Context,
43 _procedure_ctx: &ProcedureContext,
44 ) -> Result<(Box<dyn State>, Status)> {
45 let timer = Instant::now();
46 self.flush_region(ctx).await?;
47 ctx.update_flush_leader_region_elapsed(timer);
48 Ok((
53 Box::new(UpdateMetadata::Downgrade),
54 Status::executing(false),
55 ))
56 }
57
58 fn as_any(&self) -> &dyn Any {
59 self
60 }
61}
62
63impl PreFlushRegion {
64 fn build_flush_leader_region_instruction(&self, ctx: &Context) -> Instruction {
66 let pc = &ctx.persistent_ctx;
67 let region_id = pc.region_id;
68 Instruction::FlushRegions(FlushRegions::sync_single(region_id))
69 }
70
71 async fn flush_region(&self, ctx: &mut Context) -> Result<()> {
85 let operation_timeout =
86 ctx.next_operation_timeout()
87 .context(error::ExceededDeadlineSnafu {
88 operation: "Flush leader region",
89 })?;
90 let flush_instruction = self.build_flush_leader_region_instruction(ctx);
91 let region_id = ctx.persistent_ctx.region_id;
92 let leader = &ctx.persistent_ctx.from_peer;
93
94 let msg = MailboxMessage::json_message(
95 &format!("Flush leader region: {}", region_id),
96 &format!("Metasrv@{}", ctx.server_addr()),
97 &format!("Datanode-{}@{}", leader.id, leader.addr),
98 common_time::util::current_time_millis(),
99 &flush_instruction,
100 )
101 .with_context(|_| error::SerializeToJsonSnafu {
102 input: flush_instruction.to_string(),
103 })?;
104
105 let ch = Channel::Datanode(leader.id);
106 let now = Instant::now();
107 let result = ctx.mailbox.send(&ch, msg, operation_timeout).await;
108
109 match result {
110 Ok(receiver) => match receiver.await {
111 Ok(msg) => {
112 let reply = HeartbeatMailbox::json_reply(&msg)?;
113 info!(
114 "Received flush leader region reply: {:?}, region: {}, elapsed: {:?}",
115 reply,
116 region_id,
117 now.elapsed()
118 );
119
120 let reply_result = match reply {
121 InstructionReply::FlushRegions(flush_reply) => {
122 if flush_reply.results.len() != 1 {
123 return error::UnexpectedInstructionReplySnafu {
124 mailbox_message: msg.to_string(),
125 reason: "expect single region flush result",
126 }
127 .fail();
128 }
129 let (reply_region_id, result) = &flush_reply.results[0];
130 if *reply_region_id != region_id {
131 return error::UnexpectedInstructionReplySnafu {
132 mailbox_message: msg.to_string(),
133 reason: "flush reply region ID mismatch",
134 }
135 .fail();
136 }
137 match result {
138 Ok(()) => (true, None),
139 Err(err) => (false, Some(err.clone())),
140 }
141 }
142 _ => {
143 return error::UnexpectedInstructionReplySnafu {
144 mailbox_message: msg.to_string(),
145 reason: "expect flush region reply",
146 }
147 .fail();
148 }
149 };
150 let (result, error) = reply_result;
151
152 if error.is_some() {
153 warn!(
154 "Failed to flush leader region {} on datanode {:?}, error: {:?}. Skip flush operation.",
155 region_id, leader, error
156 );
157 } else if result {
158 info!(
159 "The flush leader region {} on datanode {:?} is successful, elapsed: {:?}",
160 region_id,
161 leader,
162 now.elapsed()
163 );
164 }
165
166 Ok(())
167 }
168 Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
169 operation: "Flush leader region",
170 }
171 .fail(),
172 Err(err) => Err(err),
173 },
174 Err(Error::PusherNotFound { .. }) => {
175 warn!(
176 "Failed to flush leader region({}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
177 region_id, leader
178 );
179 Ok(())
180 }
181 Err(err) => Err(err),
182 }
183 }
184}
185
/// Unit tests for the `PreFlushRegion` state, driven through a mocked
/// heartbeat mailbox.
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use store_api::storage::RegionId;

    use super::*;
    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_flush_region_reply_for_region, send_mock_reply,
    };

    /// Builds the persistent context shared by all tests. The migrating
    /// region is `RegionId::new(1024, 1)`; the `1, 2` arguments presumably
    /// select the from/to peer ids (TODO confirm against `test_util`).
    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
    }

    /// No heartbeat receiver is registered for the leader, so the send
    /// presumably fails with `PusherNotFound`; the flush must be skipped
    /// without returning an error.
    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let state = PreFlushRegion;
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        state.flush_region(&mut ctx).await.unwrap();
    }

    /// The mocked datanode answers with a close-region reply instead of a
    /// flush reply, which must yield a non-retryable
    /// `UnexpectedInstructionReply` error.
    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        common_telemetry::init_default_ut_logging();
        let state = PreFlushRegion;
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        // Register the leader's heartbeat channel so the mailbox send succeeds.
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;
        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));
        let err = state.flush_region(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    /// A `MailboxTimeout` while waiting for the reply must be converted into
    /// a non-retryable `ExceededDeadline` error.
    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let state = PreFlushRegion;
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;
        send_mock_reply(mailbox, rx, |id| {
            // Simulate the reply timing out in the mailbox.
            Err(error::MailboxTimeoutSnafu { id }.build())
        });

        let err = state.flush_region(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    /// The datanode reports that the flush failed; `flush_region` must log
    /// it and still return `Ok`.
    #[tokio::test]
    async fn test_flush_region_failed() {
        common_telemetry::init_default_ut_logging();
        let state = PreFlushRegion;
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;
        let region_id = persistent_context.region_id;
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;
        send_mock_reply(mailbox, rx, move |id| {
            Ok(new_flush_region_reply_for_region(
                id,
                region_id,
                false,
                Some("test mocked".to_string()),
            ))
        });
        state.flush_region(&mut ctx).await.unwrap();
    }

    /// A successful flush must transition the procedure to
    /// `UpdateMetadata::Downgrade`.
    #[tokio::test]
    async fn test_next_update_metadata_downgrade_state() {
        common_telemetry::init_default_ut_logging();
        let mut state = PreFlushRegion;
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;
        let region_id = persistent_context.region_id;
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;
        send_mock_reply(mailbox, rx, move |id| {
            Ok(new_flush_region_reply_for_region(id, region_id, true, None))
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Downgrade);
    }
}