frontend/
heartbeat.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(test)]
16mod tests;
17
18use std::sync::Arc;
19
20use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer};
21use common_meta::heartbeat::handler::{
22    HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
23};
24use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
25use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
26use common_stat::ResourceStatRef;
27use common_telemetry::{debug, error, info, warn};
28use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
29use servers::addrs;
30use snafu::ResultExt;
31use tokio::sync::mpsc;
32use tokio::sync::mpsc::Receiver;
33use tokio::time::{Duration, Instant};
34
35use crate::error;
36use crate::error::Result;
37use crate::frontend::FrontendOptions;
38use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
39
40/// The frontend heartbeat task which sending `[HeartbeatRequest]` to Metasrv periodically in background.
41#[derive(Clone)]
42pub struct HeartbeatTask {
43    peer_addr: String,
44    meta_client: Arc<MetaClient>,
45    report_interval: Duration,
46    retry_interval: Duration,
47    resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
48    start_time_ms: u64,
49    resource_stat: ResourceStatRef,
50}
51
52impl HeartbeatTask {
53    pub fn new(
54        opts: &FrontendOptions,
55        meta_client: Arc<MetaClient>,
56        resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
57        resource_stat: ResourceStatRef,
58    ) -> Self {
59        HeartbeatTask {
60            // if internal grpc is configured, use its address as the peer address
61            // otherwise use the public grpc address, because peer address only promises to be reachable
62            // by other components, it doesn't matter whether it's internal or external
63            peer_addr: if let Some(internal) = &opts.internal_grpc {
64                addrs::resolve_addr(&internal.bind_addr, Some(&internal.server_addr))
65            } else {
66                addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr))
67            },
68            meta_client,
69            report_interval: opts.heartbeat.interval,
70            retry_interval: opts.heartbeat.retry_interval,
71            resp_handler_executor,
72            start_time_ms: common_time::util::current_time_millis() as u64,
73            resource_stat,
74        }
75    }
76
77    pub async fn start(&self) -> Result<()> {
78        let (req_sender, resp_stream) = self
79            .meta_client
80            .heartbeat()
81            .await
82            .context(error::CreateMetaHeartbeatStreamSnafu)?;
83
84        info!("A heartbeat connection has been established with metasrv");
85
86        let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
87        let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
88
89        self.start_handle_resp_stream(resp_stream, mailbox);
90
91        self.start_heartbeat_report(req_sender, outgoing_rx);
92
93        Ok(())
94    }
95
96    fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream, mailbox: MailboxRef) {
97        let capture_self = self.clone();
98        let retry_interval = self.retry_interval;
99
100        let _handle = common_runtime::spawn_hb(async move {
101            loop {
102                match resp_stream.message().await {
103                    Ok(Some(resp)) => {
104                        debug!("Receiving heartbeat response: {:?}", resp);
105                        if let Some(message) = &resp.mailbox_message {
106                            info!("Received mailbox message: {message:?}");
107                        }
108                        let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
109                        if let Err(e) = capture_self.handle_response(ctx).await {
110                            error!(e; "Error while handling heartbeat response");
111                            HEARTBEAT_RECV_COUNT
112                                .with_label_values(&["processing_error"])
113                                .inc();
114                        } else {
115                            HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
116                        }
117                    }
118                    Ok(None) => {
119                        warn!("Heartbeat response stream closed");
120                        capture_self.start_with_retry(retry_interval).await;
121                        break;
122                    }
123                    Err(e) => {
124                        HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
125                        error!(e; "Occur error while reading heartbeat response");
126                        capture_self.start_with_retry(retry_interval).await;
127
128                        break;
129                    }
130                }
131            }
132        });
133    }
134
135    fn new_heartbeat_request(
136        heartbeat_request: &HeartbeatRequest,
137        message: Option<OutgoingMessage>,
138        cpu_usage: i64,
139        memory_usage: i64,
140    ) -> Option<HeartbeatRequest> {
141        let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
142            Some(Ok(message)) => Some(message),
143            Some(Err(e)) => {
144                error!(e; "Failed to encode mailbox messages");
145                return None;
146            }
147            None => None,
148        };
149
150        let mut heartbeat_request = HeartbeatRequest {
151            mailbox_message,
152            ..heartbeat_request.clone()
153        };
154
155        if let Some(info) = heartbeat_request.info.as_mut() {
156            info.memory_usage_bytes = memory_usage;
157            info.cpu_usage_millicores = cpu_usage;
158        }
159
160        Some(heartbeat_request)
161    }
162
163    #[allow(deprecated)]
164    fn build_node_info(
165        start_time_ms: u64,
166        total_cpu_millicores: i64,
167        total_memory_bytes: i64,
168    ) -> Option<NodeInfo> {
169        let build_info = common_version::build_info();
170
171        Some(NodeInfo {
172            version: build_info.version.to_string(),
173            git_commit: build_info.commit_short.to_string(),
174            start_time_ms,
175            total_cpu_millicores,
176            total_memory_bytes,
177            cpu_usage_millicores: 0,
178            memory_usage_bytes: 0,
179            // TODO(zyy17): Remove these deprecated fields when the deprecated fields are removed from the proto.
180            cpus: total_cpu_millicores as u32,
181            memory_bytes: total_memory_bytes as u64,
182            hostname: hostname::get()
183                .unwrap_or_default()
184                .to_string_lossy()
185                .to_string(),
186        })
187    }
188
189    fn start_heartbeat_report(
190        &self,
191        req_sender: HeartbeatSender,
192        mut outgoing_rx: Receiver<OutgoingMessage>,
193    ) {
194        let report_interval = self.report_interval;
195        let start_time_ms = self.start_time_ms;
196        let self_peer = Some(Peer {
197            // The node id will be actually calculated from its address (by hashing the address
198            // string) in the metasrv. So it can be set to 0 here, as a placeholder.
199            id: 0,
200            addr: self.peer_addr.clone(),
201        });
202        let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores();
203        let total_memory_bytes = self.resource_stat.get_total_memory_bytes();
204        let resource_stat = self.resource_stat.clone();
205        common_runtime::spawn_hb(async move {
206            let sleep = tokio::time::sleep(Duration::from_millis(0));
207            tokio::pin!(sleep);
208
209            let heartbeat_request = HeartbeatRequest {
210                peer: self_peer,
211                info: Self::build_node_info(
212                    start_time_ms,
213                    total_cpu_millicores,
214                    total_memory_bytes,
215                ),
216                ..Default::default()
217            };
218
219            loop {
220                let req = tokio::select! {
221                    message = outgoing_rx.recv() => {
222                        if let Some(message) = message {
223                            Self::new_heartbeat_request(&heartbeat_request, Some(message), 0, 0)
224                        } else {
225                            warn!("Sender has been dropped, exiting the heartbeat loop");
226                            // Receives None that means Sender was dropped, we need to break the current loop
227                            break
228                        }
229                    }
230                    _ = &mut sleep => {
231                       sleep.as_mut().reset(Instant::now() + report_interval);
232                       Self::new_heartbeat_request(&heartbeat_request, None, resource_stat.get_cpu_usage_millicores(), resource_stat.get_memory_usage_bytes())
233                    }
234                };
235
236                if let Some(req) = req {
237                    if let Err(e) = req_sender.send(req.clone()).await {
238                        error!(e; "Failed to send heartbeat to metasrv");
239                        break;
240                    } else {
241                        HEARTBEAT_SENT_COUNT.inc();
242                        debug!("Send a heartbeat request to metasrv, content: {:?}", req);
243                    }
244                }
245            }
246        });
247    }
248
249    async fn handle_response(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()> {
250        self.resp_handler_executor
251            .handle(ctx)
252            .await
253            .context(error::HandleHeartbeatResponseSnafu)
254    }
255
256    async fn start_with_retry(&self, retry_interval: Duration) {
257        loop {
258            tokio::time::sleep(retry_interval).await;
259
260            info!("Try to re-establish the heartbeat connection to metasrv.");
261
262            if self.start().await.is_ok() {
263                break;
264            }
265        }
266    }
267}