1use async_trait::async_trait;
16use common_meta::error::{InvalidHeartbeatResponseSnafu, Result as MetaResult};
17use common_meta::heartbeat::handler::{
18 HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
19};
20use common_meta::instruction::{Instruction, InstructionReply};
21use common_telemetry::error;
22use snafu::OptionExt;
23use store_api::storage::GcReport;
24
25mod close_region;
26mod downgrade_region;
27mod file_ref;
28mod flush_region;
29mod gc_worker;
30mod open_region;
31mod upgrade_region;
32
33use crate::heartbeat::handler::close_region::CloseRegionsHandler;
34use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler;
35use crate::heartbeat::handler::file_ref::GetFileRefsHandler;
36use crate::heartbeat::handler::flush_region::FlushRegionsHandler;
37use crate::heartbeat::handler::gc_worker::GcRegionsHandler;
38use crate::heartbeat::handler::open_region::OpenRegionsHandler;
39use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
40use crate::heartbeat::task_tracker::TaskTracker;
41use crate::region_server::RegionServer;
42
43#[derive(Clone)]
45pub struct RegionHeartbeatResponseHandler {
46 region_server: RegionServer,
47 downgrade_tasks: TaskTracker<()>,
48 flush_tasks: TaskTracker<()>,
49 open_region_parallelism: usize,
50 gc_tasks: TaskTracker<GcReport>,
51}
52
53#[async_trait::async_trait]
54pub trait InstructionHandler: Send + Sync {
55 type Instruction;
56 async fn handle(
57 &self,
58 ctx: &HandlerContext,
59 instruction: Self::Instruction,
60 ) -> Option<InstructionReply>;
61}
62
63#[derive(Clone)]
64pub struct HandlerContext {
65 region_server: RegionServer,
66 downgrade_tasks: TaskTracker<()>,
67 flush_tasks: TaskTracker<()>,
68 gc_tasks: TaskTracker<GcReport>,
69}
70
71impl HandlerContext {
72 #[cfg(test)]
73 pub fn new_for_test(region_server: RegionServer) -> Self {
74 Self {
75 region_server,
76 downgrade_tasks: TaskTracker::new(),
77 flush_tasks: TaskTracker::new(),
78 gc_tasks: TaskTracker::new(),
79 }
80 }
81}
82
83impl RegionHeartbeatResponseHandler {
84 pub fn new(region_server: RegionServer) -> Self {
86 Self {
87 region_server,
88 downgrade_tasks: TaskTracker::new(),
89 flush_tasks: TaskTracker::new(),
90 open_region_parallelism: (num_cpus::get() / 2).max(1),
92 gc_tasks: TaskTracker::new(),
93 }
94 }
95
96 pub fn with_open_region_parallelism(mut self, parallelism: usize) -> Self {
98 self.open_region_parallelism = parallelism;
99 self
100 }
101
102 fn build_handler(&self, instruction: &Instruction) -> MetaResult<Box<InstructionHandlers>> {
103 match instruction {
104 Instruction::CloseRegions(_) => Ok(Box::new(CloseRegionsHandler.into())),
105 Instruction::OpenRegions(_) => Ok(Box::new(
106 OpenRegionsHandler {
107 open_region_parallelism: self.open_region_parallelism,
108 }
109 .into(),
110 )),
111 Instruction::FlushRegions(_) => Ok(Box::new(FlushRegionsHandler.into())),
112 Instruction::DowngradeRegions(_) => Ok(Box::new(DowngradeRegionsHandler.into())),
113 Instruction::UpgradeRegions(_) => Ok(Box::new(
114 UpgradeRegionsHandler {
115 upgrade_region_parallelism: self.open_region_parallelism,
116 }
117 .into(),
118 )),
119 Instruction::GetFileRefs(_) => Ok(Box::new(GetFileRefsHandler.into())),
120 Instruction::GcRegions(_) => Ok(Box::new(GcRegionsHandler.into())),
121 Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
122 }
123 }
124}
125
126#[allow(clippy::enum_variant_names)]
127pub enum InstructionHandlers {
128 CloseRegions(CloseRegionsHandler),
129 OpenRegions(OpenRegionsHandler),
130 FlushRegions(FlushRegionsHandler),
131 DowngradeRegions(DowngradeRegionsHandler),
132 UpgradeRegions(UpgradeRegionsHandler),
133 GetFileRefs(GetFileRefsHandler),
134 GcRegions(GcRegionsHandler),
135}
136
137macro_rules! impl_from_handler {
138 ($($handler:ident => $variant:ident),*) => {
139 $(
140 impl From<$handler> for InstructionHandlers {
141 fn from(handler: $handler) -> Self {
142 InstructionHandlers::$variant(handler)
143 }
144 }
145 )*
146 };
147}
148
149impl_from_handler!(
150 CloseRegionsHandler => CloseRegions,
151 OpenRegionsHandler => OpenRegions,
152 FlushRegionsHandler => FlushRegions,
153 DowngradeRegionsHandler => DowngradeRegions,
154 UpgradeRegionsHandler => UpgradeRegions,
155 GetFileRefsHandler => GetFileRefs,
156 GcRegionsHandler => GcRegions
157);
158
159macro_rules! dispatch_instr {
160 (
161 $( $instr_variant:ident => $handler_variant:ident ),* $(,)?
162 ) => {
163 impl InstructionHandlers {
164 pub async fn handle(
165 &self,
166 ctx: &HandlerContext,
167 instruction: Instruction,
168 ) -> Option<InstructionReply> {
169 match (self, instruction) {
170 $(
171 (
172 InstructionHandlers::$handler_variant(handler),
173 Instruction::$instr_variant(instr),
174 ) => handler.handle(ctx, instr).await,
175 )*
176 _ => unreachable!(),
178 }
179 }
180 pub fn is_acceptable(instruction: &Instruction) -> bool {
182 matches!(
183 instruction,
184 $(
185 Instruction::$instr_variant { .. }
186 )|*
187 )
188 }
189 }
190 };
191}
192
193dispatch_instr!(
194 CloseRegions => CloseRegions,
195 OpenRegions => OpenRegions,
196 FlushRegions => FlushRegions,
197 DowngradeRegions => DowngradeRegions,
198 UpgradeRegions => UpgradeRegions,
199 GetFileRefs => GetFileRefs,
200 GcRegions => GcRegions,
201);
202
203#[async_trait]
204impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
205 fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
206 if let Some((_, instruction)) = ctx.incoming_message.as_ref() {
207 return InstructionHandlers::is_acceptable(instruction);
208 }
209 false
210 }
211
212 async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
213 let (meta, instruction) = ctx
214 .incoming_message
215 .take()
216 .context(InvalidHeartbeatResponseSnafu)?;
217
218 let mailbox = ctx.mailbox.clone();
219 let region_server = self.region_server.clone();
220 let downgrade_tasks = self.downgrade_tasks.clone();
221 let flush_tasks = self.flush_tasks.clone();
222 let gc_tasks = self.gc_tasks.clone();
223 let handler = self.build_handler(&instruction)?;
224 let _handle = common_runtime::spawn_global(async move {
225 let reply = handler
226 .handle(
227 &HandlerContext {
228 region_server,
229 downgrade_tasks,
230 flush_tasks,
231 gc_tasks,
232 },
233 instruction,
234 )
235 .await;
236
237 if let Some(reply) = reply
238 && let Err(e) = mailbox.send((meta, reply)).await
239 {
240 error!(e; "Failed to send reply to mailbox");
241 }
242 });
243
244 Ok(HandleControl::Continue)
245 }
246}
247
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::time::Duration;

    use common_meta::RegionIdent;
    use common_meta::heartbeat::mailbox::{
        HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta,
    };
    use common_meta::instruction::{DowngradeRegion, OpenRegion, UpgradeRegion};
    use mito2::config::MitoConfig;
    use mito2::engine::MITO_ENGINE_NAME;
    use mito2::test_util::{CreateRequestBuilder, TestEnv};
    use store_api::path_utils::table_dir;
    use store_api::region_engine::RegionRole;
    use store_api::region_request::{RegionCloseRequest, RegionRequest};
    use store_api::storage::RegionId;
    use tokio::sync::mpsc::{self, Receiver};

    use super::*;
    use crate::error;
    use crate::tests::mock_region_server;

    /// Test fixture pairing a heartbeat mailbox with the receiving end of its channel,
    /// so tests can read the replies that handlers send.
    pub struct HeartbeatResponseTestEnv {
        pub(crate) mailbox: MailboxRef,
        pub(crate) receiver: Receiver<(MessageMeta, InstructionReply)>,
    }

    impl HeartbeatResponseTestEnv {
        /// Creates a mailbox on top of a bounded channel with capacity 8.
        pub fn new() -> Self {
            let (sender, receiver) = mpsc::channel(8);

            Self {
                mailbox: Arc::new(HeartbeatMailbox::new(sender)),
                receiver,
            }
        }

        /// Wraps `incoming_message` in a handler context that shares this env's mailbox.
        pub fn create_handler_ctx(
            &self,
            incoming_message: IncomingMessage,
        ) -> HeartbeatResponseHandlerContext {
            let mailbox = self.mailbox.clone();

            HeartbeatResponseHandlerContext {
                mailbox,
                response: Default::default(),
                incoming_message: Some(incoming_message),
            }
        }
    }

    /// Message meta shared by every test in this module.
    fn new_test_meta() -> MessageMeta {
        MessageMeta::new_test(1, "test", "dn-1", "me-0")
    }

    /// Feeds `instruction` to `handler`, checks that it asks to continue, and
    /// returns the reply that arrives on the env's receiver.
    async fn handle_and_recv(
        handler: &RegionHeartbeatResponseHandler,
        env: &mut HeartbeatResponseTestEnv,
        instruction: Instruction,
    ) -> InstructionReply {
        let mut ctx = env.create_handler_ctx((new_test_meta(), instruction));
        let control = handler.handle(&mut ctx).await.unwrap();
        assert_matches!(control, HandleControl::Continue);

        let (_, reply) = env.receiver.recv().await.unwrap();
        reply
    }

    /// Builds a close instruction for `region_id`.
    fn close_region_instruction(region_id: RegionId) -> Instruction {
        let ident = RegionIdent {
            table_id: region_id.table_id(),
            region_number: region_id.region_number(),
            datanode_id: 2,
            engine: MITO_ENGINE_NAME.to_string(),
        };

        Instruction::CloseRegions(vec![ident])
    }

    /// Builds an open instruction for `region_id` stored under `path`.
    fn open_region_instruction(region_id: RegionId, path: &str) -> Instruction {
        let ident = RegionIdent {
            table_id: region_id.table_id(),
            region_number: region_id.region_number(),
            datanode_id: 2,
            engine: MITO_ENGINE_NAME.to_string(),
        };

        Instruction::OpenRegions(vec![OpenRegion::new(
            ident,
            path,
            HashMap::new(),
            HashMap::new(),
            false,
        )])
    }

    /// Builds a downgrade instruction for `region_id` with a one second flush timeout.
    fn downgrade_region_instruction(region_id: RegionId) -> Instruction {
        Instruction::DowngradeRegions(vec![DowngradeRegion {
            region_id,
            flush_timeout: Some(Duration::from_secs(1)),
        }])
    }

    /// Open, close, downgrade and upgrade instructions must all be accepted.
    #[test]
    fn test_is_acceptable() {
        common_telemetry::init_default_ut_logging();
        let handler = RegionHeartbeatResponseHandler::new(mock_region_server());
        let env = HeartbeatResponseTestEnv::new();

        let region_id = RegionId::new(1024, 1);
        let accepted = [
            open_region_instruction(region_id, "test"),
            close_region_instruction(region_id),
            downgrade_region_instruction(RegionId::new(2048, 1)),
            Instruction::UpgradeRegions(vec![UpgradeRegion {
                region_id,
                ..Default::default()
            }]),
        ];

        for instruction in accepted {
            let ctx = env.create_handler_ctx((new_test_meta(), instruction));
            assert!(handler.is_acceptable(&ctx));
        }
    }

    /// Closing a region succeeds, and repeating the close yields the same reply.
    #[tokio::test]
    async fn test_close_region() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let handler = RegionHeartbeatResponseHandler::new(region_server.clone());

        let mut engine_env = TestEnv::with_prefix("close-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));

        let region_id = RegionId::new(1024, 1);
        let create_req = CreateRequestBuilder::new().build();
        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();

        let mut env = HeartbeatResponseTestEnv::new();

        // The second round targets a region that is already closed.
        for _ in 0..2 {
            let reply =
                handle_and_recv(&handler, &mut env, close_region_instruction(region_id)).await;

            let InstructionReply::CloseRegions(reply) = reply else {
                unreachable!()
            };
            assert!(reply.result);
            assert!(reply.error.is_none());

            assert_matches!(
                region_server
                    .set_region_role(region_id, RegionRole::Leader)
                    .unwrap_err(),
                error::Error::RegionNotFound { .. }
            );
        }
    }

    /// Opening a previously created and closed region succeeds, also when repeated.
    #[tokio::test]
    async fn test_open_region_ok() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let handler = RegionHeartbeatResponseHandler::new(region_server.clone());

        let mut engine_env = TestEnv::with_prefix("open-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));

        let region_id = RegionId::new(1024, 1);
        let storage_path = "test";
        let mut create_req = CreateRequestBuilder::new().build();
        create_req.table_dir = table_dir(storage_path, region_id.table_id());

        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();
        region_server
            .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
            .await
            .unwrap();

        let mut env = HeartbeatResponseTestEnv::new();

        // The second round targets a region that is already open.
        for _ in 0..2 {
            let instruction = open_region_instruction(region_id, storage_path);
            let reply = handle_and_recv(&handler, &mut env, instruction).await;

            let InstructionReply::OpenRegions(reply) = reply else {
                unreachable!()
            };
            assert!(reply.result);
            assert!(reply.error.is_none());
        }
    }

    /// Opening a region that was never created reports a failure with an error.
    #[tokio::test]
    async fn test_open_not_exists_region() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let handler = RegionHeartbeatResponseHandler::new(region_server.clone());

        let mut engine_env = TestEnv::with_prefix("open-not-exists-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));

        let region_id = RegionId::new(1024, 1);
        let storage_path = "test";
        let mut env = HeartbeatResponseTestEnv::new();

        let instruction = open_region_instruction(region_id, storage_path);
        let reply = handle_and_recv(&handler, &mut env, instruction).await;

        let InstructionReply::OpenRegions(reply) = reply else {
            unreachable!()
        };
        assert!(!reply.result);
        assert!(reply.error.is_some());
    }

    /// Downgrading an existing region reports it with entry id 0 (twice in a row);
    /// downgrading an unknown region reports that it does not exist.
    #[tokio::test]
    async fn test_downgrade_region() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let handler = RegionHeartbeatResponseHandler::new(region_server.clone());

        let mut engine_env = TestEnv::with_prefix("downgrade-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));

        let region_id = RegionId::new(1024, 1);
        let storage_path = "test";
        let mut create_req = CreateRequestBuilder::new().build();
        create_req.table_dir = table_dir(storage_path, region_id.table_id());

        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();

        let mut env = HeartbeatResponseTestEnv::new();

        // Existing region: the second round repeats the downgrade.
        for _ in 0..2 {
            let instruction = downgrade_region_instruction(region_id);
            let reply = handle_and_recv(&handler, &mut env, instruction).await;

            let replies = reply.expect_downgrade_regions_reply();
            let first = &replies[0];
            assert!(first.exists);
            assert!(first.error.is_none());
            assert_eq!(first.last_entry_id.unwrap(), 0);
        }

        // A region that was never created.
        let instruction = downgrade_region_instruction(RegionId::new(2048, 1));
        let reply = handle_and_recv(&handler, &mut env, instruction).await;

        let replies = reply.expect_downgrade_regions_reply();
        assert!(!replies[0].exists);
        assert!(replies[0].error.is_none());
        assert!(replies[0].last_entry_id.is_none());
    }
}