meta_srv/procedure/region_migration/
open_candidate_region.rs1use std::any::Any;
16use std::ops::Div;
17
18use api::v1::meta::MailboxMessage;
19use common_meta::RegionIdent;
20use common_meta::distributed_time_constants::default_distributed_time_constants;
21use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
22use common_meta::key::datanode_table::RegionInfo;
23use common_procedure::{Context as ProcedureContext, Status};
24use common_telemetry::info;
25use serde::{Deserialize, Serialize};
26use snafu::{OptionExt, ResultExt};
27use tokio::time::Instant;
28
29use crate::error::{self, Result};
30use crate::handler::HeartbeatMailbox;
31use crate::procedure::region_migration::flush_leader_region::PreFlushRegion;
32use crate::procedure::region_migration::{Context, State};
33use crate::service::mailbox::Channel;
34
35#[derive(Debug, Serialize, Deserialize)]
36pub struct OpenCandidateRegion;
37
38#[async_trait::async_trait]
39#[typetag::serde]
40impl State for OpenCandidateRegion {
41 async fn next(
42 &mut self,
43 ctx: &mut Context,
44 _procedure_ctx: &ProcedureContext,
45 ) -> Result<(Box<dyn State>, Status)> {
46 let instruction = self.build_open_region_instruction(ctx).await?;
47 let now = Instant::now();
48 self.open_candidate_region(ctx, instruction).await?;
49 ctx.update_open_candidate_region_elapsed(now);
50
51 Ok((Box::new(PreFlushRegion), Status::executing(false)))
52 }
53
54 fn as_any(&self) -> &dyn Any {
55 self
56 }
57}
58
59impl OpenCandidateRegion {
60 async fn build_open_region_instruction(&self, ctx: &mut Context) -> Result<Instruction> {
65 let region_ids = ctx.persistent_ctx.region_ids.clone();
66 let from_peer_id = ctx.persistent_ctx.from_peer.id;
67 let to_peer_id = ctx.persistent_ctx.to_peer.id;
68 let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
69 let mut open_regions = Vec::with_capacity(region_ids.len());
70
71 for region_id in region_ids {
72 let table_id = region_id.table_id();
73 let region_number = region_id.region_number();
74 let datanode_table_value = datanode_table_values.get(&table_id).context(
75 error::DatanodeTableNotFoundSnafu {
76 table_id,
77 datanode_id: from_peer_id,
78 },
79 )?;
80 let RegionInfo {
81 region_storage_path,
82 region_options,
83 region_wal_options,
84 engine,
85 } = datanode_table_value.region_info.clone();
86
87 open_regions.push(OpenRegion::new(
88 RegionIdent {
89 datanode_id: to_peer_id,
90 table_id,
91 region_number,
92 engine,
93 },
94 ®ion_storage_path,
95 region_options,
96 region_wal_options,
97 true,
98 ));
99 }
100
101 Ok(Instruction::OpenRegions(open_regions))
102 }
103
104 async fn open_candidate_region(
115 &self,
116 ctx: &mut Context,
117 open_instruction: Instruction,
118 ) -> Result<()> {
119 let pc = &ctx.persistent_ctx;
120 let vc = &mut ctx.volatile_ctx;
121 let region_ids = &pc.region_ids;
122 let candidate = &pc.to_peer;
123
124 if vc.opening_region_guards.is_empty() {
127 for region_id in region_ids {
128 let guard = ctx
130 .opening_region_keeper
131 .register(candidate.id, *region_id)
132 .context(error::RegionOpeningRaceSnafu {
133 peer_id: candidate.id,
134 region_id: *region_id,
135 })?;
136 vc.opening_region_guards.push(guard);
137 }
138 }
139
140 let msg = MailboxMessage::json_message(
141 &format!("Open candidate regions: {:?}", region_ids),
142 &format!("Metasrv@{}", ctx.server_addr()),
143 &format!("Datanode-{}@{}", candidate.id, candidate.addr),
144 common_time::util::current_time_millis(),
145 &open_instruction,
146 )
147 .with_context(|_| error::SerializeToJsonSnafu {
148 input: open_instruction.to_string(),
149 })?;
150
151 let operation_timeout =
152 ctx.next_operation_timeout()
153 .context(error::ExceededDeadlineSnafu {
154 operation: "Open candidate region",
155 })?;
156 let operation_timeout = operation_timeout
157 .div(2)
158 .max(default_distributed_time_constants().region_lease);
159 let ch = Channel::Datanode(candidate.id);
160 let now = Instant::now();
161 let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
162
163 match receiver.await {
164 Ok(msg) => {
165 let reply = HeartbeatMailbox::json_reply(&msg)?;
166 info!(
167 "Received open region reply: {:?}, region: {:?}, elapsed: {:?}",
168 reply,
169 region_ids,
170 now.elapsed()
171 );
172 let InstructionReply::OpenRegions(SimpleReply { result, error }) = reply else {
173 return error::UnexpectedInstructionReplySnafu {
174 mailbox_message: msg.to_string(),
175 reason: "expect open region reply",
176 }
177 .fail();
178 };
179
180 if result {
181 Ok(())
182 } else {
183 error::RetryLaterSnafu {
184 reason: format!(
185 "Region {region_ids:?} is not opened by datanode {:?}, error: {error:?}, elapsed: {:?}",
186 candidate,
187 now.elapsed()
188 ),
189 }
190 .fail()
191 }
192 }
193 Err(error::Error::MailboxTimeout { .. }) => {
194 let reason = format!(
195 "Mailbox received timeout for open candidate region {region_ids:?} on datanode {:?}, elapsed: {:?}",
196 candidate,
197 now.elapsed()
198 );
199 error::RetryLaterSnafu { reason }.fail()
200 }
201 Err(e) => Err(e),
202 }
203 }
204}
205
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_catalog::consts::MITO2_ENGINE;
    use common_meta::DatanodeId;
    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use store_api::storage::RegionId;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_open_region_reply, send_mock_reply,
    };

    /// Persistent context shared by all tests: built from the arguments `1`, `2`
    /// (presumably the source and target peer ids — confirm against
    /// `test_util::new_persistent_context`) and region `(1024, 1)`.
    fn new_persistent_context() -> PersistentContext {
        let migrating_region = RegionId::new(1024, 1);
        test_util::new_persistent_context(1, 2, migrating_region)
    }

    /// Builds an `OpenRegions` instruction holding a single mito2 region entry
    /// for `region_id` on `datanode_id`, with default options.
    fn new_mock_open_instruction(datanode_id: DatanodeId, region_id: RegionId) -> Instruction {
        let region_ident = RegionIdent {
            datanode_id,
            table_id: region_id.table_id(),
            region_number: region_id.region_number(),
            engine: MITO2_ENGINE.to_string(),
        };
        let open_region = OpenRegion {
            region_ident,
            region_storage_path: "/bar/foo/region/".to_string(),
            region_options: Default::default(),
            region_wal_options: Default::default(),
            skip_wal_replay: true,
        };

        Instruction::OpenRegions(vec![open_region])
    }

    /// Without any table metadata, building the instruction fails with a
    /// non-retryable `DatanodeTableNotFound`.
    #[tokio::test]
    async fn test_datanode_table_is_not_found_error() {
        let candidate_state = OpenCandidateRegion;
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(new_persistent_context());

        let failure = candidate_state
            .build_open_region_instruction(&mut ctx)
            .await
            .unwrap_err();

        assert_matches!(failure, Error::DatanodeTableNotFound { .. });
        assert!(!failure.is_retryable());
    }

    /// With no heartbeat receiver registered for the target datanode, sending
    /// fails with a non-retryable `PusherNotFound`.
    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let candidate_state = OpenCandidateRegion;
        let pctx = new_persistent_context();
        let region_id = pctx.region_ids[0];
        let to_peer_id = pctx.to_peer.id;
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(pctx);

        let instruction = new_mock_open_instruction(to_peer_id, region_id);
        let failure = candidate_state
            .open_candidate_region(&mut ctx, instruction)
            .await
            .unwrap_err();

        assert_matches!(failure, Error::PusherNotFound { .. });
        assert!(!failure.is_retryable());
    }

    /// When the region is already registered with the opening-region keeper,
    /// the state reports a non-retryable `RegionOpeningRace`.
    #[tokio::test]
    async fn test_candidate_region_opening_error() {
        let candidate_state = OpenCandidateRegion;
        let pctx = new_persistent_context();
        let region_id = pctx.region_ids[0];
        let to_peer_id = pctx.to_peer.id;

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(pctx);
        // Hold a registration for the same (peer, region) pair for the whole test.
        let keeper = env.opening_region_keeper();
        let _held_guard = keeper.register(to_peer_id, region_id).unwrap();

        let instruction = new_mock_open_instruction(to_peer_id, region_id);
        let failure = candidate_state
            .open_candidate_region(&mut ctx, instruction)
            .await
            .unwrap_err();

        assert_matches!(failure, Error::RegionOpeningRace { .. });
        assert!(!failure.is_retryable());
    }

    /// A close-region reply to an open-region instruction yields a
    /// non-retryable `UnexpectedInstructionReply`.
    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let candidate_state = OpenCandidateRegion;
        let pctx = new_persistent_context();
        let region_id = pctx.region_ids[0];
        let to_peer_id = pctx.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(pctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (heartbeat_tx, heartbeat_rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), heartbeat_tx)
            .await;

        // Reply with the wrong instruction kind.
        send_mock_reply(mailbox, heartbeat_rx, |message_id| {
            Ok(new_close_region_reply(message_id))
        });

        let instruction = new_mock_open_instruction(to_peer_id, region_id);
        let failure = candidate_state
            .open_candidate_region(&mut ctx, instruction)
            .await
            .unwrap_err();

        assert_matches!(failure, Error::UnexpectedInstructionReply { .. });
        assert!(!failure.is_retryable());
    }

    /// A `MailboxTimeout` from the mailbox is converted into a retryable
    /// `RetryLater`.
    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let candidate_state = OpenCandidateRegion;
        let pctx = new_persistent_context();
        let region_id = pctx.region_ids[0];
        let to_peer_id = pctx.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(pctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (heartbeat_tx, heartbeat_rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), heartbeat_tx)
            .await;

        // Resolve the pending request with a timeout error instead of a reply.
        send_mock_reply(mailbox, heartbeat_rx, |message_id| {
            Err(error::MailboxTimeoutSnafu { id: message_id }.build())
        });

        let instruction = new_mock_open_instruction(to_peer_id, region_id);
        let failure = candidate_state
            .open_candidate_region(&mut ctx, instruction)
            .await
            .unwrap_err();

        assert_matches!(failure, Error::RetryLater { .. });
        assert!(failure.is_retryable());
    }

    /// An open-region reply with `result == false` becomes a retryable
    /// `RetryLater` whose debug output contains the datanode's error text.
    #[tokio::test]
    async fn test_open_candidate_region_failed() {
        let candidate_state = OpenCandidateRegion;
        let pctx = new_persistent_context();
        let region_id = pctx.region_ids[0];
        let to_peer_id = pctx.to_peer.id;
        let mut env = TestingEnv::new();

        let mut ctx = env.context_factory().new_context(pctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (heartbeat_tx, heartbeat_rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), heartbeat_tx)
            .await;

        send_mock_reply(mailbox, heartbeat_rx, |message_id| {
            let datanode_error = Some("test mocked".to_string());
            Ok(new_open_region_reply(message_id, false, datanode_error))
        });

        let instruction = new_mock_open_instruction(to_peer_id, region_id);
        let failure = candidate_state
            .open_candidate_region(&mut ctx, instruction)
            .await
            .unwrap_err();

        assert_matches!(failure, Error::RetryLater { .. });
        assert!(failure.is_retryable());
        assert!(format!("{failure:?}").contains("test mocked"));
    }

    /// Happy path: with table metadata in place and a successful reply, `next`
    /// stores the opening-region guard and transitions to `PreFlushRegion`.
    #[tokio::test]
    async fn test_next_flush_leader_region_state() {
        let mut candidate_state = Box::new(OpenCandidateRegion);
        let pctx = new_persistent_context();
        let from_peer_id = pctx.from_peer.id;
        let region_id = pctx.region_ids[0];
        let to_peer_id = pctx.to_peer.id;
        let mut env = TestingEnv::new();

        // Table 1024 with the migrating region led by the source peer.
        let table_info = new_test_table_info(1024, vec![1]).into();
        let leader_route = RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(Peer::empty(from_peer_id)),
            ..Default::default()
        };
        let region_routes = vec![leader_route];

        env.table_metadata_manager()
            .create_table_metadata(
                table_info,
                TableRouteValue::physical(region_routes),
                HashMap::default(),
            )
            .await
            .unwrap();

        let mut ctx = env.context_factory().new_context(pctx);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (heartbeat_tx, heartbeat_rx) = tokio::sync::mpsc::channel(1);

        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), heartbeat_tx)
            .await;

        send_mock_reply(mailbox, heartbeat_rx, |message_id| {
            Ok(new_open_region_reply(message_id, true, None))
        });
        let procedure_ctx = new_procedure_context();
        let (next_state, _) = candidate_state
            .next(&mut ctx, &procedure_ctx)
            .await
            .unwrap();

        // The guard registered for the candidate must be kept in the volatile context.
        let volatile_ctx = ctx.volatile_ctx;
        assert_eq!(
            volatile_ctx.opening_region_guards[0].info(),
            (to_peer_id, region_id)
        );

        let flush_leader_region = next_state
            .as_any()
            .downcast_ref::<PreFlushRegion>()
            .unwrap();
        assert_matches!(flush_leader_region, PreFlushRegion);
    }
}