frontend/
heartbeat.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(test)]
16mod tests;
17
18use std::sync::Arc;
19
20use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer};
21use common_meta::heartbeat::handler::{
22    HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
23};
24use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
25use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
26use common_telemetry::{debug, error, info};
27use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
28use servers::addrs;
29use servers::heartbeat_options::HeartbeatOptions;
30use snafu::ResultExt;
31use tokio::sync::mpsc;
32use tokio::sync::mpsc::Receiver;
33use tokio::time::{Duration, Instant};
34
35use crate::error;
36use crate::error::Result;
37use crate::frontend::FrontendOptions;
38use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
39
40/// The frontend heartbeat task which sending `[HeartbeatRequest]` to Metasrv periodically in background.
41#[derive(Clone)]
42pub struct HeartbeatTask {
43    peer_addr: String,
44    meta_client: Arc<MetaClient>,
45    report_interval: u64,
46    retry_interval: u64,
47    resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
48    start_time_ms: u64,
49}
50
51impl HeartbeatTask {
52    pub fn new(
53        opts: &FrontendOptions,
54        meta_client: Arc<MetaClient>,
55        heartbeat_opts: HeartbeatOptions,
56        resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
57    ) -> Self {
58        HeartbeatTask {
59            peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
60            meta_client,
61            report_interval: heartbeat_opts.interval.as_millis() as u64,
62            retry_interval: heartbeat_opts.retry_interval.as_millis() as u64,
63            resp_handler_executor,
64            start_time_ms: common_time::util::current_time_millis() as u64,
65        }
66    }
67
68    pub async fn start(&self) -> Result<()> {
69        let (req_sender, resp_stream) = self
70            .meta_client
71            .heartbeat()
72            .await
73            .context(error::CreateMetaHeartbeatStreamSnafu)?;
74
75        info!("A heartbeat connection has been established with metasrv");
76
77        let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
78        let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
79
80        self.start_handle_resp_stream(resp_stream, mailbox);
81
82        self.start_heartbeat_report(req_sender, outgoing_rx);
83
84        Ok(())
85    }
86
87    fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream, mailbox: MailboxRef) {
88        let capture_self = self.clone();
89        let retry_interval = self.retry_interval;
90
91        let _handle = common_runtime::spawn_hb(async move {
92            loop {
93                match resp_stream.message().await {
94                    Ok(Some(resp)) => {
95                        debug!("Receiving heartbeat response: {:?}", resp);
96                        let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
97                        if let Err(e) = capture_self.handle_response(ctx).await {
98                            error!(e; "Error while handling heartbeat response");
99                            HEARTBEAT_RECV_COUNT
100                                .with_label_values(&["processing_error"])
101                                .inc();
102                        } else {
103                            HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
104                        }
105                    }
106                    Ok(None) => break,
107                    Err(e) => {
108                        HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
109                        error!(e; "Occur error while reading heartbeat response");
110                        capture_self
111                            .start_with_retry(Duration::from_millis(retry_interval))
112                            .await;
113
114                        break;
115                    }
116                }
117            }
118        });
119    }
120
121    fn new_heartbeat_request(
122        heartbeat_request: &HeartbeatRequest,
123        message: Option<OutgoingMessage>,
124    ) -> Option<HeartbeatRequest> {
125        let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
126            Some(Ok(message)) => Some(message),
127            Some(Err(e)) => {
128                error!(e; "Failed to encode mailbox messages");
129                return None;
130            }
131            None => None,
132        };
133
134        Some(HeartbeatRequest {
135            mailbox_message,
136            ..heartbeat_request.clone()
137        })
138    }
139
140    fn build_node_info(start_time_ms: u64) -> Option<NodeInfo> {
141        let build_info = common_version::build_info();
142
143        Some(NodeInfo {
144            version: build_info.version.to_string(),
145            git_commit: build_info.commit_short.to_string(),
146            start_time_ms,
147            cpus: num_cpus::get() as u32,
148        })
149    }
150
151    fn start_heartbeat_report(
152        &self,
153        req_sender: HeartbeatSender,
154        mut outgoing_rx: Receiver<OutgoingMessage>,
155    ) {
156        let report_interval = self.report_interval;
157        let start_time_ms = self.start_time_ms;
158        let self_peer = Some(Peer {
159            // The peer id doesn't make sense for frontend, so we just set it 0.
160            id: 0,
161            addr: self.peer_addr.clone(),
162        });
163
164        common_runtime::spawn_hb(async move {
165            let sleep = tokio::time::sleep(Duration::from_millis(0));
166            tokio::pin!(sleep);
167
168            let heartbeat_request = HeartbeatRequest {
169                peer: self_peer,
170                info: Self::build_node_info(start_time_ms),
171                ..Default::default()
172            };
173
174            loop {
175                let req = tokio::select! {
176                    message = outgoing_rx.recv() => {
177                        if let Some(message) = message {
178                            Self::new_heartbeat_request(&heartbeat_request, Some(message))
179                        } else {
180                            // Receives None that means Sender was dropped, we need to break the current loop
181                            break
182                        }
183                    }
184                    _ = &mut sleep => {
185                        sleep.as_mut().reset(Instant::now() + Duration::from_millis(report_interval));
186                       Self::new_heartbeat_request(&heartbeat_request, None)
187                    }
188                };
189
190                if let Some(req) = req {
191                    if let Err(e) = req_sender.send(req.clone()).await {
192                        error!(e; "Failed to send heartbeat to metasrv");
193                        break;
194                    } else {
195                        HEARTBEAT_SENT_COUNT.inc();
196                        debug!("Send a heartbeat request to metasrv, content: {:?}", req);
197                    }
198                }
199            }
200        });
201    }
202
203    async fn handle_response(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()> {
204        self.resp_handler_executor
205            .handle(ctx)
206            .await
207            .context(error::HandleHeartbeatResponseSnafu)
208    }
209
210    async fn start_with_retry(&self, retry_interval: Duration) {
211        loop {
212            tokio::time::sleep(retry_interval).await;
213
214            info!("Try to re-establish the heartbeat connection to metasrv.");
215
216            if self.start().await.is_ok() {
217                break;
218            }
219        }
220    }
221}