frontend/
heartbeat.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(test)]
16mod tests;
17
18use std::sync::Arc;
19
20use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer};
21use common_meta::heartbeat::handler::{
22    HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
23};
24use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
25use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
26use common_stat::ResourceStatRef;
27use common_telemetry::{debug, error, info, warn};
28use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
29use servers::addrs;
30use servers::heartbeat_options::HeartbeatOptions;
31use snafu::ResultExt;
32use tokio::sync::mpsc;
33use tokio::sync::mpsc::Receiver;
34use tokio::time::{Duration, Instant};
35
36use crate::error;
37use crate::error::Result;
38use crate::frontend::FrontendOptions;
39use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
40
/// The frontend heartbeat task, which periodically sends [`HeartbeatRequest`]s to Metasrv in the
/// background.
#[derive(Clone)]
pub struct HeartbeatTask {
    /// Address placed in the `Peer` of every heartbeat request sent by this task.
    peer_addr: String,
    /// Client used to open the heartbeat stream to Metasrv.
    meta_client: Arc<MetaClient>,
    /// Delay between two periodic heartbeat reports.
    report_interval: Duration,
    /// Delay before each attempt to re-establish the heartbeat connection.
    retry_interval: Duration,
    /// Executor that every received heartbeat response is handed to.
    resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
    /// Timestamp in milliseconds captured when the task is constructed; sent as the node's
    /// start time.
    start_time_ms: u64,
    /// Source of the CPU and memory figures included in heartbeat requests.
    resource_stat: ResourceStatRef,
}
52
53impl HeartbeatTask {
54    pub fn new(
55        opts: &FrontendOptions,
56        meta_client: Arc<MetaClient>,
57        heartbeat_opts: HeartbeatOptions,
58        resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
59        resource_stat: ResourceStatRef,
60    ) -> Self {
61        HeartbeatTask {
62            // if internal grpc is configured, use its address as the peer address
63            // otherwise use the public grpc address, because peer address only promises to be reachable
64            // by other components, it doesn't matter whether it's internal or external
65            peer_addr: if let Some(internal) = &opts.internal_grpc {
66                addrs::resolve_addr(&internal.bind_addr, Some(&internal.server_addr))
67            } else {
68                addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr))
69            },
70            meta_client,
71            report_interval: heartbeat_opts.interval,
72            retry_interval: heartbeat_opts.retry_interval,
73            resp_handler_executor,
74            start_time_ms: common_time::util::current_time_millis() as u64,
75            resource_stat,
76        }
77    }
78
79    pub async fn start(&self) -> Result<()> {
80        let (req_sender, resp_stream) = self
81            .meta_client
82            .heartbeat()
83            .await
84            .context(error::CreateMetaHeartbeatStreamSnafu)?;
85
86        info!("A heartbeat connection has been established with metasrv");
87
88        let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
89        let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
90
91        self.start_handle_resp_stream(resp_stream, mailbox);
92
93        self.start_heartbeat_report(req_sender, outgoing_rx);
94
95        Ok(())
96    }
97
98    fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream, mailbox: MailboxRef) {
99        let capture_self = self.clone();
100        let retry_interval = self.retry_interval;
101
102        let _handle = common_runtime::spawn_hb(async move {
103            loop {
104                match resp_stream.message().await {
105                    Ok(Some(resp)) => {
106                        debug!("Receiving heartbeat response: {:?}", resp);
107                        if let Some(message) = &resp.mailbox_message {
108                            info!("Received mailbox message: {message:?}");
109                        }
110                        let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
111                        if let Err(e) = capture_self.handle_response(ctx).await {
112                            error!(e; "Error while handling heartbeat response");
113                            HEARTBEAT_RECV_COUNT
114                                .with_label_values(&["processing_error"])
115                                .inc();
116                        } else {
117                            HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
118                        }
119                    }
120                    Ok(None) => {
121                        warn!("Heartbeat response stream closed");
122                        capture_self.start_with_retry(retry_interval).await;
123                        break;
124                    }
125                    Err(e) => {
126                        HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
127                        error!(e; "Occur error while reading heartbeat response");
128                        capture_self.start_with_retry(retry_interval).await;
129
130                        break;
131                    }
132                }
133            }
134        });
135    }
136
137    fn new_heartbeat_request(
138        heartbeat_request: &HeartbeatRequest,
139        message: Option<OutgoingMessage>,
140        cpu_usage: i64,
141        memory_usage: i64,
142    ) -> Option<HeartbeatRequest> {
143        let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
144            Some(Ok(message)) => Some(message),
145            Some(Err(e)) => {
146                error!(e; "Failed to encode mailbox messages");
147                return None;
148            }
149            None => None,
150        };
151
152        let mut heartbeat_request = HeartbeatRequest {
153            mailbox_message,
154            ..heartbeat_request.clone()
155        };
156
157        if let Some(info) = heartbeat_request.info.as_mut() {
158            info.memory_usage_bytes = memory_usage;
159            info.cpu_usage_millicores = cpu_usage;
160        }
161
162        Some(heartbeat_request)
163    }
164
165    #[allow(deprecated)]
166    fn build_node_info(
167        start_time_ms: u64,
168        total_cpu_millicores: i64,
169        total_memory_bytes: i64,
170    ) -> Option<NodeInfo> {
171        let build_info = common_version::build_info();
172
173        Some(NodeInfo {
174            version: build_info.version.to_string(),
175            git_commit: build_info.commit_short.to_string(),
176            start_time_ms,
177            total_cpu_millicores,
178            total_memory_bytes,
179            cpu_usage_millicores: 0,
180            memory_usage_bytes: 0,
181            // TODO(zyy17): Remove these deprecated fields when the deprecated fields are removed from the proto.
182            cpus: total_cpu_millicores as u32,
183            memory_bytes: total_memory_bytes as u64,
184            hostname: hostname::get()
185                .unwrap_or_default()
186                .to_string_lossy()
187                .to_string(),
188        })
189    }
190
191    fn start_heartbeat_report(
192        &self,
193        req_sender: HeartbeatSender,
194        mut outgoing_rx: Receiver<OutgoingMessage>,
195    ) {
196        let report_interval = self.report_interval;
197        let start_time_ms = self.start_time_ms;
198        let self_peer = Some(Peer {
199            // The peer id doesn't make sense for frontend, so we just set it 0.
200            id: 0,
201            addr: self.peer_addr.clone(),
202        });
203        let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores();
204        let total_memory_bytes = self.resource_stat.get_total_memory_bytes();
205        let resource_stat = self.resource_stat.clone();
206        common_runtime::spawn_hb(async move {
207            let sleep = tokio::time::sleep(Duration::from_millis(0));
208            tokio::pin!(sleep);
209
210            let heartbeat_request = HeartbeatRequest {
211                peer: self_peer,
212                info: Self::build_node_info(
213                    start_time_ms,
214                    total_cpu_millicores,
215                    total_memory_bytes,
216                ),
217                ..Default::default()
218            };
219
220            loop {
221                let req = tokio::select! {
222                    message = outgoing_rx.recv() => {
223                        if let Some(message) = message {
224                            Self::new_heartbeat_request(&heartbeat_request, Some(message), 0, 0)
225                        } else {
226                            warn!("Sender has been dropped, exiting the heartbeat loop");
227                            // Receives None that means Sender was dropped, we need to break the current loop
228                            break
229                        }
230                    }
231                    _ = &mut sleep => {
232                       sleep.as_mut().reset(Instant::now() + report_interval);
233                       Self::new_heartbeat_request(&heartbeat_request, None, resource_stat.get_cpu_usage_millicores(), resource_stat.get_memory_usage_bytes())
234                    }
235                };
236
237                if let Some(req) = req {
238                    if let Err(e) = req_sender.send(req.clone()).await {
239                        error!(e; "Failed to send heartbeat to metasrv");
240                        break;
241                    } else {
242                        HEARTBEAT_SENT_COUNT.inc();
243                        debug!("Send a heartbeat request to metasrv, content: {:?}", req);
244                    }
245                }
246            }
247        });
248    }
249
250    async fn handle_response(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()> {
251        self.resp_handler_executor
252            .handle(ctx)
253            .await
254            .context(error::HandleHeartbeatResponseSnafu)
255    }
256
257    async fn start_with_retry(&self, retry_interval: Duration) {
258        loop {
259            tokio::time::sleep(retry_interval).await;
260
261            info!("Try to re-establish the heartbeat connection to metasrv.");
262
263            if self.start().await.is_ok() {
264                break;
265            }
266        }
267    }
268}