common_meta/heartbeat/
handler.rs1use std::sync::Arc;
16
17use api::v1::meta::HeartbeatResponse;
18use async_trait::async_trait;
19use common_telemetry::error;
20
21use crate::error::Result;
22use crate::heartbeat::mailbox::{IncomingMessage, MailboxRef};
23
24pub mod invalidate_table_cache;
25pub mod parse_mailbox_message;
26#[cfg(test)]
27mod tests;
28
29pub type HeartbeatResponseHandlerExecutorRef = Arc<dyn HeartbeatResponseHandlerExecutor>;
30pub type HeartbeatResponseHandlerRef = Arc<dyn HeartbeatResponseHandler>;
31
32pub struct HeartbeatResponseHandlerContext {
33 pub mailbox: MailboxRef,
34 pub response: HeartbeatResponse,
35 pub incoming_message: Option<IncomingMessage>,
36}
37
/// Tells the executor what to do after a handler has run.
///
/// The enum is a plain, fieldless flag, so it is cheap to copy and compare:
/// it derives `Clone`/`Copy` (callers can keep using a value after passing it
/// on) and `Eq` in addition to `PartialEq` (equality is total).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HandleControl {
    /// Keep going: the next acceptable handler in the chain gets a turn.
    Continue,
    /// Stop here: no further handlers are run for this response.
    Done,
}
46
47impl HeartbeatResponseHandlerContext {
48 pub fn new(mailbox: MailboxRef, response: HeartbeatResponse) -> Self {
49 Self {
50 mailbox,
51 response,
52 incoming_message: None,
53 }
54 }
55}
56
57#[async_trait]
63pub trait HeartbeatResponseHandler: Send + Sync {
64 fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool;
65
66 async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result<HandleControl>;
67}
68
69#[async_trait]
70pub trait HeartbeatResponseHandlerExecutor: Send + Sync {
71 async fn handle(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()>;
72}
73
74pub struct HandlerGroupExecutor {
75 handlers: Vec<HeartbeatResponseHandlerRef>,
76}
77
78impl HandlerGroupExecutor {
79 pub fn new(handlers: Vec<HeartbeatResponseHandlerRef>) -> Self {
80 Self { handlers }
81 }
82}
83
84#[async_trait]
85impl HeartbeatResponseHandlerExecutor for HandlerGroupExecutor {
86 async fn handle(&self, mut ctx: HeartbeatResponseHandlerContext) -> Result<()> {
87 for handler in &self.handlers {
88 if !handler.is_acceptable(&ctx) {
89 continue;
90 }
91
92 match handler.handle(&mut ctx).await {
93 Ok(HandleControl::Done) => break,
94 Ok(HandleControl::Continue) => {}
95 Err(e) => {
96 error!(e;"Error while handling: {:?}", ctx.response);
97 break;
98 }
99 }
100 }
101 Ok(())
102 }
103}