1use async_trait::async_trait;
16use common_meta::error::{InvalidHeartbeatResponseSnafu, Result as MetaResult};
17use common_meta::heartbeat::handler::{
18 HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
19};
20use common_meta::instruction::{Instruction, InstructionReply};
21use common_telemetry::error;
22use snafu::OptionExt;
23use store_api::storage::GcReport;
24
25mod apply_staging_manifest;
26mod close_region;
27mod downgrade_region;
28mod enter_staging;
29mod file_ref;
30mod flush_region;
31mod gc_worker;
32mod open_region;
33mod remap_manifest;
34mod sync_region;
35mod upgrade_region;
36
37use crate::heartbeat::handler::apply_staging_manifest::ApplyStagingManifestsHandler;
38use crate::heartbeat::handler::close_region::CloseRegionsHandler;
39use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler;
40use crate::heartbeat::handler::enter_staging::EnterStagingRegionsHandler;
41use crate::heartbeat::handler::file_ref::GetFileRefsHandler;
42use crate::heartbeat::handler::flush_region::FlushRegionsHandler;
43use crate::heartbeat::handler::gc_worker::GcRegionsHandler;
44use crate::heartbeat::handler::open_region::OpenRegionsHandler;
45use crate::heartbeat::handler::remap_manifest::RemapManifestHandler;
46use crate::heartbeat::handler::sync_region::SyncRegionHandler;
47use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
48use crate::heartbeat::task_tracker::TaskTracker;
49use crate::region_server::RegionServer;
50
51#[derive(Clone)]
53pub struct RegionHeartbeatResponseHandler {
54 region_server: RegionServer,
55 downgrade_tasks: TaskTracker<()>,
56 flush_tasks: TaskTracker<()>,
57 open_region_parallelism: usize,
58 gc_tasks: TaskTracker<GcReport>,
59}
60
61#[async_trait::async_trait]
62pub trait InstructionHandler: Send + Sync {
63 type Instruction;
64 async fn handle(
65 &self,
66 ctx: &HandlerContext,
67 instruction: Self::Instruction,
68 ) -> Option<InstructionReply>;
69}
70
71#[derive(Clone)]
72pub struct HandlerContext {
73 region_server: RegionServer,
74 downgrade_tasks: TaskTracker<()>,
75 flush_tasks: TaskTracker<()>,
76 gc_tasks: TaskTracker<GcReport>,
77}
78
79impl HandlerContext {
80 #[cfg(test)]
81 pub fn new_for_test(region_server: RegionServer) -> Self {
82 Self {
83 region_server,
84 downgrade_tasks: TaskTracker::new(),
85 flush_tasks: TaskTracker::new(),
86 gc_tasks: TaskTracker::new(),
87 }
88 }
89}
90
91impl RegionHeartbeatResponseHandler {
92 pub fn new(region_server: RegionServer) -> Self {
94 Self {
95 region_server,
96 downgrade_tasks: TaskTracker::new(),
97 flush_tasks: TaskTracker::new(),
98 open_region_parallelism: (num_cpus::get() / 2).max(1),
100 gc_tasks: TaskTracker::new(),
101 }
102 }
103
104 pub fn with_open_region_parallelism(mut self, parallelism: usize) -> Self {
106 self.open_region_parallelism = parallelism;
107 self
108 }
109
110 fn build_handler(
111 &self,
112 instruction: &Instruction,
113 ) -> MetaResult<Option<Box<InstructionHandlers>>> {
114 match instruction {
115 Instruction::CloseRegions(_) => Ok(Some(Box::new(CloseRegionsHandler.into()))),
116 Instruction::OpenRegions(_) => Ok(Some(Box::new(
117 OpenRegionsHandler {
118 open_region_parallelism: self.open_region_parallelism,
119 }
120 .into(),
121 ))),
122 Instruction::FlushRegions(_) => Ok(Some(Box::new(FlushRegionsHandler.into()))),
123 Instruction::DowngradeRegions(_) => Ok(Some(Box::new(DowngradeRegionsHandler.into()))),
124 Instruction::UpgradeRegions(_) => Ok(Some(Box::new(
125 UpgradeRegionsHandler {
126 upgrade_region_parallelism: self.open_region_parallelism,
127 }
128 .into(),
129 ))),
130 Instruction::GetFileRefs(_) => Ok(Some(Box::new(GetFileRefsHandler.into()))),
131 Instruction::GcRegions(_) => Ok(Some(Box::new(GcRegionsHandler.into()))),
132 Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
133 Instruction::Suspend => Ok(None),
134 Instruction::EnterStagingRegions(_) => {
135 Ok(Some(Box::new(EnterStagingRegionsHandler.into())))
136 }
137 Instruction::SyncRegions(_) => Ok(Some(Box::new(SyncRegionHandler.into()))),
138 Instruction::RemapManifest(_) => Ok(Some(Box::new(RemapManifestHandler.into()))),
139 Instruction::ApplyStagingManifests(_) => {
140 Ok(Some(Box::new(ApplyStagingManifestsHandler.into())))
141 }
142 }
143 }
144}
145
146#[allow(clippy::enum_variant_names)]
147pub enum InstructionHandlers {
148 CloseRegions(CloseRegionsHandler),
149 OpenRegions(OpenRegionsHandler),
150 FlushRegions(FlushRegionsHandler),
151 DowngradeRegions(DowngradeRegionsHandler),
152 UpgradeRegions(UpgradeRegionsHandler),
153 GetFileRefs(GetFileRefsHandler),
154 GcRegions(GcRegionsHandler),
155 EnterStagingRegions(EnterStagingRegionsHandler),
156 SyncRegions(SyncRegionHandler),
157 RemapManifest(RemapManifestHandler),
158 ApplyStagingManifests(ApplyStagingManifestsHandler),
159}
160
/// Generates `impl From<Handler> for InstructionHandlers` for each
/// `Handler => Variant` pair, wrapping the handler in the named variant.
///
/// Pairs are comma-separated; a trailing comma is accepted, the same as in
/// `dispatch_instr!`.
macro_rules! impl_from_handler {
    ($($handler:ident => $variant:ident),* $(,)?) => {
        $(
            impl From<$handler> for InstructionHandlers {
                fn from(handler: $handler) -> Self {
                    Self::$variant(handler)
                }
            }
        )*
    };
}
172
173impl_from_handler!(
174 CloseRegionsHandler => CloseRegions,
175 OpenRegionsHandler => OpenRegions,
176 FlushRegionsHandler => FlushRegions,
177 DowngradeRegionsHandler => DowngradeRegions,
178 UpgradeRegionsHandler => UpgradeRegions,
179 GetFileRefsHandler => GetFileRefs,
180 GcRegionsHandler => GcRegions,
181 EnterStagingRegionsHandler => EnterStagingRegions,
182 SyncRegionHandler => SyncRegions,
183 RemapManifestHandler => RemapManifest,
184 ApplyStagingManifestsHandler => ApplyStagingManifests
185);
186
/// Generates the inherent `handle` and `is_acceptable` functions of
/// `InstructionHandlers` from `InstructionVariant => HandlerVariant` pairs.
macro_rules! dispatch_instr {
    ($($instruction_kind:ident => $handler_kind:ident),* $(,)?) => {
        impl InstructionHandlers {
            /// Forwards `instruction` to the wrapped handler.
            ///
            /// # Panics
            /// Panics when the handler variant and the instruction variant
            /// are not one of the pairs listed in the macro invocation.
            pub async fn handle(
                &self,
                ctx: &HandlerContext,
                instruction: Instruction,
            ) -> Option<InstructionReply> {
                match (self, instruction) {
                    $(
                        (
                            Self::$handler_kind(inner),
                            Instruction::$instruction_kind(payload),
                        ) => inner.handle(ctx, payload).await,
                    )*
                    _ => unreachable!(),
                }
            }

            /// Returns `true` when `instruction` is one of the kinds listed
            /// in the macro invocation.
            pub fn is_acceptable(instruction: &Instruction) -> bool {
                match instruction {
                    $(
                        Instruction::$instruction_kind { .. } => true,
                    )*
                    _ => false,
                }
            }
        }
    };
}
220
221dispatch_instr!(
222 CloseRegions => CloseRegions,
223 OpenRegions => OpenRegions,
224 FlushRegions => FlushRegions,
225 DowngradeRegions => DowngradeRegions,
226 UpgradeRegions => UpgradeRegions,
227 GetFileRefs => GetFileRefs,
228 GcRegions => GcRegions,
229 EnterStagingRegions => EnterStagingRegions,
230 SyncRegions => SyncRegions,
231 RemapManifest => RemapManifest,
232 ApplyStagingManifests => ApplyStagingManifests,
233);
234
235#[async_trait]
236impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
237 fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
238 if let Some((_, instruction)) = ctx.incoming_message.as_ref() {
239 return InstructionHandlers::is_acceptable(instruction);
240 }
241 false
242 }
243
244 async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
245 let (meta, instruction) = ctx
246 .incoming_message
247 .take()
248 .context(InvalidHeartbeatResponseSnafu)?;
249
250 let mailbox = ctx.mailbox.clone();
251 if let Some(handler) = self.build_handler(&instruction)? {
252 let context = HandlerContext {
253 region_server: self.region_server.clone(),
254 downgrade_tasks: self.downgrade_tasks.clone(),
255 flush_tasks: self.flush_tasks.clone(),
256 gc_tasks: self.gc_tasks.clone(),
257 };
258 let _handle = common_runtime::spawn_global(async move {
259 let reply = handler.handle(&context, instruction).await;
260 if let Some(reply) = reply
261 && let Err(e) = mailbox.send((meta, reply)).await
262 {
263 let error = e.to_string();
264 let (meta, reply) = e.0;
265 error!("Failed to send reply {reply} to {meta:?}: {error}");
266 }
267 });
268 }
269
270 Ok(HandleControl::Continue)
271 }
272}
273
274#[cfg(test)]
275mod tests {
276 use std::assert_matches::assert_matches;
277 use std::collections::HashMap;
278 use std::sync::Arc;
279 use std::time::Duration;
280
281 use common_meta::RegionIdent;
282 use common_meta::heartbeat::mailbox::{
283 HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta,
284 };
285 use common_meta::instruction::{
286 DowngradeRegion, EnterStagingRegion, OpenRegion, UpgradeRegion,
287 };
288 use mito2::config::MitoConfig;
289 use mito2::engine::MITO_ENGINE_NAME;
290 use mito2::test_util::{CreateRequestBuilder, TestEnv};
291 use store_api::path_utils::table_dir;
292 use store_api::region_engine::RegionRole;
293 use store_api::region_request::{RegionCloseRequest, RegionRequest};
294 use store_api::storage::RegionId;
295 use tokio::sync::mpsc::{self, Receiver};
296
297 use super::*;
298 use crate::error;
299 use crate::tests::mock_region_server;
300
301 pub struct HeartbeatResponseTestEnv {
302 pub(crate) mailbox: MailboxRef,
303 pub(crate) receiver: Receiver<(MessageMeta, InstructionReply)>,
304 }
305
306 impl HeartbeatResponseTestEnv {
307 pub fn new() -> Self {
308 let (tx, rx) = mpsc::channel(8);
309 let mailbox = Arc::new(HeartbeatMailbox::new(tx));
310
311 HeartbeatResponseTestEnv {
312 mailbox,
313 receiver: rx,
314 }
315 }
316
317 pub fn create_handler_ctx(
318 &self,
319 incoming_message: IncomingMessage,
320 ) -> HeartbeatResponseHandlerContext {
321 HeartbeatResponseHandlerContext {
322 mailbox: self.mailbox.clone(),
323 response: Default::default(),
324 incoming_message: Some(incoming_message),
325 }
326 }
327 }
328
/// Checks that open, close, downgrade, upgrade and enter-staging instructions
/// are all accepted by the handler.
#[test]
fn test_is_acceptable() {
    common_telemetry::init_default_ut_logging();
    let heartbeat_handler = RegionHeartbeatResponseHandler::new(mock_region_server());
    let heartbeat_env = HeartbeatResponseTestEnv::new();
    let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");

    let region_id = RegionId::new(1024, 1);
    let storage_path = "test";

    let accepted = [
        open_region_instruction(region_id, storage_path),
        close_region_instruction(region_id),
        Instruction::DowngradeRegions(vec![DowngradeRegion {
            region_id: RegionId::new(2048, 1),
            flush_timeout: Some(Duration::from_secs(1)),
        }]),
        Instruction::UpgradeRegions(vec![UpgradeRegion {
            region_id,
            ..Default::default()
        }]),
        Instruction::EnterStagingRegions(vec![EnterStagingRegion {
            region_id,
            partition_expr: String::new(),
        }]),
    ];

    for (index, instruction) in accepted.into_iter().enumerate() {
        let ctx = heartbeat_env.create_handler_ctx((meta.clone(), instruction));
        assert!(
            heartbeat_handler.is_acceptable(&ctx),
            "instruction #{index} should be acceptable"
        );
    }
}
382
383 fn close_region_instruction(region_id: RegionId) -> Instruction {
384 Instruction::CloseRegions(vec![RegionIdent {
385 table_id: region_id.table_id(),
386 region_number: region_id.region_number(),
387 datanode_id: 2,
388 engine: MITO_ENGINE_NAME.to_string(),
389 }])
390 }
391
392 fn open_region_instruction(region_id: RegionId, path: &str) -> Instruction {
393 Instruction::OpenRegions(vec![OpenRegion::new(
394 RegionIdent {
395 table_id: region_id.table_id(),
396 region_number: region_id.region_number(),
397 datanode_id: 2,
398 engine: MITO_ENGINE_NAME.to_string(),
399 },
400 path,
401 HashMap::new(),
402 HashMap::new(),
403 false,
404 )])
405 }
406
407 #[tokio::test]
408 async fn test_close_region() {
409 common_telemetry::init_default_ut_logging();
410
411 let mut region_server = mock_region_server();
412 let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
413
414 let mut engine_env = TestEnv::with_prefix("close-region").await;
415 let engine = engine_env.create_engine(MitoConfig::default()).await;
416 region_server.register_engine(Arc::new(engine));
417 let region_id = RegionId::new(1024, 1);
418
419 let builder = CreateRequestBuilder::new();
420 let create_req = builder.build();
421 region_server
422 .handle_request(region_id, RegionRequest::Create(create_req))
423 .await
424 .unwrap();
425
426 let mut heartbeat_env = HeartbeatResponseTestEnv::new();
427
428 for _ in 0..2 {
430 let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
431 let instruction = close_region_instruction(region_id);
432
433 let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
434 let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
435 assert_matches!(control, HandleControl::Continue);
436
437 let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
438
439 if let InstructionReply::CloseRegions(reply) = reply {
440 assert!(reply.result);
441 assert!(reply.error.is_none());
442 } else {
443 unreachable!()
444 }
445
446 assert_matches!(
447 region_server
448 .set_region_role(region_id, RegionRole::Leader)
449 .unwrap_err(),
450 error::Error::RegionNotFound { .. }
451 );
452 }
453 }
454
455 #[tokio::test]
456 async fn test_open_region_ok() {
457 common_telemetry::init_default_ut_logging();
458
459 let mut region_server = mock_region_server();
460 let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
461
462 let mut engine_env = TestEnv::with_prefix("open-region").await;
463 let engine = engine_env.create_engine(MitoConfig::default()).await;
464 region_server.register_engine(Arc::new(engine));
465 let region_id = RegionId::new(1024, 1);
466
467 let builder = CreateRequestBuilder::new();
468 let mut create_req = builder.build();
469 let storage_path = "test";
470 create_req.table_dir = table_dir(storage_path, region_id.table_id());
471
472 region_server
473 .handle_request(region_id, RegionRequest::Create(create_req))
474 .await
475 .unwrap();
476
477 region_server
478 .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
479 .await
480 .unwrap();
481 let mut heartbeat_env = HeartbeatResponseTestEnv::new();
482
483 for _ in 0..2 {
485 let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
486 let instruction = open_region_instruction(region_id, storage_path);
487
488 let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
489 let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
490 assert_matches!(control, HandleControl::Continue);
491
492 let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
493
494 if let InstructionReply::OpenRegions(reply) = reply {
495 assert!(reply.result);
496 assert!(reply.error.is_none());
497 } else {
498 unreachable!()
499 }
500 }
501 }
502
503 #[tokio::test]
504 async fn test_open_not_exists_region() {
505 common_telemetry::init_default_ut_logging();
506
507 let mut region_server = mock_region_server();
508 let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
509
510 let mut engine_env = TestEnv::with_prefix("open-not-exists-region").await;
511 let engine = engine_env.create_engine(MitoConfig::default()).await;
512 region_server.register_engine(Arc::new(engine));
513 let region_id = RegionId::new(1024, 1);
514 let storage_path = "test";
515
516 let mut heartbeat_env = HeartbeatResponseTestEnv::new();
517
518 let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
519 let instruction = open_region_instruction(region_id, storage_path);
520
521 let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
522 let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
523 assert_matches!(control, HandleControl::Continue);
524
525 let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
526
527 if let InstructionReply::OpenRegions(reply) = reply {
528 assert!(!reply.result);
529 assert!(reply.error.is_some());
530 } else {
531 unreachable!()
532 }
533 }
534
535 #[tokio::test]
536 async fn test_downgrade_region() {
537 common_telemetry::init_default_ut_logging();
538
539 let mut region_server = mock_region_server();
540 let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
541
542 let mut engine_env = TestEnv::with_prefix("downgrade-region").await;
543 let engine = engine_env.create_engine(MitoConfig::default()).await;
544 region_server.register_engine(Arc::new(engine));
545 let region_id = RegionId::new(1024, 1);
546
547 let builder = CreateRequestBuilder::new();
548 let mut create_req = builder.build();
549 let storage_path = "test";
550 create_req.table_dir = table_dir(storage_path, region_id.table_id());
551
552 region_server
553 .handle_request(region_id, RegionRequest::Create(create_req))
554 .await
555 .unwrap();
556
557 let mut heartbeat_env = HeartbeatResponseTestEnv::new();
558
559 for _ in 0..2 {
561 let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
562 let instruction = Instruction::DowngradeRegions(vec![DowngradeRegion {
563 region_id,
564 flush_timeout: Some(Duration::from_secs(1)),
565 }]);
566
567 let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
568 let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
569 assert_matches!(control, HandleControl::Continue);
570
571 let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
572
573 let reply = &reply.expect_downgrade_regions_reply()[0];
574 assert!(reply.exists);
575 assert!(reply.error.is_none());
576 assert_eq!(reply.last_entry_id.unwrap(), 0);
577 }
578
579 let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
581 let instruction = Instruction::DowngradeRegions(vec![DowngradeRegion {
582 region_id: RegionId::new(2048, 1),
583 flush_timeout: Some(Duration::from_secs(1)),
584 }]);
585 let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
586 let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
587 assert_matches!(control, HandleControl::Continue);
588
589 let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
590
591 let reply = reply.expect_downgrade_regions_reply();
592 assert!(!reply[0].exists);
593 assert!(reply[0].error.is_none());
594 assert!(reply[0].last_entry_id.is_none());
595 }
596}