1use async_trait::async_trait;
16use common_meta::error::{InvalidHeartbeatResponseSnafu, Result as MetaResult};
17use common_meta::heartbeat::handler::{
18 HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext,
19};
20use common_meta::instruction::{Instruction, InstructionReply};
21use common_meta::kv_backend::KvBackendRef;
22use common_telemetry::error;
23use snafu::OptionExt;
24use store_api::storage::GcReport;
25
26mod apply_staging_manifest;
27mod close_region;
28mod downgrade_region;
29mod enter_staging;
30mod file_ref;
31mod flush_region;
32mod gc_worker;
33mod open_region;
34mod remap_manifest;
35mod sync_region;
36mod upgrade_region;
37
38use crate::heartbeat::handler::apply_staging_manifest::ApplyStagingManifestsHandler;
39use crate::heartbeat::handler::close_region::CloseRegionsHandler;
40use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler;
41use crate::heartbeat::handler::enter_staging::EnterStagingRegionsHandler;
42use crate::heartbeat::handler::file_ref::GetFileRefsHandler;
43use crate::heartbeat::handler::flush_region::FlushRegionsHandler;
44use crate::heartbeat::handler::gc_worker::GcRegionsHandler;
45use crate::heartbeat::handler::open_region::OpenRegionsHandler;
46use crate::heartbeat::handler::remap_manifest::RemapManifestHandler;
47use crate::heartbeat::handler::sync_region::SyncRegionHandler;
48use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
49use crate::heartbeat::task_tracker::TaskTracker;
50use crate::region_server::RegionServer;
51
52#[derive(Clone)]
54pub struct RegionHeartbeatResponseHandler {
55 region_server: RegionServer,
56 downgrade_tasks: TaskTracker<()>,
57 flush_tasks: TaskTracker<()>,
58 open_region_parallelism: usize,
59 gc_tasks: TaskTracker<GcReport>,
60 kv_backend: KvBackendRef,
61}
62
63#[async_trait::async_trait]
64pub trait InstructionHandler: Send + Sync {
65 type Instruction;
66 async fn handle(
67 &self,
68 ctx: &HandlerContext,
69 instruction: Self::Instruction,
70 ) -> Option<InstructionReply>;
71}
72
73#[derive(Clone)]
74pub struct HandlerContext {
75 pub region_server: RegionServer,
76 pub downgrade_tasks: TaskTracker<()>,
77 pub flush_tasks: TaskTracker<()>,
78 pub gc_tasks: TaskTracker<GcReport>,
79 pub kv_backend: KvBackendRef,
80}
81
82impl HandlerContext {
83 #[cfg(test)]
84 pub fn new_for_test(region_server: RegionServer, kv_backend: KvBackendRef) -> Self {
85 Self {
86 region_server,
87 downgrade_tasks: TaskTracker::new(),
88 flush_tasks: TaskTracker::new(),
89 gc_tasks: TaskTracker::new(),
90 kv_backend,
91 }
92 }
93}
94
95impl RegionHeartbeatResponseHandler {
96 pub fn new(region_server: RegionServer, kv_backend: KvBackendRef) -> Self {
98 Self {
99 region_server,
100 downgrade_tasks: TaskTracker::new(),
101 flush_tasks: TaskTracker::new(),
102 open_region_parallelism: (num_cpus::get() / 2).max(1),
104 gc_tasks: TaskTracker::new(),
105 kv_backend,
106 }
107 }
108
109 pub fn with_open_region_parallelism(mut self, parallelism: usize) -> Self {
111 self.open_region_parallelism = parallelism;
112 self
113 }
114
115 fn build_handler(
116 &self,
117 instruction: &Instruction,
118 ) -> MetaResult<Option<Box<InstructionHandlers>>> {
119 match instruction {
120 Instruction::CloseRegions(_) => Ok(Some(Box::new(CloseRegionsHandler.into()))),
121 Instruction::OpenRegions(_) => Ok(Some(Box::new(
122 OpenRegionsHandler {
123 open_region_parallelism: self.open_region_parallelism,
124 }
125 .into(),
126 ))),
127 Instruction::FlushRegions(_) => Ok(Some(Box::new(FlushRegionsHandler.into()))),
128 Instruction::DowngradeRegions(_) => Ok(Some(Box::new(DowngradeRegionsHandler.into()))),
129 Instruction::UpgradeRegions(_) => Ok(Some(Box::new(
130 UpgradeRegionsHandler {
131 upgrade_region_parallelism: self.open_region_parallelism,
132 }
133 .into(),
134 ))),
135 Instruction::GetFileRefs(_) => Ok(Some(Box::new(GetFileRefsHandler.into()))),
136 Instruction::GcRegions(_) => Ok(Some(Box::new(GcRegionsHandler.into()))),
137 Instruction::InvalidateCaches(_) => InvalidHeartbeatResponseSnafu.fail(),
138 Instruction::Suspend => Ok(None),
139 Instruction::EnterStagingRegions(_) => {
140 Ok(Some(Box::new(EnterStagingRegionsHandler.into())))
141 }
142 Instruction::SyncRegions(_) => Ok(Some(Box::new(SyncRegionHandler.into()))),
143 Instruction::RemapManifest(_) => Ok(Some(Box::new(RemapManifestHandler.into()))),
144 Instruction::ApplyStagingManifests(_) => {
145 Ok(Some(Box::new(ApplyStagingManifestsHandler.into())))
146 }
147 }
148 }
149}
150
151#[allow(clippy::enum_variant_names)]
152pub enum InstructionHandlers {
153 CloseRegions(CloseRegionsHandler),
154 OpenRegions(OpenRegionsHandler),
155 FlushRegions(FlushRegionsHandler),
156 DowngradeRegions(DowngradeRegionsHandler),
157 UpgradeRegions(UpgradeRegionsHandler),
158 GetFileRefs(GetFileRefsHandler),
159 GcRegions(GcRegionsHandler),
160 EnterStagingRegions(EnterStagingRegionsHandler),
161 SyncRegions(SyncRegionHandler),
162 RemapManifest(RemapManifestHandler),
163 ApplyStagingManifests(ApplyStagingManifestsHandler),
164}
165
166macro_rules! impl_from_handler {
167 ($($handler:ident => $variant:ident),*) => {
168 $(
169 impl From<$handler> for InstructionHandlers {
170 fn from(handler: $handler) -> Self {
171 InstructionHandlers::$variant(handler)
172 }
173 }
174 )*
175 };
176}
177
178impl_from_handler!(
179 CloseRegionsHandler => CloseRegions,
180 OpenRegionsHandler => OpenRegions,
181 FlushRegionsHandler => FlushRegions,
182 DowngradeRegionsHandler => DowngradeRegions,
183 UpgradeRegionsHandler => UpgradeRegions,
184 GetFileRefsHandler => GetFileRefs,
185 GcRegionsHandler => GcRegions,
186 EnterStagingRegionsHandler => EnterStagingRegions,
187 SyncRegionHandler => SyncRegions,
188 RemapManifestHandler => RemapManifest,
189 ApplyStagingManifestsHandler => ApplyStagingManifests
190);
191
192macro_rules! dispatch_instr {
193 (
194 $( $instr_variant:ident => $handler_variant:ident ),* $(,)?
195 ) => {
196 impl InstructionHandlers {
197 pub async fn handle(
198 &self,
199 ctx: &HandlerContext,
200 instruction: Instruction,
201 ) -> Option<InstructionReply> {
202 match (self, instruction) {
203 $(
204 (
205 InstructionHandlers::$handler_variant(handler),
206 Instruction::$instr_variant(instr),
207 ) => handler.handle(ctx, instr).await,
208 )*
209 _ => unreachable!(),
211 }
212 }
213 pub fn is_acceptable(instruction: &Instruction) -> bool {
215 matches!(
216 instruction,
217 $(
218 Instruction::$instr_variant { .. }
219 )|*
220 )
221 }
222 }
223 };
224}
225
226dispatch_instr!(
227 CloseRegions => CloseRegions,
228 OpenRegions => OpenRegions,
229 FlushRegions => FlushRegions,
230 DowngradeRegions => DowngradeRegions,
231 UpgradeRegions => UpgradeRegions,
232 GetFileRefs => GetFileRefs,
233 GcRegions => GcRegions,
234 EnterStagingRegions => EnterStagingRegions,
235 SyncRegions => SyncRegions,
236 RemapManifest => RemapManifest,
237 ApplyStagingManifests => ApplyStagingManifests,
238);
239
240#[async_trait]
241impl HeartbeatResponseHandler for RegionHeartbeatResponseHandler {
242 fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
243 if let Some((_, instruction)) = ctx.incoming_message.as_ref() {
244 return InstructionHandlers::is_acceptable(instruction);
245 }
246 false
247 }
248
249 async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> MetaResult<HandleControl> {
250 let (meta, instruction) = ctx
251 .incoming_message
252 .take()
253 .context(InvalidHeartbeatResponseSnafu)?;
254
255 let mailbox = ctx.mailbox.clone();
256 if let Some(handler) = self.build_handler(&instruction)? {
257 let context = HandlerContext {
258 region_server: self.region_server.clone(),
259 downgrade_tasks: self.downgrade_tasks.clone(),
260 flush_tasks: self.flush_tasks.clone(),
261 gc_tasks: self.gc_tasks.clone(),
262 kv_backend: self.kv_backend.clone(),
263 };
264 let _handle = common_runtime::spawn_global(async move {
265 let reply = handler.handle(&context, instruction).await;
266 if let Some(reply) = reply
267 && let Err(e) = mailbox.send((meta, reply)).await
268 {
269 let error = e.to_string();
270 let (meta, reply) = e.0;
271 error!("Failed to send reply {reply} to {meta:?}: {error}");
272 }
273 });
274 }
275
276 Ok(HandleControl::Continue)
277 }
278}
279
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::HashMap;
    use std::sync::Arc;
    use std::time::Duration;

    use common_meta::RegionIdent;
    use common_meta::heartbeat::mailbox::{
        HeartbeatMailbox, IncomingMessage, MailboxRef, MessageMeta,
    };
    use common_meta::instruction::{
        DowngradeRegion, EnterStagingRegion, OpenRegion, UpgradeRegion,
    };
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use mito2::config::MitoConfig;
    use mito2::engine::MITO_ENGINE_NAME;
    use mito2::test_util::{CreateRequestBuilder, TestEnv};
    use store_api::path_utils::table_dir;
    use store_api::region_engine::RegionRole;
    use store_api::region_request::{RegionCloseRequest, RegionRequest};
    use store_api::storage::RegionId;
    use tokio::sync::mpsc::{self, Receiver};

    use super::*;
    use crate::error;
    use crate::tests::mock_region_server;

    /// Mailbox plus the receiving end of its channel, so tests can observe replies.
    pub struct HeartbeatResponseTestEnv {
        pub(crate) mailbox: MailboxRef,
        pub(crate) receiver: Receiver<(MessageMeta, InstructionReply)>,
    }

    impl HeartbeatResponseTestEnv {
        /// Creates a mailbox backed by a bounded channel with capacity 8.
        pub fn new() -> Self {
            let (sender, receiver) = mpsc::channel(8);
            let mailbox = Arc::new(HeartbeatMailbox::new(sender));

            Self { mailbox, receiver }
        }

        /// Wraps `incoming_message` in a handler context that shares this env's mailbox.
        pub fn create_handler_ctx(
            &self,
            incoming_message: IncomingMessage,
        ) -> HeartbeatResponseHandlerContext {
            HeartbeatResponseHandlerContext {
                mailbox: self.mailbox.clone(),
                response: Default::default(),
                incoming_message: Some(incoming_message),
            }
        }
    }

    /// Message metadata shared by all tests in this module.
    fn test_meta() -> MessageMeta {
        MessageMeta::new_test(1, "test", "dn-1", "me-0")
    }

    /// Identifies `region_id` on datanode 2 using the mito engine.
    fn region_ident(region_id: RegionId) -> RegionIdent {
        RegionIdent {
            table_id: region_id.table_id(),
            region_number: region_id.region_number(),
            datanode_id: 2,
            engine: MITO_ENGINE_NAME.to_string(),
        }
    }

    fn close_region_instruction(region_id: RegionId) -> Instruction {
        Instruction::CloseRegions(vec![region_ident(region_id)])
    }

    fn open_region_instruction(region_id: RegionId, path: &str) -> Instruction {
        let open_region = OpenRegion::new(
            region_ident(region_id),
            path,
            HashMap::new(),
            HashMap::new(),
            false,
        );
        Instruction::OpenRegions(vec![open_region])
    }

    /// Downgrade instruction for a single region with a one second flush timeout.
    fn downgrade_region_instruction(region_id: RegionId) -> Instruction {
        Instruction::DowngradeRegions(vec![DowngradeRegion {
            region_id,
            flush_timeout: Some(Duration::from_secs(1)),
        }])
    }

    /// Runs `instruction` through `handler`, checks that handling continues,
    /// and returns the reply received from the mailbox channel.
    async fn handle_and_recv(
        handler: &RegionHeartbeatResponseHandler,
        env: &mut HeartbeatResponseTestEnv,
        instruction: Instruction,
    ) -> InstructionReply {
        let mut ctx = env.create_handler_ctx((test_meta(), instruction));
        let control = handler.handle(&mut ctx).await.unwrap();
        assert_matches!(control, HandleControl::Continue);

        let (_, reply) = env.receiver.recv().await.unwrap();
        reply
    }

    #[test]
    fn test_is_acceptable() {
        common_telemetry::init_default_ut_logging();
        let region_server = mock_region_server();
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let heartbeat_handler =
            RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend);
        let heartbeat_env = HeartbeatResponseTestEnv::new();
        let meta = test_meta();

        let region_id = RegionId::new(1024, 1);
        let storage_path = "test";

        // Each of these instructions must be accepted by the handler.
        let instructions = vec![
            open_region_instruction(region_id, storage_path),
            close_region_instruction(region_id),
            downgrade_region_instruction(RegionId::new(2048, 1)),
            Instruction::UpgradeRegions(vec![UpgradeRegion {
                region_id,
                ..Default::default()
            }]),
            Instruction::EnterStagingRegions(vec![EnterStagingRegion {
                region_id,
                partition_expr: "".to_string(),
            }]),
        ];

        for instruction in instructions {
            let ctx = heartbeat_env.create_handler_ctx((meta.clone(), instruction));
            assert!(heartbeat_handler.is_acceptable(&ctx));
        }
    }

    #[tokio::test]
    async fn test_close_region() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let heartbeat_handler =
            RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend);

        let mut engine_env = TestEnv::with_prefix("close-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);

        let create_req = CreateRequestBuilder::new().build();
        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        // The close instruction is sent twice; both attempts must report success.
        for _ in 0..2 {
            let reply = handle_and_recv(
                &heartbeat_handler,
                &mut heartbeat_env,
                close_region_instruction(region_id),
            )
            .await;

            let InstructionReply::CloseRegions(reply) = reply else {
                unreachable!()
            };
            assert!(reply.result);
            assert!(reply.error.is_none());

            // The region must no longer be known to the region server.
            assert_matches!(
                region_server
                    .set_region_role(region_id, RegionRole::Leader)
                    .unwrap_err(),
                error::Error::RegionNotFound { .. }
            );
        }
    }

    #[tokio::test]
    async fn test_open_region_ok() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let heartbeat_handler =
            RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend);

        let mut engine_env = TestEnv::with_prefix("open-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);

        let storage_path = "test";
        let mut create_req = CreateRequestBuilder::new().build();
        create_req.table_dir = table_dir(storage_path, region_id.table_id());

        // Create the region, then close it so the open instruction has work to do.
        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();
        region_server
            .handle_request(region_id, RegionRequest::Close(RegionCloseRequest {}))
            .await
            .unwrap();

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        // The open instruction is sent twice; both attempts must report success.
        for _ in 0..2 {
            let reply = handle_and_recv(
                &heartbeat_handler,
                &mut heartbeat_env,
                open_region_instruction(region_id, storage_path),
            )
            .await;

            let InstructionReply::OpenRegions(reply) = reply else {
                unreachable!()
            };
            assert!(reply.result);
            assert!(reply.error.is_none());
        }
    }

    #[tokio::test]
    async fn test_open_not_exists_region() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let heartbeat_handler =
            RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend);

        let mut engine_env = TestEnv::with_prefix("open-not-exists-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);
        let storage_path = "test";

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        // No create request was issued for this region, so opening it must fail.
        let reply = handle_and_recv(
            &heartbeat_handler,
            &mut heartbeat_env,
            open_region_instruction(region_id, storage_path),
        )
        .await;

        let InstructionReply::OpenRegions(reply) = reply else {
            unreachable!()
        };
        assert!(!reply.result);
        assert!(reply.error.is_some());
    }

    #[tokio::test]
    async fn test_downgrade_region() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let heartbeat_handler =
            RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend);

        let mut engine_env = TestEnv::with_prefix("downgrade-region").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine));
        let region_id = RegionId::new(1024, 1);

        let storage_path = "test";
        let mut create_req = CreateRequestBuilder::new().build();
        create_req.table_dir = table_dir(storage_path, region_id.table_id());

        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();

        let mut heartbeat_env = HeartbeatResponseTestEnv::new();

        // Downgrading the created region twice yields the same successful reply.
        for _ in 0..2 {
            let reply = handle_and_recv(
                &heartbeat_handler,
                &mut heartbeat_env,
                downgrade_region_instruction(region_id),
            )
            .await;

            let replies = reply.expect_downgrade_regions_reply();
            let first = &replies[0];
            assert!(first.exists);
            assert!(first.error.is_none());
            assert_eq!(first.last_entry_id.unwrap(), 0);
        }

        // A region that was never created is reported as missing, without an error.
        let reply = handle_and_recv(
            &heartbeat_handler,
            &mut heartbeat_env,
            downgrade_region_instruction(RegionId::new(2048, 1)),
        )
        .await;

        let replies = reply.expect_downgrade_regions_reply();
        assert!(!replies[0].exists);
        assert!(replies[0].error.is_none());
        assert!(replies[0].last_entry_id.is_none());
    }
}