meta_srv/procedure/region_migration/
flush_leader_region.rs1use std::any::Any;
16
17use api::v1::meta::MailboxMessage;
18use common_meta::instruction::{FlushRegions, Instruction, InstructionReply};
19use common_procedure::{Context as ProcedureContext, Status};
20use common_telemetry::{info, warn};
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt};
23use tokio::time::Instant;
24
25use crate::error::{self, Error, Result};
26use crate::handler::HeartbeatMailbox;
27use crate::procedure::region_migration::update_metadata::UpdateMetadata;
28use crate::procedure::region_migration::{Context, State};
29use crate::service::mailbox::Channel;
30
31#[derive(Debug, Serialize, Deserialize)]
35pub struct PreFlushRegion;
36
37#[async_trait::async_trait]
38#[typetag::serde]
39impl State for PreFlushRegion {
40 async fn next(
41 &mut self,
42 ctx: &mut Context,
43 _procedure_ctx: &ProcedureContext,
44 ) -> Result<(Box<dyn State>, Status)> {
45 let timer = Instant::now();
46 self.flush_region(ctx).await?;
47 ctx.update_flush_leader_region_elapsed(timer);
48 Ok((
53 Box::new(UpdateMetadata::Downgrade),
54 Status::executing(false),
55 ))
56 }
57
58 fn as_any(&self) -> &dyn Any {
59 self
60 }
61}
62
63impl PreFlushRegion {
64 fn build_flush_leader_region_instruction(&self, ctx: &Context) -> Instruction {
66 let pc = &ctx.persistent_ctx;
67 let region_id = pc.region_id;
68 Instruction::FlushRegions(FlushRegions::sync_single(region_id))
69 }
70
71 async fn flush_region(&self, ctx: &mut Context) -> Result<()> {
85 let operation_timeout =
86 ctx.next_operation_timeout()
87 .context(error::ExceededDeadlineSnafu {
88 operation: "Flush leader region",
89 })?;
90 let flush_instruction = self.build_flush_leader_region_instruction(ctx);
91 let region_id = ctx.persistent_ctx.region_id;
92 let leader = &ctx.persistent_ctx.from_peer;
93
94 let msg = MailboxMessage::json_message(
95 &format!("Flush leader region: {}", region_id),
96 &format!("Metasrv@{}", ctx.server_addr()),
97 &format!("Datanode-{}@{}", leader.id, leader.addr),
98 common_time::util::current_time_millis(),
99 &flush_instruction,
100 )
101 .with_context(|_| error::SerializeToJsonSnafu {
102 input: flush_instruction.to_string(),
103 })?;
104
105 let ch = Channel::Datanode(leader.id);
106 let now = Instant::now();
107 let result = ctx.mailbox.send(&ch, msg, operation_timeout).await;
108
109 match result {
110 Ok(receiver) => match receiver.await {
111 Ok(msg) => {
112 let reply = HeartbeatMailbox::json_reply(&msg)?;
113 info!(
114 "Received flush leader region reply: {:?}, region: {}, elapsed: {:?}",
115 reply,
116 region_id,
117 now.elapsed()
118 );
119
120 let reply_result = match reply {
121 InstructionReply::FlushRegions(flush_reply) => {
122 if flush_reply.results.len() != 1 {
123 return error::UnexpectedInstructionReplySnafu {
124 mailbox_message: msg.to_string(),
125 reason: "expect single region flush result",
126 }
127 .fail();
128 }
129 let (reply_region_id, result) = &flush_reply.results[0];
130 if *reply_region_id != region_id {
131 return error::UnexpectedInstructionReplySnafu {
132 mailbox_message: msg.to_string(),
133 reason: "flush reply region ID mismatch",
134 }
135 .fail();
136 }
137 match result {
138 Ok(()) => (true, None),
139 Err(err) => (false, Some(err.clone())),
140 }
141 }
142 _ => {
143 return error::UnexpectedInstructionReplySnafu {
144 mailbox_message: msg.to_string(),
145 reason: "expect flush region reply",
146 }
147 .fail();
148 }
149 };
150 let (result, error) = reply_result;
151
152 if error.is_some() {
153 warn!(
154 "Failed to flush leader region {} on datanode {:?}, error: {:?}. Skip flush operation.",
155 region_id, leader, error
156 );
157 } else if result {
158 info!(
159 "The flush leader region {} on datanode {:?} is successful, elapsed: {:?}",
160 region_id,
161 leader,
162 now.elapsed()
163 );
164 }
165
166 Ok(())
167 }
168 Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
169 operation: "Flush leader region",
170 }
171 .fail(),
172 Err(err) => Err(err),
173 },
174 Err(Error::PusherNotFound { .. }) => {
175 warn!(
176 "Failed to flush leader region({}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
177 region_id,
178 leader
179 );
180 Ok(())
181 }
182 Err(err) => Err(err),
183 }
184 }
185}
186
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use store_api::storage::RegionId;

    use super::*;
    use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_flush_region_reply_for_region, send_mock_reply,
    };

    /// Builds the persistent context shared by every test in this module.
    // NOTE(review): `1` and `2` are presumably the source and target peer ids —
    // confirm against `test_util::new_persistent_context`.
    fn new_persistent_context() -> PersistentContext {
        let region_id = RegionId::new(1024, 1);
        test_util::new_persistent_context(1, 2, region_id)
    }

    /// No heartbeat response receiver is registered for the datanode, so the
    /// flush is expected to be skipped rather than fail.
    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let flush_state = PreFlushRegion;
        let pctx = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(pctx);

        flush_state.flush_region(&mut ctx).await.unwrap();
    }

    /// The datanode answers with a close-region reply instead of a flush
    /// reply, which must be rejected as a non-retryable error.
    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        common_telemetry::init_default_ut_logging();
        let flush_state = PreFlushRegion;
        let pctx = new_persistent_context();
        let datanode_id = pctx.from_peer.id;
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(pctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(datanode_id), reply_tx)
            .await;
        send_mock_reply(mailbox, reply_rx, |id| Ok(new_close_region_reply(id)));

        let err = flush_state.flush_region(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    /// A mailbox timeout while waiting for the reply must surface as
    /// `ExceededDeadline` and must not be retryable.
    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let flush_state = PreFlushRegion;
        let pctx = new_persistent_context();
        let datanode_id = pctx.from_peer.id;
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(pctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(datanode_id), reply_tx)
            .await;
        send_mock_reply(mailbox, reply_rx, |id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        });

        let err = flush_state.flush_region(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    /// The datanode reports that the flush failed; the state tolerates this
    /// and still returns `Ok(())`.
    #[tokio::test]
    async fn test_flush_region_failed() {
        common_telemetry::init_default_ut_logging();
        let flush_state = PreFlushRegion;
        let pctx = new_persistent_context();
        let datanode_id = pctx.from_peer.id;
        let region_id = pctx.region_id;
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(pctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(datanode_id), reply_tx)
            .await;
        send_mock_reply(mailbox, reply_rx, move |id| {
            let failure = Some("test mocked".to_string());
            Ok(new_flush_region_reply_for_region(
                id, region_id, false, failure,
            ))
        });

        flush_state.flush_region(&mut ctx).await.unwrap();
    }

    /// After a successful flush, `next` must transition to
    /// `UpdateMetadata::Downgrade`.
    #[tokio::test]
    async fn test_next_update_metadata_downgrade_state() {
        common_telemetry::init_default_ut_logging();
        let mut flush_state = PreFlushRegion;
        let pctx = new_persistent_context();
        let datanode_id = pctx.from_peer.id;
        let region_id = pctx.region_id;
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(pctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(datanode_id), reply_tx)
            .await;
        send_mock_reply(mailbox, reply_rx, move |id| {
            Ok(new_flush_region_reply_for_region(id, region_id, true, None))
        });

        let procedure_ctx = new_procedure_context();
        let (next_state, _) = flush_state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let update_metadata = next_state
            .as_any()
            .downcast_ref::<UpdateMetadata>()
            .unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Downgrade);
    }
}