common_meta/heartbeat/
handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::v1::meta::HeartbeatResponse;
18use async_trait::async_trait;
19use common_telemetry::error;
20
21use crate::error::Result;
22use crate::heartbeat::mailbox::{IncomingMessage, MailboxRef};
23
24pub mod invalidate_table_cache;
25pub mod parse_mailbox_message;
26pub mod suspend;
27#[cfg(test)]
28mod tests;
29
30pub type HeartbeatResponseHandlerExecutorRef = Arc<dyn HeartbeatResponseHandlerExecutor>;
31pub type HeartbeatResponseHandlerRef = Arc<dyn HeartbeatResponseHandler>;
32
33pub struct HeartbeatResponseHandlerContext {
34    pub mailbox: MailboxRef,
35    pub response: HeartbeatResponse,
36    pub incoming_message: Option<IncomingMessage>,
37}
38
39/// HandleControl
40///
41/// Controls process of handling heartbeat response.
42#[derive(Debug, PartialEq)]
43pub enum HandleControl {
44    Continue,
45    Done,
46}
47
48impl HeartbeatResponseHandlerContext {
49    pub fn new(mailbox: MailboxRef, response: HeartbeatResponse) -> Self {
50        Self {
51            mailbox,
52            response,
53            incoming_message: None,
54        }
55    }
56}
57
58/// HeartbeatResponseHandler
59///
60/// [`HeartbeatResponseHandler::is_acceptable`] returns true if handler can handle incoming [`HeartbeatResponseHandlerContext`].
61///
62/// [`HeartbeatResponseHandler::handle`] handles all or part of incoming [`HeartbeatResponseHandlerContext`].
63#[async_trait]
64pub trait HeartbeatResponseHandler: Send + Sync {
65    fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool;
66
67    async fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result<HandleControl>;
68}
69
70#[async_trait]
71pub trait HeartbeatResponseHandlerExecutor: Send + Sync {
72    async fn handle(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()>;
73}
74
75pub struct HandlerGroupExecutor {
76    handlers: Vec<HeartbeatResponseHandlerRef>,
77}
78
79impl HandlerGroupExecutor {
80    pub fn new(handlers: Vec<HeartbeatResponseHandlerRef>) -> Self {
81        Self { handlers }
82    }
83}
84
85#[async_trait]
86impl HeartbeatResponseHandlerExecutor for HandlerGroupExecutor {
87    async fn handle(&self, mut ctx: HeartbeatResponseHandlerContext) -> Result<()> {
88        for handler in &self.handlers {
89            if !handler.is_acceptable(&ctx) {
90                continue;
91            }
92
93            match handler.handle(&mut ctx).await {
94                Ok(HandleControl::Done) => break,
95                Ok(HandleControl::Continue) => {}
96                Err(e) => {
97                    error!(e;"Error while handling: {:?}", ctx.response);
98                    break;
99                }
100            }
101        }
102        Ok(())
103    }
104}