// common_meta/heartbeat/handler.rs
use std::sync::Arc;
use api::v1::meta::HeartbeatResponse;
use async_trait::async_trait;
use common_telemetry::error;
use crate::error::Result;
use crate::heartbeat::mailbox::{IncomingMessage, MailboxRef};
// Public submodules declared by this module.
pub mod invalidate_table_cache;
pub mod parse_mailbox_message;
// Test module; only compiled when building tests.
#[cfg(test)]
mod tests;
/// Reference-counted handle to a [`HeartbeatResponseHandlerExecutor`] trait object.
pub type HeartbeatResponseHandlerExecutorRef = Arc<dyn HeartbeatResponseHandlerExecutor>;
/// Reference-counted handle to a [`HeartbeatResponseHandler`] trait object.
pub type HeartbeatResponseHandlerRef = Arc<dyn HeartbeatResponseHandler>;
/// State handed to heartbeat response handlers while one response is processed.
///
/// Handlers receive it by shared reference when asked whether they accept it
/// and by mutable reference when they handle it, so a handler may modify it
/// for the handlers that run afterwards.
pub struct HeartbeatResponseHandlerContext {
/// Mailbox handle carried along with the response.
pub mailbox: MailboxRef,
/// The heartbeat response being handled.
pub response: HeartbeatResponse,
/// Incoming mailbox message attached to this context, if any.
/// It is `None` right after construction via `new`.
// NOTE(review): presumably populated by one of the handlers (e.g. the
// mailbox-message parsing one) -- confirm against the submodules.
pub incoming_message: Option<IncomingMessage>,
}
/// Tells the caller of a handler what to do after that handler has run.
///
/// The type is a fieldless enum, so it is cheap to copy and supports full
/// equality (`Eq`) in addition to `PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HandleControl {
    /// Keep going: the next handler in the chain should be given the context.
    Continue,
    /// Stop: no further handler should be run for this context.
    Done,
}
impl HeartbeatResponseHandlerContext {
pub fn new(mailbox: MailboxRef, response: HeartbeatResponse) -> Self {
Self {
mailbox,
response,
incoming_message: None,
}
}
}
/// One step in the processing of a heartbeat response.
///
/// `HandlerGroupExecutor` calls `is_acceptable` first and only invokes
/// `handle` on handlers that returned `true`.
#[async_trait]
pub trait HeartbeatResponseHandler: Send + Sync {
/// Returns `true` if this handler wants to process the given context.
fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool;
/// Processes the context, which the handler is allowed to mutate.
///
/// On success, the returned [`HandleControl`] states whether the remaining
/// handlers should still run (`Continue`) or be skipped (`Done`).
async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result<HandleControl>;
}
/// Entry point for handling a heartbeat response as a whole.
#[async_trait]
pub trait HeartbeatResponseHandlerExecutor: Send + Sync {
/// Takes ownership of the context and handles it.
async fn handle(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()>;
}
/// Executor that runs a list of handlers one after another.
pub struct HandlerGroupExecutor {
// Handlers are tried in the order they appear in this vector.
handlers: Vec<HeartbeatResponseHandlerRef>,
}
impl HandlerGroupExecutor {
pub fn new(handlers: Vec<HeartbeatResponseHandlerRef>) -> Self {
Self { handlers }
}
}
#[async_trait]
impl HeartbeatResponseHandlerExecutor for HandlerGroupExecutor {
    /// Runs the handlers in order against `ctx`.
    ///
    /// Handlers that do not accept the context are skipped. The chain stops
    /// as soon as a handler returns [`HandleControl::Done`] or fails. A
    /// failure is logged together with the response and is not propagated:
    /// this method always returns `Ok(())`.
    async fn handle(&self, mut ctx: HeartbeatResponseHandlerContext) -> Result<()> {
        for handler in self.handlers.iter() {
            // Acceptance is checked right before each handler runs, so it
            // sees any changes earlier handlers made to the context.
            if handler.is_acceptable(&ctx) {
                let control = match handler.handle(&mut ctx).await {
                    Ok(control) => control,
                    Err(e) => {
                        // Best effort: report the failure and give up on the
                        // remaining handlers.
                        error!(e;"Error while handling: {:?}", ctx.response);
                        break;
                    }
                };
                if matches!(control, HandleControl::Done) {
                    break;
                }
            }
        }
        Ok(())
    }
}