common_meta/heartbeat/
handler.rs1use std::sync::Arc;
16
17use api::v1::meta::HeartbeatResponse;
18use async_trait::async_trait;
19use common_telemetry::error;
20
21use crate::error::Result;
22use crate::heartbeat::mailbox::{IncomingMessage, MailboxRef};
23
24pub mod invalidate_table_cache;
25pub mod parse_mailbox_message;
26pub mod suspend;
27#[cfg(test)]
28mod tests;
29
30pub type HeartbeatResponseHandlerExecutorRef = Arc<dyn HeartbeatResponseHandlerExecutor>;
31pub type HeartbeatResponseHandlerRef = Arc<dyn HeartbeatResponseHandler>;
32
33pub struct HeartbeatResponseHandlerContext {
34 pub mailbox: MailboxRef,
35 pub response: HeartbeatResponse,
36 pub incoming_message: Option<IncomingMessage>,
37}
38
/// Tells the executor what to do after a handler has processed a response.
///
/// The type is a field-less flag, so it is `Copy` and fully `Eq`: callers can
/// compare and pass it around by value without moving it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HandleControl {
    /// Keep going: the next handler in the chain gets a chance to run.
    Continue,
    /// Stop here: no further handlers are invoked for this response.
    Done,
}
47
48impl HeartbeatResponseHandlerContext {
49 pub fn new(mailbox: MailboxRef, response: HeartbeatResponse) -> Self {
50 Self {
51 mailbox,
52 response,
53 incoming_message: None,
54 }
55 }
56}
57
58#[async_trait]
64pub trait HeartbeatResponseHandler: Send + Sync {
65 fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool;
66
67 async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result<HandleControl>;
68}
69
70#[async_trait]
71pub trait HeartbeatResponseHandlerExecutor: Send + Sync {
72 async fn handle(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()>;
73}
74
75pub struct HandlerGroupExecutor {
76 handlers: Vec<HeartbeatResponseHandlerRef>,
77}
78
79impl HandlerGroupExecutor {
80 pub fn new(handlers: Vec<HeartbeatResponseHandlerRef>) -> Self {
81 Self { handlers }
82 }
83}
84
85#[async_trait]
86impl HeartbeatResponseHandlerExecutor for HandlerGroupExecutor {
87 async fn handle(&self, mut ctx: HeartbeatResponseHandlerContext) -> Result<()> {
88 for handler in &self.handlers {
89 if !handler.is_acceptable(&ctx) {
90 continue;
91 }
92
93 match handler.handle(&mut ctx).await {
94 Ok(HandleControl::Done) => break,
95 Ok(HandleControl::Continue) => {}
96 Err(e) => {
97 error!(e;"Error while handling: {:?}", ctx.response);
98 break;
99 }
100 }
101 }
102 Ok(())
103 }
104}