meta_srv/procedure/region_migration/
open_candidate_region.rs1use std::any::Any;
16use std::time::Duration;
17
18use api::v1::meta::MailboxMessage;
19use common_meta::distributed_time_constants::REGION_LEASE_SECS;
20use common_meta::instruction::{Instruction, InstructionReply, OpenRegion, SimpleReply};
21use common_meta::key::datanode_table::RegionInfo;
22use common_meta::RegionIdent;
23use common_procedure::{Context as ProcedureContext, Status};
24use common_telemetry::info;
25use serde::{Deserialize, Serialize};
26use snafu::{OptionExt, ResultExt};
27use tokio::time::Instant;
28
29use crate::error::{self, Result};
30use crate::handler::HeartbeatMailbox;
31use crate::procedure::region_migration::flush_leader_region::PreFlushRegion;
32use crate::procedure::region_migration::{Context, State};
33use crate::service::mailbox::Channel;
34
35const OPEN_CANDIDATE_REGION_TIMEOUT: Duration = Duration::from_secs(REGION_LEASE_SECS);
37
38#[derive(Debug, Serialize, Deserialize)]
39pub struct OpenCandidateRegion;
40
41#[async_trait::async_trait]
42#[typetag::serde]
43impl State for OpenCandidateRegion {
44 async fn next(
45 &mut self,
46 ctx: &mut Context,
47 _procedure_ctx: &ProcedureContext,
48 ) -> Result<(Box<dyn State>, Status)> {
49 let instruction = self.build_open_region_instruction(ctx).await?;
50 let now = Instant::now();
51 self.open_candidate_region(ctx, instruction).await?;
52 ctx.update_open_candidate_region_elapsed(now);
53
54 Ok((Box::new(PreFlushRegion), Status::executing(false)))
55 }
56
57 fn as_any(&self) -> &dyn Any {
58 self
59 }
60}
61
62impl OpenCandidateRegion {
63 async fn build_open_region_instruction(&self, ctx: &mut Context) -> Result<Instruction> {
68 let pc = &ctx.persistent_ctx;
69 let table_id = pc.region_id.table_id();
70 let region_number = pc.region_id.region_number();
71 let candidate_id = pc.to_peer.id;
72 let datanode_table_value = ctx.get_from_peer_datanode_table_value().await?;
73
74 let RegionInfo {
75 region_storage_path,
76 region_options,
77 region_wal_options,
78 engine,
79 } = datanode_table_value.region_info.clone();
80
81 let open_instruction = Instruction::OpenRegion(OpenRegion::new(
82 RegionIdent {
83 datanode_id: candidate_id,
84 table_id,
85 region_number,
86 engine,
87 },
88 ®ion_storage_path,
89 region_options,
90 region_wal_options,
91 true,
92 ));
93
94 Ok(open_instruction)
95 }
96
97 async fn open_candidate_region(
108 &self,
109 ctx: &mut Context,
110 open_instruction: Instruction,
111 ) -> Result<()> {
112 let pc = &ctx.persistent_ctx;
113 let vc = &mut ctx.volatile_ctx;
114 let region_id = pc.region_id;
115 let candidate = &pc.to_peer;
116
117 if vc.opening_region_guard.is_none() {
120 let guard = ctx
122 .opening_region_keeper
123 .register(candidate.id, region_id)
124 .context(error::RegionOpeningRaceSnafu {
125 peer_id: candidate.id,
126 region_id,
127 })?;
128 vc.opening_region_guard = Some(guard);
129 }
130
131 let msg = MailboxMessage::json_message(
132 &format!("Open candidate region: {}", region_id),
133 &format!("Metasrv@{}", ctx.server_addr()),
134 &format!("Datanode-{}@{}", candidate.id, candidate.addr),
135 common_time::util::current_time_millis(),
136 &open_instruction,
137 )
138 .with_context(|_| error::SerializeToJsonSnafu {
139 input: open_instruction.to_string(),
140 })?;
141
142 let ch = Channel::Datanode(candidate.id);
143 let now = Instant::now();
144 let receiver = ctx
145 .mailbox
146 .send(&ch, msg, OPEN_CANDIDATE_REGION_TIMEOUT)
147 .await?;
148
149 match receiver.await {
150 Ok(msg) => {
151 let reply = HeartbeatMailbox::json_reply(&msg)?;
152 info!(
153 "Received open region reply: {:?}, region: {}, elapsed: {:?}",
154 reply,
155 region_id,
156 now.elapsed()
157 );
158 let InstructionReply::OpenRegion(SimpleReply { result, error }) = reply else {
159 return error::UnexpectedInstructionReplySnafu {
160 mailbox_message: msg.to_string(),
161 reason: "expect open region reply",
162 }
163 .fail();
164 };
165
166 if result {
167 Ok(())
168 } else {
169 error::RetryLaterSnafu {
170 reason: format!(
171 "Region {region_id} is not opened by datanode {:?}, error: {error:?}, elapsed: {:?}",
172 candidate,
173 now.elapsed()
174 ),
175 }
176 .fail()
177 }
178 }
179 Err(error::Error::MailboxTimeout { .. }) => {
180 let reason = format!(
181 "Mailbox received timeout for open candidate region {region_id} on datanode {:?}, elapsed: {:?}",
182 candidate,
183 now.elapsed()
184 );
185 error::RetryLaterSnafu { reason }.fail()
186 }
187 Err(e) => Err(e),
188 }
189 }
190}
191
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;

    use common_catalog::consts::MITO2_ENGINE;
    use common_meta::key::table_route::TableRouteValue;
    use common_meta::key::test_utils::new_test_table_info;
    use common_meta::peer::Peer;
    use common_meta::rpc::router::{Region, RegionRoute};
    use common_meta::DatanodeId;
    use store_api::storage::RegionId;

    use super::*;
    use crate::error::Error;
    use crate::procedure::region_migration::test_util::{self, new_procedure_context, TestingEnv};
    use crate::procedure::region_migration::{ContextFactory, PersistentContext};
    use crate::procedure::test_util::{
        new_close_region_reply, new_open_region_reply, send_mock_reply,
    };

    /// Persistent context used by every test in this module.
    ///
    /// NOTE(review): the arguments `1` and `2` are presumably the from-peer
    /// and to-peer ids -- confirm against `test_util::new_persistent_context`.
    fn new_persistent_context() -> PersistentContext {
        let region_id = RegionId::new(1024, 1);
        test_util::new_persistent_context(1, 2, region_id)
    }

    /// Hand-built open-region instruction for `region_id` on `datanode_id`,
    /// used to call `open_candidate_region` without any table metadata.
    fn new_mock_open_instruction(datanode_id: DatanodeId, region_id: RegionId) -> Instruction {
        let region_ident = RegionIdent {
            datanode_id,
            table_id: region_id.table_id(),
            region_number: region_id.region_number(),
            engine: MITO2_ENGINE.to_string(),
        };

        Instruction::OpenRegion(OpenRegion {
            region_ident,
            region_storage_path: "/bar/foo/region/".to_string(),
            region_options: Default::default(),
            region_wal_options: Default::default(),
            skip_wal_replay: true,
        })
    }

    /// No table metadata is created, so building the instruction must abort.
    #[tokio::test]
    async fn test_datanode_table_is_not_found_error() {
        let step = OpenCandidateRegion;
        let persistent_context = new_persistent_context();
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let outcome = step.build_open_region_instruction(&mut ctx).await;
        let err = outcome.unwrap_err();

        assert_matches!(err, Error::DatanodeTableNotFound { .. });
        assert!(!err.is_retryable());
    }

    /// No heartbeat response receiver is registered for the candidate, so the
    /// send is expected to fail with a non-retryable `PusherNotFound`.
    #[tokio::test]
    async fn test_datanode_is_unreachable() {
        let step = OpenCandidateRegion;
        let persistent_context = new_persistent_context();
        let region_id = persistent_context.region_id;
        let candidate_id = persistent_context.to_peer.id;
        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);

        let instruction = new_mock_open_instruction(candidate_id, region_id);
        let outcome = step.open_candidate_region(&mut ctx, instruction).await;
        let err = outcome.unwrap_err();

        assert_matches!(err, Error::PusherNotFound { .. });
        assert!(!err.is_retryable());
    }

    /// The same (peer, region) pair is registered beforehand, so the state
    /// must report a non-retryable opening race.
    #[tokio::test]
    async fn test_candidate_region_opening_error() {
        let step = OpenCandidateRegion;
        let persistent_context = new_persistent_context();
        let region_id = persistent_context.region_id;
        let candidate_id = persistent_context.to_peer.id;

        let env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let keeper = env.opening_region_keeper();
        // Kept alive until the end of the test so the registration stays held.
        let _held_guard = keeper.register(candidate_id, region_id).unwrap();

        let instruction = new_mock_open_instruction(candidate_id, region_id);
        let outcome = step.open_candidate_region(&mut ctx, instruction).await;
        let err = outcome.unwrap_err();

        assert_matches!(err, Error::RegionOpeningRace { .. });
        assert!(!err.is_retryable());
    }

    /// The mocked datanode answers with a close-region reply, which is the
    /// wrong reply type for an open-region instruction.
    #[tokio::test]
    async fn test_unexpected_instruction_reply() {
        let step = OpenCandidateRegion;
        let persistent_context = new_persistent_context();
        let region_id = persistent_context.region_id;
        let candidate_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        let datanode_channel = Channel::Datanode(candidate_id);
        mailbox_ctx
            .insert_heartbeat_response_receiver(datanode_channel, reply_tx)
            .await;

        send_mock_reply(mailbox, reply_rx, |id| Ok(new_close_region_reply(id)));

        let instruction = new_mock_open_instruction(candidate_id, region_id);
        let outcome = step.open_candidate_region(&mut ctx, instruction).await;
        let err = outcome.unwrap_err();

        assert_matches!(err, Error::UnexpectedInstructionReply { .. });
        assert!(!err.is_retryable());
    }

    /// The mocked mailbox reports a timeout, which must become a retryable
    /// `RetryLater` error.
    #[tokio::test]
    async fn test_instruction_exceeded_deadline() {
        let step = OpenCandidateRegion;
        let persistent_context = new_persistent_context();
        let region_id = persistent_context.region_id;
        let candidate_id = persistent_context.to_peer.id;

        let mut env = TestingEnv::new();
        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        let datanode_channel = Channel::Datanode(candidate_id);
        mailbox_ctx
            .insert_heartbeat_response_receiver(datanode_channel, reply_tx)
            .await;

        send_mock_reply(mailbox, reply_rx, |id| {
            Err(error::MailboxTimeoutSnafu { id }.build())
        });

        let instruction = new_mock_open_instruction(candidate_id, region_id);
        let outcome = step.open_candidate_region(&mut ctx, instruction).await;
        let err = outcome.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
    }

    /// The mocked datanode replies with `result = false`; the state must ask
    /// for a retry and keep the datanode's error text in the error.
    #[tokio::test]
    async fn test_open_candidate_region_failed() {
        let step = OpenCandidateRegion;
        let persistent_context = new_persistent_context();
        let region_id = persistent_context.region_id;
        let candidate_id = persistent_context.to_peer.id;
        let mut env = TestingEnv::new();

        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        let datanode_channel = Channel::Datanode(candidate_id);
        mailbox_ctx
            .insert_heartbeat_response_receiver(datanode_channel, reply_tx)
            .await;

        send_mock_reply(mailbox, reply_rx, |id| {
            let datanode_error = Some("test mocked".to_string());
            Ok(new_open_region_reply(id, false, datanode_error))
        });

        let instruction = new_mock_open_instruction(candidate_id, region_id);
        let outcome = step.open_candidate_region(&mut ctx, instruction).await;
        let err = outcome.unwrap_err();

        assert_matches!(err, Error::RetryLater { .. });
        assert!(err.is_retryable());
        let rendered = format!("{err:?}");
        assert!(rendered.contains("test mocked"));
    }

    /// Happy path: with table metadata in place and a successful reply, the
    /// opening guard is kept in the volatile context and the next state is
    /// `PreFlushRegion`.
    #[tokio::test]
    async fn test_next_flush_leader_region_state() {
        let mut state = Box::new(OpenCandidateRegion);
        let persistent_context = new_persistent_context();
        let from_peer_id = persistent_context.from_peer.id;
        let region_id = persistent_context.region_id;
        let candidate_id = persistent_context.to_peer.id;
        let mut env = TestingEnv::new();

        let table_info = new_test_table_info(1024, vec![1]).into();
        let leader_route = RegionRoute {
            region: Region::new_test(region_id),
            leader_peer: Some(Peer::empty(from_peer_id)),
            ..Default::default()
        };
        let table_route = TableRouteValue::physical(vec![leader_route]);

        env.table_metadata_manager()
            .create_table_metadata(table_info, table_route, HashMap::default())
            .await
            .unwrap();

        let mut ctx = env.context_factory().new_context(persistent_context);
        let mailbox_ctx = env.mailbox_context();
        let mailbox = mailbox_ctx.mailbox().clone();

        let (reply_tx, reply_rx) = tokio::sync::mpsc::channel(1);
        let datanode_channel = Channel::Datanode(candidate_id);
        mailbox_ctx
            .insert_heartbeat_response_receiver(datanode_channel, reply_tx)
            .await;

        send_mock_reply(mailbox, reply_rx, |id| {
            Ok(new_open_region_reply(id, true, None))
        });
        let procedure_ctx = new_procedure_context();
        let (next, _) = state.next(&mut ctx, &procedure_ctx).await.unwrap();
        let volatile = ctx.volatile_ctx;
        assert_eq!(
            volatile.opening_region_guard.unwrap().info(),
            (candidate_id, region_id)
        );

        let following_state = next.as_any().downcast_ref::<PreFlushRegion>().unwrap();
        assert_matches!(following_state, PreFlushRegion);
    }
}