meta_srv/procedure/region_migration/
open_candidate_region.rs1use std::any::Any;
16use std::ops::Div;
17
18use api::v1::meta::MailboxMessage;
19use common_meta::RegionIdent;
20use common_meta::distributed_time_constants::default_distributed_time_constants;
21use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
22use common_meta::key::datanode_table::RegionInfo;
23use common_procedure::{Context as ProcedureContext, Status};
24use common_telemetry::info;
25use common_telemetry::tracing_context::TracingContext;
26use serde::{Deserialize, Serialize};
27use snafu::{OptionExt, ResultExt};
28use store_api::region_engine::RegionRole;
29use tokio::time::Instant;
30
31use crate::error::{self, Result};
32use crate::handler::HeartbeatMailbox;
33use crate::procedure::region_migration::flush_leader_region::PreFlushRegion;
34use crate::procedure::region_migration::{Context, State};
35use crate::service::mailbox::Channel;
36
37#[derive(Debug, Serialize, Deserialize)]
38pub struct OpenCandidateRegion;
39
40#[async_trait::async_trait]
41#[typetag::serde]
42impl State for OpenCandidateRegion {
43 async fn next(
44 &mut self,
45 ctx: &mut Context,
46 _procedure_ctx: &ProcedureContext,
47 ) -> Result<(Box<dyn State>, Status)> {
48 let instruction = self.build_open_region_instruction(ctx).await?;
49 let now = Instant::now();
50 self.open_candidate_region(ctx, instruction).await?;
51 ctx.update_open_candidate_region_elapsed(now);
52
53 Ok((Box::new(PreFlushRegion), Status::executing(false)))
54 }
55
56 fn as_any(&self) -> &dyn Any {
57 self
58 }
59}
60
61impl OpenCandidateRegion {
62 async fn build_open_region_instruction(&self, ctx: &mut Context) -> Result<Instruction> {
67 let region_ids = ctx.persistent_ctx.region_ids.clone();
68 let from_peer_id = ctx.persistent_ctx.from_peer.id;
69 let to_peer_id = ctx.persistent_ctx.to_peer.id;
70 let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
71 let mut open_regions = Vec::with_capacity(region_ids.len());
72
73 for region_id in region_ids {
74 let table_id = region_id.table_id();
75 let region_number = region_id.region_number();
76 let datanode_table_value = datanode_table_values.get(&table_id).context(
77 error::DatanodeTableNotFoundSnafu {
78 table_id,
79 datanode_id: from_peer_id,
80 },
81 )?;
82 let RegionInfo {
83 region_storage_path,
84 region_options,
85 region_wal_options,
86 engine,
87 } = datanode_table_value.region_info.clone();
88
89 open_regions.push(OpenRegion::new(
90 RegionIdent {
91 datanode_id: to_peer_id,
92 table_id,
93 region_number,
94 engine,
95 },
96 ®ion_storage_path,
97 region_options,
98 region_wal_options,
99 true,
100 ));
101 }
102
103 Ok(Instruction::OpenRegions(open_regions))
104 }
105
106 async fn open_candidate_region(
117 &self,
118 ctx: &mut Context,
119 open_instruction: Instruction,
120 ) -> Result<()> {
121 let pc = &ctx.persistent_ctx;
122 let vc = &mut ctx.volatile_ctx;
123 let region_ids = &pc.region_ids;
124 let candidate = &pc.to_peer;
125
126 if vc.opening_region_guards.is_empty() {
129 for region_id in region_ids {
130 let guard = ctx
132 .opening_region_keeper
133 .register_with_role(candidate.id, *region_id, RegionRole::Follower)
134 .context(error::RegionOperatingRaceSnafu {
135 peer_id: candidate.id,
136 region_id: *region_id,
137 })?;
138 vc.opening_region_guards.push(guard);
139 }
140 }
141
142 let tracing_ctx = TracingContext::from_current_span();
143 let msg = MailboxMessage::json_message(
144 &format!("Open candidate regions: {:?}", region_ids),
145 &format!("Metasrv@{}", ctx.server_addr()),
146 &format!("Datanode-{}@{}", candidate.id, candidate.addr),
147 common_time::util::current_time_millis(),
148 &open_instruction,
149 Some(tracing_ctx.to_w3c()),
150 )
151 .with_context(|_| error::SerializeToJsonSnafu {
152 input: open_instruction.to_string(),
153 })?;
154
155 let operation_timeout =
156 ctx.next_operation_timeout()
157 .context(error::ExceededDeadlineSnafu {
158 operation: "Open candidate region",
159 })?;
160 let operation_timeout = operation_timeout
161 .div(2)
162 .max(default_distributed_time_constants().region_lease);
163 let ch = Channel::Datanode(candidate.id);
164 let now = Instant::now();
165 let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
166
167 match receiver.await {
168 Ok(msg) => {
169 let reply = HeartbeatMailbox::json_reply(&msg)?;
170 info!(
171 "Received open region reply: {:?}, region: {:?}, elapsed: {:?}",
172 reply,
173 region_ids,
174 now.elapsed()
175 );
176 let InstructionReply::OpenRegions(SimpleReply { result, error }) = reply else {
177 return error::UnexpectedInstructionReplySnafu {
178 mailbox_message: msg.to_string(),
179 reason: "expect open region reply",
180 }
181 .fail();
182 };
183
184 if result {
185 Ok(())
186 } else {
187 error::RetryLaterSnafu {
188 reason: format!(
189 "Region {region_ids:?} is not opened by datanode {:?}, error: {error:?}, elapsed: {:?}",
190 candidate,
191 now.elapsed()
192 ),
193 }
194 .fail()
195 }
196 }
197 Err(error::Error::MailboxTimeout { .. }) => {
198 let reason = format!(
199 "Mailbox received timeout for open candidate region {region_ids:?} on datanode {:?}, elapsed: {:?}",
200 candidate,
201 now.elapsed()
202 );
203 error::RetryLaterSnafu { reason }.fail()
204 }
205 Err(e) => Err(e),
206 }
207 }
208}
209
#[cfg(test)]
mod tests {
    use std::assert_matches;
    use std::collections::HashMap;

    use common_catalog::consts::MITO2_ENGINE;
    use common_meta::DatanodeId;
    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_open_region_reply, send_mock_reply,
    };

    /// Persistent context for region `1024:1` with peers `1` and `2`
    /// (presumably from-peer and to-peer respectively).
    fn new_persistent_context() -> PersistentContext {
        test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
    }

    /// Builds a single-region open instruction addressed to `datanode_id`.
    fn new_mock_open_instruction(datanode_id: DatanodeId, region_id: RegionId) -> Instruction {
        let region_ident = RegionIdent {
            datanode_id,
            table_id: region_id.table_id(),
            region_number: region_id.region_number(),
            engine: MITO2_ENGINE.to_string(),
        };
        let open_region = OpenRegion {
            region_ident,
            region_storage_path: "/bar/foo/region/".to_string(),
            region_options: Default::default(),
            region_wal_options: Default::default(),
            skip_wal_replay: true,
        };
        Instruction::OpenRegions(vec![open_region])
    }

    #[tokio::test]
    async fn test_datanode_table_is_not_found_error() {
        let state = OpenCandidateRegion;
        let env = TestingEnv::new();
        // No table metadata is created, so the datanode table lookup fails.
        let mut ctx = env.context_factory().new_context(new_persistent_context());

        let result = state.build_open_region_instruction(&mut ctx).await;
        let err = result.unwrap_err();

        assert_matches!(err, Error::DatanodeTableNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let state = OpenCandidateRegion;
        let persistent_ctx = new_persistent_context();
        let (region_id, to_peer_id) = (persistent_ctx.region_ids[0], persistent_ctx.to_peer.id);
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_ctx);

        // No heartbeat receiver is registered for the candidate.
        let instruction = new_mock_open_instruction(to_peer_id, region_id);
        let result = state.open_candidate_region(&mut ctx, instruction).await;
        let err = result.unwrap_err();

        assert_matches!(err, Error::PusherNotFound { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_candidate_region_opening_error() {
        let state = OpenCandidateRegion;
        let persistent_ctx = new_persistent_context();
        let (region_id, to_peer_id) = (persistent_ctx.region_ids[0], persistent_ctx.to_peer.id);

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_ctx);
        // Hold a guard for the same region so the state's own registration races with it.
        let keeper = env.opening_region_keeper();
        let _guard = keeper
            .register_with_role(to_peer_id, region_id, RegionRole::Follower)
            .unwrap();

        let instruction = new_mock_open_instruction(to_peer_id, region_id);
        let result = state.open_candidate_region(&mut ctx, instruction).await;
        let err = result.unwrap_err();

        assert_matches!(err, Error::RegionOperatingRace { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let state = OpenCandidateRegion;
        let persistent_ctx = new_persistent_context();
        let (region_id, to_peer_id) = (persistent_ctx.region_ids[0], persistent_ctx.to_peer.id);

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_ctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // Reply with a close-region reply where an open-region reply is expected.
        send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));

        let instruction = new_mock_open_instruction(to_peer_id, region_id);
        let result = state.open_candidate_region(&mut ctx, instruction).await;
        let err = result.unwrap_err();

        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let state = OpenCandidateRegion;
        let persistent_ctx = new_persistent_context();
        let (region_id, to_peer_id) = (persistent_ctx.region_ids[0], persistent_ctx.to_peer.id);

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_ctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // Simulate a mailbox timeout.
        send_mock_reply(mailbox, rx, |id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        });

        let instruction = new_mock_open_instruction(to_peer_id, region_id);
        let result = state.open_candidate_region(&mut ctx, instruction).await;
        let err = result.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
    }

    #[tokio::test]
    async fn test_open_candidate_region_failed() {
        let state = OpenCandidateRegion;
        let persistent_ctx = new_persistent_context();
        let (region_id, to_peer_id) = (persistent_ctx.region_ids[0], persistent_ctx.to_peer.id);
        let mut env = TestingEnv::new();

        let mut ctx = env.context_factory().new_context(persistent_ctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        // The datanode reports that it failed to open the region.
        send_mock_reply(mailbox, rx, |id| {
            Ok(new_open_region_reply(
                id,
                false,
                Some("test mocked".to_string()),
            ))
        });

        let instruction = new_mock_open_instruction(to_peer_id, region_id);
        let result = state.open_candidate_region(&mut ctx, instruction).await;
        let err = result.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        let debug_repr = format!("{err:?}");
        assert!(debug_repr.contains("test mocked"));
    }

    #[tokio::test]
    async fn test_next_flush_leader_region_state() {
        let mut state = Box::new(OpenCandidateRegion);
        let persistent_ctx = new_persistent_context();
        let from_peer_id = persistent_ctx.from_peer.id;
        let region_id = persistent_ctx.region_ids[0];
        let to_peer_id = persistent_ctx.to_peer.id;
        let mut env = TestingEnv::new();

        // Create table metadata whose only region is led by the from-peer.
        let table_info = new_test_table_info(1024);
        let region_routes = vec![RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(Peer::empty(from_peer_id)),
            ..Default::default()
        }];

        env.table_metadata_manager()
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                HashMap::default(),
            )
            .await
            .unwrap();

        let mut ctx = env.context_factory().new_context(persistent_ctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (tx, rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
            .await;

        send_mock_reply(mailbox, rx, |id| Ok(new_open_region_reply(id, true, None)));
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();

        // The opening-region guard stays in the volatile context after a successful open.
        let guard = &ctx.volatile_ctx.opening_region_guards[0];
        assert_eq!(guard.info(), (to_peer_id, region_id));

        let pre_flush = next.as_any().downcast_ref::<PreFlushRegion>().unwrap();
        assert_matches!(pre_flush, PreFlushRegion);
    }
}