meta_srv/procedure/region_migration/
flush_leader_region.rs1use std::any::Any;
16
17use api::v1::meta::MailboxMessage;
18use common_meta::instruction::{FlushErrorStrategy, FlushRegions, Instruction, InstructionReply};
19use common_procedure::{Context as ProcedureContext, Status};
20use common_telemetry::{info, warn};
21use serde::{Deserialize, Serialize};
22use snafu::{OptionExt, ResultExt};
23use tokio::time::Instant;
24
25use crate::error::{self, Error, Result};
26use crate::handler::HeartbeatMailbox;
27use crate::procedure::region_migration::update_metadata::UpdateMetadata;
28use crate::procedure::region_migration::{Context, State};
29use crate::service::mailbox::Channel;
30
31#[derive(Debug, Serialize, Deserialize)]
35pub struct PreFlushRegion;
36
37#[async_trait::async_trait]
38#[typetag::serde]
39impl State for PreFlushRegion {
40 async fn next(
41 &mut self,
42 ctx: &mut Context,
43 _procedure_ctx: &ProcedureContext,
44 ) -> Result<(Box<dyn State>, Status)> {
45 let timer = Instant::now();
46 self.flush_region(ctx).await?;
47 ctx.update_flush_leader_region_elapsed(timer);
48 Ok((
53 Box::new(UpdateMetadata::Downgrade),
54 Status::executing(false),
55 ))
56 }
57
58 fn as_any(&self) -> &dyn Any {
59 self
60 }
61}
62
63impl PreFlushRegion {
64 fn build_flush_leader_region_instruction(&self, ctx: &Context) -> Instruction {
66 let pc = &ctx.persistent_ctx;
67 Instruction::FlushRegions(FlushRegions::sync_batch(
68 pc.region_ids.clone(),
69 FlushErrorStrategy::TryAll,
70 ))
71 }
72
73 async fn flush_region(&self, ctx: &mut Context) -> Result<()> {
87 let operation_timeout =
88 ctx.next_operation_timeout()
89 .context(error::ExceededDeadlineSnafu {
90 operation: "Flush leader region",
91 })?;
92 let flush_instruction = self.build_flush_leader_region_instruction(ctx);
93 let region_ids = &ctx.persistent_ctx.region_ids;
94 let leader = &ctx.persistent_ctx.from_peer;
95
96 let msg = MailboxMessage::json_message(
97 &format!("Flush leader region: {:?}", region_ids),
98 &format!("Metasrv@{}", ctx.server_addr()),
99 &format!("Datanode-{}@{}", leader.id, leader.addr),
100 common_time::util::current_time_millis(),
101 &flush_instruction,
102 )
103 .with_context(|_| error::SerializeToJsonSnafu {
104 input: flush_instruction.to_string(),
105 })?;
106
107 let ch = Channel::Datanode(leader.id);
108 let now = Instant::now();
109 let result = ctx.mailbox.send(&ch, msg, operation_timeout).await;
110
111 match result {
112 Ok(receiver) => match receiver.await {
113 Ok(msg) => {
114 let reply = HeartbeatMailbox::json_reply(&msg)?;
115 info!(
116 "Received flush leader region reply: {:?}, region: {:?}, elapsed: {:?}",
117 reply,
118 region_ids,
119 now.elapsed()
120 );
121
122 let reply_result = match reply {
123 InstructionReply::FlushRegions(flush_reply) => {
124 if flush_reply.results.len() != region_ids.len() {
125 return error::UnexpectedInstructionReplySnafu {
126 mailbox_message: msg.to_string(),
127 reason: format!(
128 "expect {} region flush result, but got {}",
129 region_ids.len(),
130 flush_reply.results.len()
131 ),
132 }
133 .fail();
134 }
135
136 match flush_reply.overall_success {
137 true => (true, None),
138 false => (
139 false,
140 Some(
141 flush_reply
142 .results
143 .iter()
144 .filter_map(|(region_id, result)| match result {
145 Ok(_) => None,
146 Err(e) => Some(format!("{}: {}", region_id, e)),
147 })
148 .collect::<Vec<String>>()
149 .join("; "),
150 ),
151 ),
152 }
153 }
154 _ => {
155 return error::UnexpectedInstructionReplySnafu {
156 mailbox_message: msg.to_string(),
157 reason: "expect flush region reply",
158 }
159 .fail();
160 }
161 };
162 let (result, error) = reply_result;
163
164 if let Some(error) = error {
165 warn!(
166 "Failed to flush leader regions {:?} on datanode {:?}, error: {}. Skip flush operation.",
167 region_ids, leader, &error
168 );
169 } else if result {
170 info!(
171 "The flush leader regions {:?} on datanode {:?} is successful, elapsed: {:?}",
172 region_ids,
173 leader,
174 now.elapsed()
175 );
176 }
177
178 Ok(())
179 }
180 Err(Error::MailboxTimeout { .. }) => error::ExceededDeadlineSnafu {
181 operation: "Flush leader regions",
182 }
183 .fail(),
184 Err(err) => Err(err),
185 },
186 Err(Error::PusherNotFound { .. }) => {
187 warn!(
188 "Failed to flush leader regions({:?}), the datanode({}) is unreachable(PusherNotFound). Skip flush operation.",
189 region_ids, leader
190 );
191 Ok(())
192 }
193 Err(err) => Err(err),
194 }
195 }
196}
197
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use store_api::storage::RegionId;

    use super::*;
    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_flush_region_reply_for_region, send_mock_reply,
    };

    /// Persistent context shared by all tests in this module.
    // NOTE(review): the first two arguments are presumably the from/to peer
    // ids — confirm against `test_util::new_persistent_context`.
    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
    }

    /// No heartbeat response receiver is registered for the datanode, so the
    /// flush is expected to be skipped without an error.
    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let state = PreFlushRegion;
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);

        state.flush_region(&mut ctx).await.unwrap();
    }

    /// The datanode answers with a close-region reply instead of a flush
    /// reply, which must be rejected as a non-retryable error.
    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        common_telemetry::init_default_ut_logging();
        let state = PreFlushRegion;
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;
        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));

        let err = state.flush_region(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    /// A mailbox timeout must surface as a non-retryable `ExceededDeadline`.
    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let state = PreFlushRegion;
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;
        // The mocked reply is a mailbox timeout error.
        send_mock_reply(mailbox, rx, |id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        });

        let err = state.flush_region(&mut ctx).await.unwrap_err();
        assert_matches!(err, Error::ExceededDeadline { .. });
        assert!(!err.is_retryable());
    }

    /// A flush reply that reports a failure is tolerated: `flush_region`
    /// still returns `Ok(())`.
    #[tokio::test]
    async fn test_flush_region_failed() {
        common_telemetry::init_default_ut_logging();
        let state = PreFlushRegion;
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;
        let region_id = persistent_context.region_ids[0];
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;
        send_mock_reply(mailbox, rx, move |id| {
            Ok(new_flush_region_reply_for_region(
                id,
                region_id,
                false,
                Some("test mocked".to_string()),
            ))
        });

        state.flush_region(&mut ctx).await.unwrap();
    }

    /// After a successful flush, `next` must hand over to
    /// `UpdateMetadata::Downgrade`.
    #[tokio::test]
    async fn test_next_update_metadata_downgrade_state() {
        common_telemetry::init_default_ut_logging();
        let mut state = PreFlushRegion;
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;
        let region_id = persistent_context.region_ids[0];
        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();
        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(from_peer_id), tx)
            .await;
        send_mock_reply(mailbox, rx, move |id| {
            Ok(new_flush_region_reply_for_region(id, region_id, true, None))
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        let update_metadata = next.as_any().downcast_ref::<UpdateMetadata>().unwrap();
        assert_matches!(update_metadata, UpdateMetadata::Downgrade);
    }
}