meta_srv/procedure/region_migration/
open_candidate_region.rs1use std::any::Any;
16use std::ops::Div;
17use std::time::Duration;
18
19use api::v1::meta::MailboxMessage;
20use common_meta::RegionIdent;
21use common_meta::distributed_time_constants::REGION_LEASE_SECS;
22use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
23use common_meta::key::datanode_table::RegionInfo;
24use common_procedure::{Context as ProcedureContext, Status};
25use common_telemetry::info;
26use serde::{Deserialize, Serialize};
27use snafu::{OptionExt, ResultExt};
28use tokio::time::Instant;
29
30use crate::error::{self, Result};
31use crate::handler::HeartbeatMailbox;
32use crate::procedure::region_migration::flush_leader_region::PreFlushRegion;
33use crate::procedure::region_migration::{Context, State};
34use crate::service::mailbox::Channel;
35
/// Lower bound for the mailbox timeout used when asking the candidate peer to open
/// regions; its value is `REGION_LEASE_SECS` seconds.
const OPEN_CANDIDATE_REGION_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS);
38
/// Region migration state that sends an open-regions instruction to the candidate
/// (`to_peer`) datanode; on success the procedure moves on to [`PreFlushRegion`].
#[derive(Debug, Serialize, Deserialize)]
pub struct OpenCandidateRegion;
41
42#[async_trait::async_trait]
43#[typetag::serde]
44impl State for OpenCandidateRegion {
45 async fn next(
46 &mut self,
47 ctx: &mut Context,
48 _procedure_ctx: &ProcedureContext,
49 ) -> Result<(Box<dyn State>, Status)> {
50 let instruction = self.build_open_region_instruction(ctx).await?;
51 let now = Instant::now();
52 self.open_candidate_region(ctx, instruction).await?;
53 ctx.update_open_candidate_region_elapsed(now);
54
55 Ok((Box::new(PreFlushRegion), Status::executing(false)))
56 }
57
58 fn as_any(&self) -> &dyn Any {
59 self
60 }
61}
62
63impl OpenCandidateRegion {
64 async fn build_open_region_instruction(&self, ctx: &mut Context) -> Result<Instruction> {
69 let region_ids = ctx.persistent_ctx.region_ids.clone();
70 let from_peer_id = ctx.persistent_ctx.from_peer.id;
71 let to_peer_id = ctx.persistent_ctx.to_peer.id;
72 let datanode_table_values = ctx.get_from_peer_datanode_table_values().await?;
73 let mut open_regions = Vec::with_capacity(region_ids.len());
74
75 for region_id in region_ids {
76 let table_id = region_id.table_id();
77 let region_number = region_id.region_number();
78 let datanode_table_value = datanode_table_values.get(&table_id).context(
79 error::DatanodeTableNotFoundSnafu {
80 table_id,
81 datanode_id: from_peer_id,
82 },
83 )?;
84 let RegionInfo {
85 region_storage_path,
86 region_options,
87 region_wal_options,
88 engine,
89 } = datanode_table_value.region_info.clone();
90
91 open_regions.push(OpenRegion::new(
92 RegionIdent {
93 datanode_id: to_peer_id,
94 table_id,
95 region_number,
96 engine,
97 },
98 ®ion_storage_path,
99 region_options,
100 region_wal_options,
101 true,
102 ));
103 }
104
105 Ok(Instruction::OpenRegions(open_regions))
106 }
107
108 async fn open_candidate_region(
119 &self,
120 ctx: &mut Context,
121 open_instruction: Instruction,
122 ) -> Result<()> {
123 let pc = &ctx.persistent_ctx;
124 let vc = &mut ctx.volatile_ctx;
125 let region_ids = &pc.region_ids;
126 let candidate = &pc.to_peer;
127
128 if vc.opening_region_guards.is_empty() {
131 for region_id in region_ids {
132 let guard = ctx
134 .opening_region_keeper
135 .register(candidate.id, *region_id)
136 .context(error::RegionOpeningRaceSnafu {
137 peer_id: candidate.id,
138 region_id: *region_id,
139 })?;
140 vc.opening_region_guards.push(guard);
141 }
142 }
143
144 let msg = MailboxMessage::json_message(
145 &format!("Open candidate regions: {:?}", region_ids),
146 &format!("Metasrv@{}", ctx.server_addr()),
147 &format!("Datanode-{}@{}", candidate.id, candidate.addr),
148 common_time::util::current_time_millis(),
149 &open_instruction,
150 )
151 .with_context(|_| error::SerializeToJsonSnafu {
152 input: open_instruction.to_string(),
153 })?;
154
155 let operation_timeout =
156 ctx.next_operation_timeout()
157 .context(error::ExceededDeadlineSnafu {
158 operation: "Open candidate region",
159 })?;
160 let operation_timeout = operation_timeout.div(2).max(OPEN_CANDIDATE_REGION_TIMEOUT);
161 let ch = Channel::Datanode(candidate.id);
162 let now = Instant::now();
163 let receiver = ctx.mailbox.send(&ch, msg, operation_timeout).await?;
164
165 match receiver.await {
166 Ok(msg) => {
167 let reply = HeartbeatMailbox::json_reply(&msg)?;
168 info!(
169 "Received open region reply: {:?}, region: {:?}, elapsed: {:?}",
170 reply,
171 region_ids,
172 now.elapsed()
173 );
174 let InstructionReply::OpenRegions(SimpleReply { result, error }) = reply else {
175 return error::UnexpectedInstructionReplySnafu {
176 mailbox_message: msg.to_string(),
177 reason: "expect open region reply",
178 }
179 .fail();
180 };
181
182 if result {
183 Ok(())
184 } else {
185 error::RetryLaterSnafu {
186 reason: format!(
187 "Region {region_ids:?} is not opened by datanode {:?}, error: {error:?}, elapsed: {:?}",
188 candidate,
189 now.elapsed()
190 ),
191 }
192 .fail()
193 }
194 }
195 Err(error::Error::MailboxTimeout { .. }) => {
196 let reason = format!(
197 "Mailbox received timeout for open candidate region {region_ids:?} on datanode {:?}, elapsed: {:?}",
198 candidate,
199 now.elapsed()
200 );
201 error::RetryLaterSnafu { reason }.fail()
202 }
203 Err(e) => Err(e),
204 }
205 }
206}
207
208#[cfg(test)]
209mod tests {
210 use std::assert_matches::assert_matches;
211 use std::collections::HashMap;
212
213 use common_catalog::consts::MITO2_ENGINE;
214 use common_meta::DatanodeId;
215 use common_meta::key::table_route::TableRouteValue;
216 use common_meta::key::test_utils::new_test_table_info;
217 use common_meta::peer::Peer;
218 use common_meta::rpc::router::{Region, RegionRoute};
219 use store_api::storage::RegionId;
220
221 use super::*;
222 use crate::error::Error;
223 use crate::procedure::region_migration::test_util::{self, TestingEnv, new_procedure_context};
224 use crate::procedure::region_migration::{ContextFactory, PersistentContext};
225 use crate::procedure::test_util::{
226 new_close_region_reply, new_open_region_reply, send_mock_reply,
227 };
228
229 fn new_persistent_context() -> PersistentContext {
230 test_util::new_persistent_context(1, 2, RegionId::new(1024, 1))
231 }
232
233 fn new_mock_open_instruction(datanode_id: DatanodeId, region_id: RegionId) -> Instruction {
234 Instruction::OpenRegions(vec![OpenRegion {
235 region_ident: RegionIdent {
236 datanode_id,
237 table_id: region_id.table_id(),
238 region_number: region_id.region_number(),
239 engine: MITO2_ENGINE.to_string(),
240 },
241 region_storage_path: "/bar/foo/region/".to_string(),
242 region_options: Default::default(),
243 region_wal_options: Default::default(),
244 skip_wal_replay: true,
245 }])
246 }
247
248 #[tokio::test]
249 async fn test_datanode_table_is_not_found_error() {
250 let state = OpenCandidateRegion;
251 let persistent_context = new_persistent_context();
252 let env = TestingEnv::new();
253 let mut ctx = env.context_factory().new_context(persistent_context);
254
255 let err = state
256 .build_open_region_instruction(&mut ctx)
257 .await
258 .unwrap_err();
259
260 assert_matches!(err, Error::DatanodeTableNotFound { .. });
261 assert!(!err.is_retryable());
262 }
263
264 #[tokio::test]
265 async fn test_datanode_is_unreachable() {
266 let state = OpenCandidateRegion;
267 let persistent_context = new_persistent_context();
270 let region_id = persistent_context.region_ids[0];
271 let to_peer_id = persistent_context.to_peer.id;
272 let env = TestingEnv::new();
273 let mut ctx = env.context_factory().new_context(persistent_context);
274
275 let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
276 let err = state
277 .open_candidate_region(&mut ctx, open_instruction)
278 .await
279 .unwrap_err();
280
281 assert_matches!(err, Error::PusherNotFound { .. });
282 assert!(!err.is_retryable());
283 }
284
285 #[tokio::test]
286 async fn test_candidate_region_opening_error() {
287 let state = OpenCandidateRegion;
288 let persistent_context = new_persistent_context();
291 let region_id = persistent_context.region_ids[0];
292 let to_peer_id = persistent_context.to_peer.id;
293
294 let env = TestingEnv::new();
295 let mut ctx = env.context_factory().new_context(persistent_context);
296 let opening_region_keeper = env.opening_region_keeper();
297 let _guard = opening_region_keeper
298 .register(to_peer_id, region_id)
299 .unwrap();
300
301 let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
302 let err = state
303 .open_candidate_region(&mut ctx, open_instruction)
304 .await
305 .unwrap_err();
306
307 assert_matches!(err, Error::RegionOpeningRace { .. });
308 assert!(!err.is_retryable());
309 }
310
311 #[tokio::test]
312 async fn test_unexpected_instruction_reply() {
313 let state = OpenCandidateRegion;
314 let persistent_context = new_persistent_context();
317 let region_id = persistent_context.region_ids[0];
318 let to_peer_id = persistent_context.to_peer.id;
319
320 let mut env = TestingEnv::new();
321 let mut ctx = env.context_factory().new_context(persistent_context);
322 let mailbox_ctx = env.mailbox_context();
323 let mailbox = mailbox_ctx.mailbox().clone();
324
325 let (tx, rx) = tokio::sync::mpsc::channel(1);
326
327 mailbox_ctx
328 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
329 .await;
330
331 send_mock_reply(mailbox, rx, |id| Ok(new_close_region_reply(id)));
333
334 let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
335 let err = state
336 .open_candidate_region(&mut ctx, open_instruction)
337 .await
338 .unwrap_err();
339
340 assert_matches!(err, Error::UnexpectedInstructionReply { .. });
341 assert!(!err.is_retryable());
342 }
343
344 #[tokio::test]
345 async fn test_instruction_exceeded_deadline() {
346 let state = OpenCandidateRegion;
347 let persistent_context = new_persistent_context();
350 let region_id = persistent_context.region_ids[0];
351 let to_peer_id = persistent_context.to_peer.id;
352
353 let mut env = TestingEnv::new();
354 let mut ctx = env.context_factory().new_context(persistent_context);
355 let mailbox_ctx = env.mailbox_context();
356 let mailbox = mailbox_ctx.mailbox().clone();
357
358 let (tx, rx) = tokio::sync::mpsc::channel(1);
359
360 mailbox_ctx
361 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
362 .await;
363
364 send_mock_reply(mailbox, rx, |id| {
366 Err(error::MailboxTimeoutSnafu { id }.build())
367 });
368
369 let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
370 let err = state
371 .open_candidate_region(&mut ctx, open_instruction)
372 .await
373 .unwrap_err();
374
375 assert_matches!(err, Error::RetryLater { .. });
376 assert!(err.is_retryable());
377 }
378
379 #[tokio::test]
380 async fn test_open_candidate_region_failed() {
381 let state = OpenCandidateRegion;
382 let persistent_context = new_persistent_context();
385 let region_id = persistent_context.region_ids[0];
386 let to_peer_id = persistent_context.to_peer.id;
387 let mut env = TestingEnv::new();
388
389 let mut ctx = env.context_factory().new_context(persistent_context);
390 let mailbox_ctx = env.mailbox_context();
391 let mailbox = mailbox_ctx.mailbox().clone();
392
393 let (tx, rx) = tokio::sync::mpsc::channel(1);
394
395 mailbox_ctx
396 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
397 .await;
398
399 send_mock_reply(mailbox, rx, |id| {
400 Ok(new_open_region_reply(
401 id,
402 false,
403 Some("test mocked".to_string()),
404 ))
405 });
406
407 let open_instruction = new_mock_open_instruction(to_peer_id, region_id);
408 let err = state
409 .open_candidate_region(&mut ctx, open_instruction)
410 .await
411 .unwrap_err();
412
413 assert_matches!(err, Error::RetryLater { .. });
414 assert!(err.is_retryable());
415 assert!(format!("{err:?}").contains("test mocked"));
416 }
417
418 #[tokio::test]
419 async fn test_next_flush_leader_region_state() {
420 let mut state = Box::new(OpenCandidateRegion);
421 let persistent_context = new_persistent_context();
424 let from_peer_id = persistent_context.from_peer.id;
425 let region_id = persistent_context.region_ids[0];
426 let to_peer_id = persistent_context.to_peer.id;
427 let mut env = TestingEnv::new();
428
429 let table_info = new_test_table_info(1024, vec![1]).into();
431 let region_routes = vec![RegionRoute {
432 region: Region::new_test(region_id),
433 leader_peer: Some(Peer::empty(from_peer_id)),
434 ..Default::default()
435 }];
436
437 env.table_metadata_manager()
438 .create_table_metadata(
439 table_info,
440 TableRouteValue::physical(region_routes),
441 HashMap::default(),
442 )
443 .await
444 .unwrap();
445
446 let mut ctx = env.context_factory().new_context(persistent_context);
447 let mailbox_ctx = env.mailbox_context();
448 let mailbox = mailbox_ctx.mailbox().clone();
449
450 let (tx, rx) = tokio::sync::mpsc::channel(1);
451
452 mailbox_ctx
453 .insert_heartbeat_response_receiver(Channel::Datanode(to_peer_id), tx)
454 .await;
455
456 send_mock_reply(mailbox, rx, |id| Ok(new_open_region_reply(id, true, None)));
457 let procedure_ctx = new_procedure_context();
458 let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
459 let vc = ctx.volatile_ctx;
460 assert_eq!(vc.opening_region_guards[0].info(), (to_peer_id, region_id));
461
462 let flush_leader_region = next.as_any().downcast_ref::<PreFlushRegion>().unwrap();
463 assert_matches!(flush_leader_region, PreFlushRegion);
464 }
465}