1use async_trait::async_trait;
16use common_meta::error::{InvalidHeartbeatResponseSnafu, Result as MetaResult};
17use common_meta::heartbeat::handler::{
18 HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
19};
20use common_meta::instruction::{Instruction, InstructionReply};
21use common_telemetry::error;
22use snafu::OptionExt;
23use store_api::storage::GcReport;
24
25mod close_region;
26mod downgrade_region;
27mod file_ref;
28mod flush_region;
29mod gc_worker;
30mod open_region;
31mod upgrade_region;
32
33use crate::heartbeat::handler::close_region::CloseRegionsHandler;
34use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler;
35use crate::heartbeat::handler::file_ref::GetFileRefsHandler;
36use crate::heartbeat::handler::flush_region::FlushRegionsHandler;
37use crate::heartbeat::handler::gc_worker::GcRegionsHandler;
38use crate::heartbeat::handler::open_region::OpenRegionsHandler;
39use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
40use crate::heartbeat::task_tracker::TaskTracker;
41use crate::region_server::RegionServer;
42
43#[derive(Clone)]
45pub struct RegionHeartbeatResponseHandler {
46 region_server: RegionServer,
47 downgrade_tasks: TaskTracker<()>,
48 flush_tasks: TaskTracker<()>,
49 open_region_parallelism: usize,
50 gc_tasks: TaskTracker<GcReport>,
51}
52
53#[async_trait::async_trait]
54pub trait InstructionHandler: Send + Sync {
55 type Instruction;
56 async fn handle(
57 &self,
58 ctx: &HandlerContext,
59 instruction: Self::Instruction,
60 ) -> Option<InstructionReply>;
61}
62
63#[derive(Clone)]
64pub struct HandlerContext {
65 region_server: RegionServer,
66 downgrade_tasks: TaskTracker<()>,
67 flush_tasks: TaskTracker<()>,
68 gc_tasks: TaskTracker<GcReport>,
69}
70
71impl HandlerContext {
72 #[cfg(test)]
73 pub fn new_for_test(region_server: RegionServer) -> Self {
74 Self {
75 region_server,
76 downgrade_tasks: TaskTracker::new(),
77 flush_tasks: TaskTracker::new(),
78 gc_tasks: TaskTracker::new(),
79 }
80 }
81}
82
83impl RegionHeartbeatResponseHandler {
84 pub fn new(region_server: RegionServer) -> Self {
86 Self {
87 region_server,
88 downgrade_tasks: TaskTracker::new(),
89 flush_tasks: TaskTracker::new(),
90 open_region_parallelism: (num_cpus::get() / 2).max(1),
92 gc_tasks: TaskTracker::new(),
93 }
94 }
95
96 pub fn with_open_region_parallelism(mut self, parallelism: usize) -> Self {
98 self.open_region_parallelism = parallelism;
99 self
100 }
101
102 fn build_handler(
103 &self,
104 instruction: &Instruction,
105 ) -> MetaResult<Option<Box<InstructionHandlers>>> {
106 match instruction {
107 Instruction::CloseRegions(_) => Ok(Some(Box::new(CloseRegionsHandler.into()))),
108 Instruction::OpenRegions(_) => Ok(Some(Box::new(
109 OpenRegionsHandler {
110 open_region_parallelism: self.open_region_parallelism,
111 }
112 .into(),
113 ))),
114 Instruction::FlushRegions(_) => Ok(Some(Box::new(FlushRegionsHandler.into()))),
115 Instruction::DowngradeRegions(_) => Ok(Some(Box::new(DowngradeRegionsHandler.into()))),
116 Instruction::UpgradeRegions(_) => Ok(Some(Box::new(
117 UpgradeRegionsHandler {
118 upgrade_region_parallelism: self.open_region_parallelism,
119 }
120 .into(),
121 ))),
122 Instruction::GetFileRefs(_) => Ok(Some(Box::new(GetFileRefsHandler.into()))),
123 Instruction::GcRegions(_) => Ok(Some(Box::new(GcRegionsHandler.into()))),
124 Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
125 Instruction::Suspend => Ok(None),
126 }
127 }
128}
129
130#[allow(clippy::enum_variant_names)]
131pub enum InstructionHandlers {
132 CloseRegions(CloseRegionsHandler),
133 OpenRegions(OpenRegionsHandler),
134 FlushRegions(FlushRegionsHandler),
135 DowngradeRegions(DowngradeRegionsHandler),
136 UpgradeRegions(UpgradeRegionsHandler),
137 GetFileRefs(GetFileRefsHandler),
138 GcRegions(GcRegionsHandler),
139}
140
141macro_rules! impl_from_handler {
142 ($($handler:ident => $variant:ident),*) => {
143 $(
144 impl From<$handler> for InstructionHandlers {
145 fn from(handler: $handler) -> Self {
146 InstructionHandlers::$variant(handler)
147 }
148 }
149 )*
150 };
151}
152
153impl_from_handler!(
154 CloseRegionsHandler => CloseRegions,
155 OpenRegionsHandler => OpenRegions,
156 FlushRegionsHandler => FlushRegions,
157 DowngradeRegionsHandler => DowngradeRegions,
158 UpgradeRegionsHandler => UpgradeRegions,
159 GetFileRefsHandler => GetFileRefs,
160 GcRegionsHandler => GcRegions
161);
162
163macro_rules! dispatch_instr {
164 (
165 $( $instr_variant:ident => $handler_variant:ident ),* $(,)?
166 ) => {
167 impl InstructionHandlers {
168 pub async fn handle(
169 &self,
170 ctx: &HandlerContext,
171 instruction: Instruction,
172 ) -> Option<InstructionReply> {
173 match (self, instruction) {
174 $(
175 (
176 InstructionHandlers::$handler_variant(handler),
177 Instruction::$instr_variant(instr),
178 ) => handler.handle(ctx, instr).await,
179 )*
180 _ => unreachable!(),
182 }
183 }
184 pub fn is_acceptable(instruction: &Instruction) -> bool {
186 matches!(
187 instruction,
188 $(
189 Instruction::$instr_variant { .. }
190 )|*
191 )
192 }
193 }
194 };
195}
196
197dispatch_instr!(
198 CloseRegions => CloseRegions,
199 OpenRegions => OpenRegions,
200 FlushRegions => FlushRegions,
201 DowngradeRegions => DowngradeRegions,
202 UpgradeRegions => UpgradeRegions,
203 GetFileRefs => GetFileRefs,
204 GcRegions => GcRegions,
205);
206
207#[async_trait]
208impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
209 fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
210 if let Some((_, instruction)) = ctx.incoming_message.as_ref() {
211 return InstructionHandlers::is_acceptable(instruction);
212 }
213 false
214 }
215
216 async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
217 let (meta, instruction) = ctx
218 .incoming_message
219 .take()
220 .context(InvalidHeartbeatResponseSnafu)?;
221
222 let mailbox = ctx.mailbox.clone();
223 if let Some(handler) = self.build_handler(&instruction)? {
224 let context = HandlerContext {
225 region_server: self.region_server.clone(),
226 downgrade_tasks: self.downgrade_tasks.clone(),
227 flush_tasks: self.flush_tasks.clone(),
228 gc_tasks: self.gc_tasks.clone(),
229 };
230 let _handle = common_runtime::spawn_global(async move {
231 let reply = handler.handle(&context, instruction).await;
232 if let Some(reply) = reply
233 && let Err(e) = mailbox.send((meta, reply)).await
234 {
235 let error = e.to_string();
236 let (meta, reply) = e.0;
237 error!("Failed to send reply {reply} to {meta:?}: {error}");
238 }
239 });
240 }
241
242 Ok(HandleControl::Continue)
243 }
244}
245
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::time::Duration;

    use common_meta::RegionIdent;
    use common_meta::heartbeat::mailbox::{
        HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta,
    };
    use common_meta::instruction::{DowngradeRegion, OpenRegion, UpgradeRegion};
    use mito2::config::MitoConfig;
    use mito2::engine::MITO_ENGINE_NAME;
    use mito2::test_util::{CreateRequestBuilder, TestEnv};
    use store_api::path_utils::table_dir;
    use store_api::region_engine::RegionRole;
    use store_api::region_request::{RegionCloseRequest, RegionRequest};
    use store_api::storage::RegionId;
    use tokio::sync::mpsc::{self, Receiver};

    use super::*;
    use crate::error;
    use crate::tests::mock_region_server;

    /// Mailbox plus the receiving end of its channel, so tests can observe the
    /// replies produced by the handler under test.
    pub struct HeartbeatResponseTestEnv {
        pub(crate) mailbox: MailboxRef,
        pub(crate) receiver: Receiver<(MessageMeta, InstructionReply)>,
    }

    impl HeartbeatResponseTestEnv {
        /// Creates a mailbox backed by a channel with a capacity of 8 messages.
        pub fn new() -> Self {
            let (sender, receiver) = mpsc::channel(8);

            Self {
                mailbox: Arc::new(HeartbeatMailbox::new(sender)),
                receiver,
            }
        }

        /// Wraps `incoming_message` in a handler context that replies through
        /// this environment's mailbox.
        pub fn create_handler_ctx(
            &self,
            incoming_message: IncomingMessage,
        ) -> HeartbeatResponseHandlerContext {
            HeartbeatResponseHandlerContext {
                mailbox: self.mailbox.clone(),
                response: Default::default(),
                incoming_message: Some(incoming_message),
            }
        }
    }

    /// Message metadata shared by every test in this module.
    fn test_meta() -> MessageMeta {
        MessageMeta::new_test(1, "test", "dn-1", "me-0")
    }

    /// Identifies `region_id` as a mito region on datanode 2.
    fn region_ident(region_id: RegionId) -> RegionIdent {
        RegionIdent {
            table_id: region_id.table_id(),
            region_number: region_id.region_number(),
            datanode_id: 2,
            engine: MITO_ENGINE_NAME.to_string(),
        }
    }

    fn close_region_instruction(region_id: RegionId) -> Instruction {
        Instruction::CloseRegions(vec![region_ident(region_id)])
    }

    fn open_region_instruction(region_id: RegionId, path: &str) -> Instruction {
        let open_region = OpenRegion::new(
            region_ident(region_id),
            path,
            HashMap::new(),
            HashMap::new(),
            false,
        );
        Instruction::OpenRegions(vec![open_region])
    }

    /// Feeds `instruction` to `handler`, checks that handling continues, and
    /// returns the reply delivered to the test mailbox.
    async fn dispatch(
        handler: &RegionHeartbeatResponseHandler,
        env: &mut HeartbeatResponseTestEnv,
        instruction: Instruction,
    ) -> InstructionReply {
        let mut ctx = env.create_handler_ctx((test_meta(), instruction));
        let control = handler.handle(&mut ctx).await.unwrap();
        assert_matches!(control, HandleControl::Continue);

        let (_, reply) = env.receiver.recv().await.unwrap();
        reply
    }

    #[test]
    fn test_is_acceptable() {
        common_telemetry::init_default_ut_logging();
        let region_server = mock_region_server();
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
        let heartbeat_env = HeartbeatResponseTestEnv::new();
        let meta = test_meta();

        let accepts = |instruction: Instruction| {
            let ctx = heartbeat_env.create_handler_ctx((meta.clone(), instruction));
            heartbeat_handler.is_acceptable(&ctx)
        };

        let region_id = RegionId::new(1024, 1);

        // Open region.
        let storage_path = "test";
        assert!(accepts(open_region_instruction(region_id, storage_path)));

        // Close region.
        assert!(accepts(close_region_instruction(region_id)));

        // Downgrade region.
        assert!(accepts(Instruction::DowngradeRegions(vec![
            DowngradeRegion {
                region_id: RegionId::new(2048, 1),
                flush_timeout: Some(Duration::from_secs(1)),
            }
        ])));

        // Upgrade region.
        assert!(accepts(Instruction::UpgradeRegions(vec![UpgradeRegion {
            region_id,
            ..Default::default()
        }])));
    }

    #[tokio::test]
    async fn test_close_region() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());

        let mut engine_env = TestEnv::with_prefix("close-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));

        let region_id = RegionId::new(1024, 1);
        let create_req = CreateRequestBuilder::new().build();
        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        // The instruction is sent twice; both rounds must report success.
        for _ in 0..2 {
            let reply = dispatch(
                &heartbeat_handler,
                &mut heartbeat_env,
                close_region_instruction(region_id),
            )
            .await;

            let InstructionReply::CloseRegions(reply) = reply else {
                unreachable!()
            };
            assert!(reply.result);
            assert!(reply.error.is_none());

            // The region server no longer knows the region.
            assert_matches!(
                region_server
                    .set_region_role(region_id, RegionRole::Leader)
                    .unwrap_err(),
                error::Error::RegionNotFound { .. }
            );
        }
    }

    #[tokio::test]
    async fn test_open_region_ok() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());

        let mut engine_env = TestEnv::with_prefix("open-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));

        let region_id = RegionId::new(1024, 1);
        let storage_path = "test";
        let mut create_req = CreateRequestBuilder::new().build();
        create_req.table_dir = table_dir(storage_path, region_id.table_id());

        // Create the region, then close it so the instruction has to reopen it.
        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();
        region_server
            .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
            .await
            .unwrap();

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        // The instruction is sent twice; both rounds must report success.
        for _ in 0..2 {
            let reply = dispatch(
                &heartbeat_handler,
                &mut heartbeat_env,
                open_region_instruction(region_id, storage_path),
            )
            .await;

            let InstructionReply::OpenRegions(reply) = reply else {
                unreachable!()
            };
            assert!(reply.result);
            assert!(reply.error.is_none());
        }
    }

    #[tokio::test]
    async fn test_open_not_exists_region() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());

        let mut engine_env = TestEnv::with_prefix("open-not-exists-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));

        let region_id = RegionId::new(1024, 1);
        let storage_path = "test";

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        // No region was ever created, so opening it must be reported as failed.
        let reply = dispatch(
            &heartbeat_handler,
            &mut heartbeat_env,
            open_region_instruction(region_id, storage_path),
        )
        .await;

        let InstructionReply::OpenRegions(reply) = reply else {
            unreachable!()
        };
        assert!(!reply.result);
        assert!(reply.error.is_some());
    }

    #[tokio::test]
    async fn test_downgrade_region() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());

        let mut engine_env = TestEnv::with_prefix("downgrade-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));

        let region_id = RegionId::new(1024, 1);
        let storage_path = "test";
        let mut create_req = CreateRequestBuilder::new().build();
        create_req.table_dir = table_dir(storage_path, region_id.table_id());

        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        // The instruction is sent twice; both rounds must report the same state.
        for _ in 0..2 {
            let instruction = Instruction::DowngradeRegions(vec![DowngradeRegion {
                region_id,
                flush_timeout: Some(Duration::from_secs(1)),
            }]);
            let reply = dispatch(&heartbeat_handler, &mut heartbeat_env, instruction).await;

            let replies = reply.expect_downgrade_regions_reply();
            let first = &replies[0];
            assert!(first.exists);
            assert!(first.error.is_none());
            assert_eq!(first.last_entry_id.unwrap(), 0);
        }

        // Downgrading a region that was never created.
        let instruction = Instruction::DowngradeRegions(vec![DowngradeRegion {
            region_id: RegionId::new(2048, 1),
            flush_timeout: Some(Duration::from_secs(1)),
        }]);
        let reply = dispatch(&heartbeat_handler, &mut heartbeat_env, instruction).await;

        let replies = reply.expect_downgrade_regions_reply();
        assert!(!replies[0].exists);
        assert!(replies[0].error.is_none());
        assert!(replies[0].last_entry_id.is_none());
    }
}