1use async_trait::async_trait;
16use common_meta::error::{InvalidHeartbeatResponseSnafu, Result as MetaResult};
17use common_meta::heartbeat::handler::{
18 HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
19};
20use common_meta::instruction::{Instruction, InstructionReply};
21use common_meta::kv_backend::KvBackendRef;
22use common_telemetry::error;
23use common_telemetry::tracing_context::FutureExt;
24use snafu::OptionExt;
25use store_api::storage::GcReport;
26use strum::AsRefStr;
27
28mod apply_staging_manifest;
29mod close_region;
30mod downgrade_region;
31mod enter_staging;
32mod file_ref;
33mod flush_region;
34mod gc_worker;
35mod open_region;
36mod remap_manifest;
37mod sync_region;
38mod upgrade_region;
39
40use crate::heartbeat::handler::apply_staging_manifest::ApplyStagingManifestsHandler;
41use crate::heartbeat::handler::close_region::CloseRegionsHandler;
42use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler;
43use crate::heartbeat::handler::enter_staging::EnterStagingRegionsHandler;
44use crate::heartbeat::handler::file_ref::GetFileRefsHandler;
45use crate::heartbeat::handler::flush_region::FlushRegionsHandler;
46use crate::heartbeat::handler::gc_worker::GcRegionsHandler;
47use crate::heartbeat::handler::open_region::OpenRegionsHandler;
48use crate::heartbeat::handler::remap_manifest::RemapManifestHandler;
49use crate::heartbeat::handler::sync_region::SyncRegionHandler;
50use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
51use crate::heartbeat::task_tracker::TaskTracker;
52use crate::region_server::RegionServer;
53
54#[derive(Clone)]
56pub struct RegionHeartbeatResponseHandler {
57 region_server: RegionServer,
58 downgrade_tasks: TaskTracker<()>,
59 flush_tasks: TaskTracker<()>,
60 open_region_parallelism: usize,
61 gc_tasks: TaskTracker<GcReport>,
62 kv_backend: KvBackendRef,
63}
64
65#[async_trait::async_trait]
66pub trait InstructionHandler: Send + Sync {
67 type Instruction;
68 async fn handle(
69 &self,
70 ctx: &HandlerContext,
71 instruction: Self::Instruction,
72 ) -> Option<InstructionReply>;
73}
74
75#[derive(Clone)]
76pub struct HandlerContext {
77 pub region_server: RegionServer,
78 pub downgrade_tasks: TaskTracker<()>,
79 pub flush_tasks: TaskTracker<()>,
80 pub gc_tasks: TaskTracker<GcReport>,
81 pub kv_backend: KvBackendRef,
82}
83
84impl HandlerContext {
85 #[cfg(test)]
86 pub fn new_for_test(region_server: RegionServer, kv_backend: KvBackendRef) -> Self {
87 Self {
88 region_server,
89 downgrade_tasks: TaskTracker::new(),
90 flush_tasks: TaskTracker::new(),
91 gc_tasks: TaskTracker::new(),
92 kv_backend,
93 }
94 }
95}
96
97impl RegionHeartbeatResponseHandler {
98 pub fn new(region_server: RegionServer, kv_backend: KvBackendRef) -> Self {
100 Self {
101 region_server,
102 downgrade_tasks: TaskTracker::new(),
103 flush_tasks: TaskTracker::new(),
104 open_region_parallelism: (num_cpus::get() / 2).max(1),
106 gc_tasks: TaskTracker::new(),
107 kv_backend,
108 }
109 }
110
111 pub fn with_open_region_parallelism(mut self, parallelism: usize) -> Self {
113 self.open_region_parallelism = parallelism;
114 self
115 }
116
117 fn build_handler(
118 &self,
119 instruction: &Instruction,
120 ) -> MetaResult<Option<Box<InstructionHandlers>>> {
121 match instruction {
122 Instruction::CloseRegions(_) => Ok(Some(Box::new(CloseRegionsHandler.into()))),
123 Instruction::OpenRegions(_) => Ok(Some(Box::new(
124 OpenRegionsHandler {
125 open_region_parallelism: self.open_region_parallelism,
126 }
127 .into(),
128 ))),
129 Instruction::FlushRegions(_) => Ok(Some(Box::new(FlushRegionsHandler.into()))),
130 Instruction::DowngradeRegions(_) => Ok(Some(Box::new(DowngradeRegionsHandler.into()))),
131 Instruction::UpgradeRegions(_) => Ok(Some(Box::new(
132 UpgradeRegionsHandler {
133 upgrade_region_parallelism: self.open_region_parallelism,
134 }
135 .into(),
136 ))),
137 Instruction::GetFileRefs(_) => Ok(Some(Box::new(GetFileRefsHandler.into()))),
138 Instruction::GcRegions(_) => Ok(Some(Box::new(GcRegionsHandler.into()))),
139 Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
140 Instruction::Suspend => Ok(None),
141 Instruction::EnterStagingRegions(_) => {
142 Ok(Some(Box::new(EnterStagingRegionsHandler.into())))
143 }
144 Instruction::SyncRegions(_) => Ok(Some(Box::new(SyncRegionHandler.into()))),
145 Instruction::RemapManifest(_) => Ok(Some(Box::new(RemapManifestHandler.into()))),
146 Instruction::ApplyStagingManifests(_) => {
147 Ok(Some(Box::new(ApplyStagingManifestsHandler.into())))
148 }
149 }
150 }
151}
152
153#[allow(clippy::enum_variant_names)]
154#[derive(AsRefStr)]
155pub enum InstructionHandlers {
156 CloseRegions(CloseRegionsHandler),
157 OpenRegions(OpenRegionsHandler),
158 FlushRegions(FlushRegionsHandler),
159 DowngradeRegions(DowngradeRegionsHandler),
160 UpgradeRegions(UpgradeRegionsHandler),
161 GetFileRefs(GetFileRefsHandler),
162 GcRegions(GcRegionsHandler),
163 EnterStagingRegions(EnterStagingRegionsHandler),
164 SyncRegions(SyncRegionHandler),
165 RemapManifest(RemapManifestHandler),
166 ApplyStagingManifests(ApplyStagingManifestsHandler),
167}
168
169impl InstructionHandlers {
170 pub fn as_ref_str(&self) -> &str {
172 self.as_ref()
173 }
174}
175
/// Generates `From<Handler> for InstructionHandlers` for each
/// `HandlerType => Variant` pair, wrapping the handler in the named variant.
///
/// A trailing comma after the last pair is accepted, matching the syntax of
/// `dispatch_instr!`.
macro_rules! impl_from_handler {
    ($($handler:ident => $variant:ident),* $(,)?) => {
        $(
            impl From<$handler> for InstructionHandlers {
                fn from(handler: $handler) -> Self {
                    InstructionHandlers::$variant(handler)
                }
            }
        )*
    };
}
187
188impl_from_handler!(
189 CloseRegionsHandler => CloseRegions,
190 OpenRegionsHandler => OpenRegions,
191 FlushRegionsHandler => FlushRegions,
192 DowngradeRegionsHandler => DowngradeRegions,
193 UpgradeRegionsHandler => UpgradeRegions,
194 GetFileRefsHandler => GetFileRefs,
195 GcRegionsHandler => GcRegions,
196 EnterStagingRegionsHandler => EnterStagingRegions,
197 SyncRegionHandler => SyncRegions,
198 RemapManifestHandler => RemapManifest,
199 ApplyStagingManifestsHandler => ApplyStagingManifests
200);
201
/// Generates `InstructionHandlers::handle` and
/// `InstructionHandlers::is_acceptable` from a list of
/// `InstructionVariant => HandlerVariant` pairs.
macro_rules! dispatch_instr {
    ($($instruction_kind:ident => $handler_kind:ident),* $(,)?) => {
        impl InstructionHandlers {
            /// Forwards `instruction` to the wrapped handler and returns its reply.
            ///
            /// # Panics
            ///
            /// Panics if the handler variant and the instruction variant are not
            /// one of the listed pairs.
            pub async fn handle(
                &self,
                ctx: &HandlerContext,
                instruction: Instruction,
            ) -> Option<InstructionReply> {
                match (self, instruction) {
                    $(
                        (
                            Self::$handler_kind(inner),
                            Instruction::$instruction_kind(payload),
                        ) => inner.handle(ctx, payload).await,
                    )*
                    // Any other combination pairs a handler with an instruction it
                    // was not listed for.
                    _ => unreachable!(),
                }
            }

            /// Returns `true` when `instruction` is one of the listed instruction
            /// variants.
            pub fn is_acceptable(instruction: &Instruction) -> bool {
                matches!(
                    instruction,
                    $(Instruction::$instruction_kind { .. })|*
                )
            }
        }
    };
}
235
236dispatch_instr!(
237 CloseRegions => CloseRegions,
238 OpenRegions => OpenRegions,
239 FlushRegions => FlushRegions,
240 DowngradeRegions => DowngradeRegions,
241 UpgradeRegions => UpgradeRegions,
242 GetFileRefs => GetFileRefs,
243 GcRegions => GcRegions,
244 EnterStagingRegions => EnterStagingRegions,
245 SyncRegions => SyncRegions,
246 RemapManifest => RemapManifest,
247 ApplyStagingManifests => ApplyStagingManifests,
248);
249
250#[async_trait]
251impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
252 fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
253 if let Some((_, _, instruction)) = ctx.incoming_message.as_ref() {
254 return InstructionHandlers::is_acceptable(instruction);
255 }
256 false
257 }
258
259 async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
260 let (meta, tracing_ctx, instruction) = ctx
261 .incoming_message
262 .take()
263 .context(InvalidHeartbeatResponseSnafu)?;
264
265 let mailbox = ctx.mailbox.clone();
266 if let Some(handler) = self.build_handler(&instruction)? {
267 let context = HandlerContext {
268 region_server: self.region_server.clone(),
269 downgrade_tasks: self.downgrade_tasks.clone(),
270 flush_tasks: self.flush_tasks.clone(),
271 gc_tasks: self.gc_tasks.clone(),
272 kv_backend: self.kv_backend.clone(),
273 };
274 let span = tracing_ctx.attach(tracing::info_span!(
275 "RegionHeartbeatResponseHandler::handle",
276 from = %meta.from,
277 to = %meta.to,
278 handler = %handler.as_ref_str(),
279 ));
280 let _handle = common_runtime::spawn_global(async move {
281 let reply = handler.handle(&context, instruction).trace(span).await;
282 if let Some(reply) = reply
283 && let Err(e) = mailbox.send((meta, reply)).await
284 {
285 let error = e.to_string();
286 let (meta, reply) = e.0;
287 error!("Failed to send reply {reply} to {meta:?}: {error}");
288 }
289 });
290 }
291
292 Ok(HandleControl::Continue)
293 }
294}
295
296#[cfg(test)]
297mod tests {
298 use std::assert_matches::assert_matches;
299 use std::collections::HashMap;
300 use std::sync::Arc;
301 use std::time::Duration;
302
303 use common_meta::RegionIdent;
304 use common_meta::heartbeat::mailbox::{
305 HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta,
306 };
307 use common_meta::instruction::{
308 DowngradeRegion, EnterStagingRegion, OpenRegion, StagingPartitionDirective, UpgradeRegion,
309 };
310 use common_meta::kv_backend::memory::MemoryKvBackend;
311 use mito2::config::MitoConfig;
312 use mito2::engine::MITO_ENGINE_NAME;
313 use mito2::test_util::{CreateRequestBuilder, TestEnv};
314 use store_api::path_utils::table_dir;
315 use store_api::region_engine::RegionRole;
316 use store_api::region_request::{RegionCloseRequest, RegionRequest};
317 use store_api::storage::RegionId;
318 use tokio::sync::mpsc::{self, Receiver};
319
320 use super::*;
321 use crate::error;
322 use crate::tests::mock_region_server;
323
324 pub struct HeartbeatResponseTestEnv {
325 pub(crate) mailbox: MailboxRef,
326 pub(crate) receiver: Receiver<(MessageMeta, InstructionReply)>,
327 }
328
329 impl HeartbeatResponseTestEnv {
330 pub fn new() -> Self {
331 let (tx, rx) = mpsc::channel(8);
332 let mailbox = Arc::new(HeartbeatMailbox::new(tx));
333
334 HeartbeatResponseTestEnv {
335 mailbox,
336 receiver: rx,
337 }
338 }
339
340 pub fn create_handler_ctx(
341 &self,
342 incoming_message: IncomingMessage,
343 ) -> HeartbeatResponseHandlerContext {
344 HeartbeatResponseHandlerContext {
345 mailbox: self.mailbox.clone(),
346 response: Default::default(),
347 incoming_message: Some(incoming_message),
348 }
349 }
350 }
351
/// Checks that the handler accepts open, close, downgrade, upgrade and
/// enter-staging instructions.
#[test]
fn test_is_acceptable() {
    common_telemetry::init_default_ut_logging();
    let region_server = mock_region_server();
    let kv_backend = Arc::new(MemoryKvBackend::new());
    let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend);
    let heartbeat_env = HeartbeatResponseTestEnv::new();
    let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");

    let region_id = RegionId::new(1024, 1);
    let storage_path = "test";

    let instructions = vec![
        open_region_instruction(region_id, storage_path),
        close_region_instruction(region_id),
        Instruction::DowngradeRegions(vec![DowngradeRegion {
            region_id: RegionId::new(2048, 1),
            flush_timeout: Some(Duration::from_secs(1)),
        }]),
        Instruction::UpgradeRegions(vec![UpgradeRegion {
            region_id,
            ..Default::default()
        }]),
        Instruction::EnterStagingRegions(vec![EnterStagingRegion {
            region_id,
            partition_directive: StagingPartitionDirective::UpdatePartitionExpr("".to_string()),
        }]),
    ];

    for (index, instruction) in instructions.into_iter().enumerate() {
        let ctx = heartbeat_env.create_handler_ctx((meta.clone(), Default::default(), instruction));
        assert!(
            heartbeat_handler.is_acceptable(&ctx),
            "instruction #{index} should be acceptable"
        );
    }
}
423
424 fn close_region_instruction(region_id: RegionId) -> Instruction {
425 Instruction::CloseRegions(vec![RegionIdent {
426 table_id: region_id.table_id(),
427 region_number: region_id.region_number(),
428 datanode_id: 2,
429 engine: MITO_ENGINE_NAME.to_string(),
430 }])
431 }
432
433 fn open_region_instruction(region_id: RegionId, path: &str) -> Instruction {
434 Instruction::OpenRegions(vec![OpenRegion::new(
435 RegionIdent {
436 table_id: region_id.table_id(),
437 region_number: region_id.region_number(),
438 datanode_id: 2,
439 engine: MITO_ENGINE_NAME.to_string(),
440 },
441 path,
442 HashMap::new(),
443 HashMap::new(),
444 false,
445 )])
446 }
447
448 #[tokio::test]
449 async fn test_close_region() {
450 common_telemetry::init_default_ut_logging();
451
452 let mut region_server = mock_region_server();
453 let kv_backend = Arc::new(MemoryKvBackend::new());
454 let heartbeat_handler =
455 RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend);
456
457 let mut engine_env = TestEnv::with_prefix("close-region").await;
458 let engine = engine_env.create_engine(MitoConfig::default()).await;
459 region_server.register_engine(Arc::new(engine));
460 let region_id = RegionId::new(1024, 1);
461
462 let builder = CreateRequestBuilder::new();
463 let create_req = builder.build();
464 region_server
465 .handle_request(region_id, RegionRequest::Create(create_req))
466 .await
467 .unwrap();
468
469 let mut heartbeat_env = HeartbeatResponseTestEnv::new();
470
471 for _ in 0..2 {
473 let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
474 let instruction = close_region_instruction(region_id);
475
476 let mut ctx = heartbeat_env.create_handler_ctx((meta, Default::default(), instruction));
477 let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
478 assert_matches!(control, HandleControl::Continue);
479
480 let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
481
482 if let InstructionReply::CloseRegions(reply) = reply {
483 assert!(reply.result);
484 assert!(reply.error.is_none());
485 } else {
486 unreachable!()
487 }
488
489 assert_matches!(
490 region_server
491 .set_region_role(region_id, RegionRole::Leader)
492 .unwrap_err(),
493 error::Error::RegionNotFound { .. }
494 );
495 }
496 }
497
498 #[tokio::test]
499 async fn test_open_region_ok() {
500 common_telemetry::init_default_ut_logging();
501
502 let mut region_server = mock_region_server();
503 let kv_backend = Arc::new(MemoryKvBackend::new());
504 let heartbeat_handler =
505 RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend);
506
507 let mut engine_env = TestEnv::with_prefix("open-region").await;
508 let engine = engine_env.create_engine(MitoConfig::default()).await;
509 region_server.register_engine(Arc::new(engine));
510 let region_id = RegionId::new(1024, 1);
511
512 let builder = CreateRequestBuilder::new();
513 let mut create_req = builder.build();
514 let storage_path = "test";
515 create_req.table_dir = table_dir(storage_path, region_id.table_id());
516
517 region_server
518 .handle_request(region_id, RegionRequest::Create(create_req))
519 .await
520 .unwrap();
521
522 region_server
523 .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
524 .await
525 .unwrap();
526 let mut heartbeat_env = HeartbeatResponseTestEnv::new();
527
528 for _ in 0..2 {
530 let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
531 let instruction = open_region_instruction(region_id, storage_path);
532
533 let mut ctx = heartbeat_env.create_handler_ctx((meta, Default::default(), instruction));
534 let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
535 assert_matches!(control, HandleControl::Continue);
536
537 let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
538
539 if let InstructionReply::OpenRegions(reply) = reply {
540 assert!(reply.result);
541 assert!(reply.error.is_none());
542 } else {
543 unreachable!()
544 }
545 }
546 }
547
548 #[tokio::test]
549 async fn test_open_not_exists_region() {
550 common_telemetry::init_default_ut_logging();
551
552 let mut region_server = mock_region_server();
553 let kv_backend = Arc::new(MemoryKvBackend::new());
554 let heartbeat_handler =
555 RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend);
556
557 let mut engine_env = TestEnv::with_prefix("open-not-exists-region").await;
558 let engine = engine_env.create_engine(MitoConfig::default()).await;
559 region_server.register_engine(Arc::new(engine));
560 let region_id = RegionId::new(1024, 1);
561 let storage_path = "test";
562
563 let mut heartbeat_env = HeartbeatResponseTestEnv::new();
564
565 let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
566 let instruction = open_region_instruction(region_id, storage_path);
567
568 let mut ctx = heartbeat_env.create_handler_ctx((meta, Default::default(), instruction));
569 let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
570 assert_matches!(control, HandleControl::Continue);
571
572 let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
573
574 if let InstructionReply::OpenRegions(reply) = reply {
575 assert!(!reply.result);
576 assert!(reply.error.is_some());
577 } else {
578 unreachable!()
579 }
580 }
581
582 #[tokio::test]
583 async fn test_downgrade_region() {
584 common_telemetry::init_default_ut_logging();
585
586 let mut region_server = mock_region_server();
587 let kv_backend = Arc::new(MemoryKvBackend::new());
588 let heartbeat_handler =
589 RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend);
590
591 let mut engine_env = TestEnv::with_prefix("downgrade-region").await;
592 let engine = engine_env.create_engine(MitoConfig::default()).await;
593 region_server.register_engine(Arc::new(engine));
594 let region_id = RegionId::new(1024, 1);
595
596 let builder = CreateRequestBuilder::new();
597 let mut create_req = builder.build();
598 let storage_path = "test";
599 create_req.table_dir = table_dir(storage_path, region_id.table_id());
600
601 region_server
602 .handle_request(region_id, RegionRequest::Create(create_req))
603 .await
604 .unwrap();
605
606 let mut heartbeat_env = HeartbeatResponseTestEnv::new();
607
608 for _ in 0..2 {
610 let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
611 let instruction = Instruction::DowngradeRegions(vec![DowngradeRegion {
612 region_id,
613 flush_timeout: Some(Duration::from_secs(1)),
614 }]);
615
616 let mut ctx = heartbeat_env.create_handler_ctx((meta, Default::default(), instruction));
617 let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
618 assert_matches!(control, HandleControl::Continue);
619
620 let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
621
622 let reply = &reply.expect_downgrade_regions_reply()[0];
623 assert!(reply.exists);
624 assert!(reply.error.is_none());
625 assert_eq!(reply.last_entry_id.unwrap(), 0);
626 }
627
628 let meta = MessageMeta::new_test(1, "test", "dn-1", "me-0");
630 let instruction = Instruction::DowngradeRegions(vec![DowngradeRegion {
631 region_id: RegionId::new(2048, 1),
632 flush_timeout: Some(Duration::from_secs(1)),
633 }]);
634 let mut ctx = heartbeat_env.create_handler_ctx((meta, Default::default(), instruction));
635 let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
636 assert_matches!(control, HandleControl::Continue);
637
638 let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
639
640 let reply = reply.expect_downgrade_regions_reply();
641 assert!(!reply[0].exists);
642 assert!(reply[0].error.is_none());
643 assert!(reply[0].last_entry_id.is_none());
644 }
645}