common_meta/heartbeat/
handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::v1::meta::HeartbeatResponse;
18use async_trait::async_trait;
19use common_telemetry::error;
20
21use crate::error::Result;
22use crate::heartbeat::mailbox::{IncomingMessage, MailboxRef};
23
24pub mod invalidate_table_cache;
25pub mod parse_mailbox_message;
26#[cfg(test)]
27mod tests;
28
29pub type HeartbeatResponseHandlerExecutorRef = Arc<dyn HeartbeatResponseHandlerExecutor>;
30pub type HeartbeatResponseHandlerRef = Arc<dyn HeartbeatResponseHandler>;
31
32pub struct HeartbeatResponseHandlerContext {
33    pub mailbox: MailboxRef,
34    pub response: HeartbeatResponse,
35    pub incoming_message: Option<IncomingMessage>,
36}
37
38/// HandleControl
39///
40/// Controls process of handling heartbeat response.
41#[derive(Debug, PartialEq)]
42pub enum HandleControl {
43    Continue,
44    Done,
45}
46
47impl HeartbeatResponseHandlerContext {
48    pub fn new(mailbox: MailboxRef, response: HeartbeatResponse) -> Self {
49        Self {
50            mailbox,
51            response,
52            incoming_message: None,
53        }
54    }
55}
56
57/// HeartbeatResponseHandler
58///
59/// [`HeartbeatResponseHandler::is_acceptable`] returns true if handler can handle incoming [`HeartbeatResponseHandlerContext`].
60///
61/// [`HeartbeatResponseHandler::handle`] handles all or part of incoming [`HeartbeatResponseHandlerContext`].
62#[async_trait]
63pub trait HeartbeatResponseHandler: Send + Sync {
64    fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool;
65
66    async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result<HandleControl>;
67}
68
69#[async_trait]
70pub trait HeartbeatResponseHandlerExecutor: Send + Sync {
71    async fn handle(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()>;
72}
73
74pub struct HandlerGroupExecutor {
75    handlers: Vec<HeartbeatResponseHandlerRef>,
76}
77
78impl HandlerGroupExecutor {
79    pub fn new(handlers: Vec<HeartbeatResponseHandlerRef>) -> Self {
80        Self { handlers }
81    }
82}
83
84#[async_trait]
85impl HeartbeatResponseHandlerExecutor for HandlerGroupExecutor {
86    async fn handle(&self, mut ctx: HeartbeatResponseHandlerContext) -> Result<()> {
87        for handler in &self.handlers {
88            if !handler.is_acceptable(&ctx) {
89                continue;
90            }
91
92            match handler.handle(&mut ctx).await {
93                Ok(HandleControl::Done) => break,
94                Ok(HandleControl::Continue) => {}
95                Err(e) => {
96                    error!(e;"Error while handling: {:?}", ctx.response);
97                    break;
98                }
99            }
100        }
101        Ok(())
102    }
103}