meta_srv/procedure/region_migration/
open_candidate_region.rs1use std::any::Any;
16use std::ops::Div;
17
18use api::v1::meta::MailboxMessage;
19use common_meta::RegionIdent;
20use common_meta::distributed_time_constants::default_distributed_time_constants;
21use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
22use common_meta::key::datanode_table::RegionInfo;
23use common_procedure::{Context as ProcedureContext, Status};
24use common_telemetry::info;
25use common_telemetry::tracing_context::TracingContext;
26use serde::{Deserialize, Serialize};
27use snafu::{OptionExt, ResultExt};
28use tokio::time::Instant;
29
30use crate::error::{self, Result};
31use crate::handler::HeartbeatMailbox;
32use crate::procedure::region_migration::flush_leader_region::PreFlushRegion;
33use crate::procedure::region_migration::{Context, State};
34use crate::service::mailbox::Channel;
35
/// Region-migration state that opens the migrating regions on the candidate datanode
/// (`to_peer`).
///
/// On success the procedure transitions to [`PreFlushRegion`].
#[derive(Debug, Serialize, Deserialize)]
pub struct OpenCandidateRegion;
38
39#[async_trait::async_trait]
40#[typetag::serde]
41impl State for OpenCandidateRegion {
42 async fn next(
43 &mut self,
44 ctx: &mut Context,
45 _procedure_ctx: &ProcedureContext,
46 ) -> Result<(Box<dyn State>, Status)> {
47 let instruction = self.build_open_region_instruction(ctx).await?;
48 let now = Instant::now();
49 self.open_candidate_region(ctx, instruction).await?;
50 ctx.update_open_candidate_region_elapsed(now);
51
52 Ok((Box::new(PreFlushRegion), Status::executing(false)))
53 }
54
55 fn as_any(&self) -> &dyn Any {
56 self
57 }
58}
59
60impl OpenCandidateRegion {
61 async fn build_open_region_instruction(&self, ctx: &mut Context) -> Result<Instruction> {
66 let region_ids = ctx.persistent_ctx.region_ids.clone();
67 let from_peer_id = ctx.persistent_ctx.from_peer.id;
68 let to_peer_id = ctx.persistent_ctx.to_peer.id;
69 let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
70 let mut open_regions = Vec::with_capacity(region_ids.len());
71
72 for region_id in region_ids {
73 let table_id = region_id.table_id();
74 let region_number = region_id.region_number();
75 let datanode_table_value = datanode_table_values.get(&table_id).context(
76 error::DatanodeTableNotFoundSnafu {
77 table_id,
78 datanode_id: from_peer_id,
79 },
80 )?;
81 let RegionInfo {
82 region_storage_path,
83 region_options,
84 region_wal_options,
85 engine,
86 } = datanode_table_value.region_info.clone();
87
88 open_regions.push(OpenRegion::new(
89 RegionIdent {
90 datanode_id: to_peer_id,
91 table_id,
92 region_number,
93 engine,
94 },
95 ®ion_storage_path,
96 region_options,
97 region_wal_options,
98 true,
99 ));
100 }
101
102 Ok(Instruction::OpenRegions(open_regions))
103 }
104
105 async fn open_candidate_region(
116 &self,
117 ctx: &mut Context,
118 open_instruction: Instruction,
119 ) -> Result<()> {
120 let pc = &ctx.persistent_ctx;
121 let vc = &mut ctx.volatile_ctx;
122 let region_ids = &pc.region_ids;
123 let candidate = &pc.to_peer;
124
125 if vc.opening_region_guards.is_empty() {
128 for region_id in region_ids {
129 let guard = ctx
131 .opening_region_keeper
132 .register(candidate.id, *region_id)
133 .context(error::RegionOperatingRaceSnafu {
134 peer_id: candidate.id,
135 region_id: *region_id,
136 })?;
137 vc.opening_region_guards.push(guard);
138 }
139 }
140
141 let tracing_ctx = TracingContext::from_current_span();
142 let msg = MailboxMessage::json_message(
143 &format!("Open candidate regions: {:?}", region_ids),
144 &format!("Metasrv@{}", ctx.server_addr()),
145 &format!("Datanode-{}@{}", candidate.id, candidate.addr),
146 common_time::util::current_time_millis(),
147 &open_instruction,
148 Some(tracing_ctx.to_w3c()),
149 )
150 .with_context(|_| error::SerializeToJsonSnafu {
151 input: open_instruction.to_string(),
152 })?;
153
154 let operation_timeout =
155 ctx.next_operation_timeout()
156 .context(error::ExceededDeadlineSnafu {
157 operation: "Open candidate region",
158 })?;
159 let operation_timeout = operation_timeout
160 .div(2)
161 .max(default_distributed_time_constants().region_lease);
162 let ch = Channel::Datanode(candidate.id);
163 let now = Instant::now();
164 let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
165
166 match receiver.await {
167 Ok(msg) => {
168 let reply = HeartbeatMailbox::json_reply(&msg)?;
169 info!(
170 "Received open region reply: {:?}, region: {:?}, elapsed: {:?}",
171 reply,
172 region_ids,
173 now.elapsed()
174 );
175 let InstructionReply::OpenRegions(SimpleReply { result, error }) = reply else {
176 return error::UnexpectedInstructionReplySnafu {
177 mailbox_message: msg.to_string(),
178 reason: "expect open region reply",
179 }
180 .fail();
181 };
182
183 if result {
184 Ok(())
185 } else {
186 error::RetryLaterSnafu {
187 reason: format!(
188 "Region {region_ids:?} is not opened by datanode {:?}, error: {error:?}, elapsed: {:?}",
189 candidate,
190 now.elapsed()
191 ),
192 }
193 .fail()
194 }
195 }
196 Err(error::Error::MailboxTimeout { .. }) => {
197 let reason = format!(
198 "Mailbox received timeout for open candidate region {region_ids:?} on datanode {:?}, elapsed: {:?}",
199 candidate,
200 now.elapsed()
201 );
202 error::RetryLaterSnafu { reason }.fail()
203 }
204 Err(e) => Err(e),
205 }
206 }
207}
208
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_catalog::consts::MITO2_ENGINE;
    use common_meta::DatanodeId;
    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_open_region_reply, send_mock_reply,
    };

    /// Persistent context for migrating `RegionId::new(1024, 1)` between peers 1 and 2
    /// (presumably from_peer = 1, to_peer = 2 — see `test_util::new_persistent_context`).
    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
    }

    /// Builds an `OpenRegions` instruction for a single mito2 region on `datanode_id`.
    /// It uses a fixed storage path, default options and `skip_wal_replay: true`.
    fn new_mock_open_instruction(datanode_id: DatanodeId, region_id: RegionId) -> Instruction {
        Instruction::OpenRegions(vec![OpenRegion {
            region_ident: RegionIdent {
                datanode_id,
                table_id: region_id.table_id(),
                region_number: region_id.region_number(),
                engine: MITO2_ENGINE.to_string(),
            },
            region_storage_path: "/bar/foo/region/".to_string(),
            region_options: Default::default(),
            region_wal_options: Default::default(),
            skip_wal_replay: true,
        }])
    }

    /// No table metadata is created, so building the instruction fails with a
    /// non-retryable `DatanodeTableNotFound`.
    #[tokio::test]
    async fn test_datanode_table_is_not_found_error() {
        let state = OpenCandidateRegion;
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let err = state
            .build_open_region_instruction(&mut ctx)
            .await
            .unwrap_err();

        assert_matches!(err, Error::DatanodeTableNotFound { .. });
        assert!(!err.is_retryable());
    }

    /// No heartbeat receiver is registered for the candidate, so sending fails with a
    /// non-retryable `PusherNotFound`.
    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let state = OpenCandidateRegion;
        let persistent_context = new_persistent_context();
        let region_id = persistent_context.region_ids[0];
        let to_peer_id = persistent_context.to_peer.id;
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
        let err = state
            .open_candidate_region(&mut ctx, open_instruction)
            .await
            .unwrap_err();

        assert_matches!(err, Error::PusherNotFound { .. });
        assert!(!err.is_retryable());
    }

    /// The region is already registered in the opening region keeper for the candidate, so
    /// registration fails with a non-retryable `RegionOperatingRace`.
    #[tokio::test]
    async fn test_candidate_region_opening_error() {
        let state = OpenCandidateRegion;
        let persistent_context = new_persistent_context();
        let region_id = persistent_context.region_ids[0];
        let to_peer_id = persistent_context.to_peer.id;

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let opening_region_keeper = env.opening_region_keeper();
        // Hold the registration for the whole test so the state's own registration conflicts.
        let _guard = opening_region_keeper
            .register(to_peer_id, region_id)
            .unwrap();

        let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
        let err = state
            .open_candidate_region(&mut ctx, open_instruction)
            .await
            .unwrap_err();

        assert_matches!(err, Error::RegionOperatingRace { .. });
        assert!(!err.is_retryable());
    }

    /// The mocked datanode answers with a close-region reply, which is rejected as a
    /// non-retryable `UnexpectedInstructionReply`.
    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let state = OpenCandidateRegion;
        let persistent_context = new_persistent_context();
        let region_id = persistent_context.region_ids[0];
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));

        let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
        let err = state
            .open_candidate_region(&mut ctx, open_instruction)
            .await
            .unwrap_err();

        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    /// The mocked mailbox yields `MailboxTimeout`, which is mapped to a retryable `RetryLater`.
    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let state = OpenCandidateRegion;
        let persistent_context = new_persistent_context();
        let region_id = persistent_context.region_ids[0];
        let to_peer_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        });

        let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
        let err = state
            .open_candidate_region(&mut ctx, open_instruction)
            .await
            .unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
    }

    /// The mocked datanode reports a failed open. The result is a retryable `RetryLater` whose
    /// message carries the datanode's error text.
    #[tokio::test]
    async fn test_open_candidate_region_failed() {
        let state = OpenCandidateRegion;
        let persistent_context = new_persistent_context();
        let region_id = persistent_context.region_ids[0];
        let to_peer_id = persistent_context.to_peer.id;
        let mut env = TestingEnv::new();

        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| {
            Ok(new_open_region_reply(
                id,
                false,
                Some("test mocked".to_string()),
            ))
        });

        let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
        let err = state
            .open_candidate_region(&mut ctx, open_instruction)
            .await
            .unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        assert!(format!("{err:?}").contains("test mocked"));
    }

    /// Happy path for `next`. It covers the steps below.
    ///
    /// - Table metadata routes the region to the source peer.
    /// - The datanode replies with success.
    /// - The opening guard is kept, and the next state is `PreFlushRegion`.
    #[tokio::test]
    async fn test_next_flush_leader_region_state() {
        let mut state = Box::new(OpenCandidateRegion);
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;
        let region_id = persistent_context.region_ids[0];
        let to_peer_id = persistent_context.to_peer.id;
        let mut env = TestingEnv::new();

        // NOTE(review): presumably this metadata also yields the source datanode's table value
        // that `build_open_region_instruction` looks up — confirm in `create_table_metadata`.
        let table_info = new_test_table_info(1024);
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(Peer::empty(from_peer_id)),
            ..Default::default()
        }];

        env.table_metadata_manager()
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                HashMap::default(),
            )
            .await
            .unwrap();

        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| Ok(new_open_region_reply(id, true, None)));
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let vc = ctx.volatile_ctx;
        assert_eq!(vc.opening_region_guards[0].info(), (to_peer_id, region_id));

        let flush_leader_region = next.as_any().downcast_ref::<PreFlushRegion>().unwrap();
        assert_matches!(flush_leader_region, PreFlushRegion);
    }
}