frontend/
heartbeat.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(test)]
16mod tests;
17
18use std::sync::Arc;
19
20use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer};
21use common_config::utils::ResourceSpec;
22use common_meta::heartbeat::handler::{
23    HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
24};
25use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
26use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
27use common_telemetry::{debug, error, info, warn};
28use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
29use servers::addrs;
30use servers::heartbeat_options::HeartbeatOptions;
31use snafu::ResultExt;
32use tokio::sync::mpsc;
33use tokio::sync::mpsc::Receiver;
34use tokio::time::{Duration, Instant};
35
36use crate::error;
37use crate::error::Result;
38use crate::frontend::FrontendOptions;
39use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
40
41/// The frontend heartbeat task which sending `[HeartbeatRequest]` to Metasrv periodically in background.
42#[derive(Clone)]
43pub struct HeartbeatTask {
44    peer_addr: String,
45    meta_client: Arc<MetaClient>,
46    report_interval: Duration,
47    retry_interval: Duration,
48    resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
49    start_time_ms: u64,
50    resource_spec: ResourceSpec,
51}
52
53impl HeartbeatTask {
54    pub fn new(
55        opts: &FrontendOptions,
56        meta_client: Arc<MetaClient>,
57        heartbeat_opts: HeartbeatOptions,
58        resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
59    ) -> Self {
60        HeartbeatTask {
61            // if internal grpc is configured, use its address as the peer address
62            // otherwise use the public grpc address, because peer address only promises to be reachable
63            // by other components, it doesn't matter whether it's internal or external
64            peer_addr: if let Some(internal) = &opts.internal_grpc {
65                addrs::resolve_addr(&internal.bind_addr, Some(&internal.server_addr))
66            } else {
67                addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr))
68            },
69            meta_client,
70            report_interval: heartbeat_opts.interval,
71            retry_interval: heartbeat_opts.retry_interval,
72            resp_handler_executor,
73            start_time_ms: common_time::util::current_time_millis() as u64,
74            resource_spec: Default::default(),
75        }
76    }
77
78    pub async fn start(&self) -> Result<()> {
79        let (req_sender, resp_stream) = self
80            .meta_client
81            .heartbeat()
82            .await
83            .context(error::CreateMetaHeartbeatStreamSnafu)?;
84
85        info!("A heartbeat connection has been established with metasrv");
86
87        let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
88        let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
89
90        self.start_handle_resp_stream(resp_stream, mailbox);
91
92        self.start_heartbeat_report(req_sender, outgoing_rx);
93
94        Ok(())
95    }
96
97    fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream, mailbox: MailboxRef) {
98        let capture_self = self.clone();
99        let retry_interval = self.retry_interval;
100
101        let _handle = common_runtime::spawn_hb(async move {
102            loop {
103                match resp_stream.message().await {
104                    Ok(Some(resp)) => {
105                        debug!("Receiving heartbeat response: {:?}", resp);
106                        let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
107                        if let Err(e) = capture_self.handle_response(ctx).await {
108                            error!(e; "Error while handling heartbeat response");
109                            HEARTBEAT_RECV_COUNT
110                                .with_label_values(&["processing_error"])
111                                .inc();
112                        } else {
113                            HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
114                        }
115                    }
116                    Ok(None) => {
117                        warn!("Heartbeat response stream closed");
118                        capture_self.start_with_retry(retry_interval).await;
119                        break;
120                    }
121                    Err(e) => {
122                        HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
123                        error!(e; "Occur error while reading heartbeat response");
124                        capture_self.start_with_retry(retry_interval).await;
125
126                        break;
127                    }
128                }
129            }
130        });
131    }
132
133    fn new_heartbeat_request(
134        heartbeat_request: &HeartbeatRequest,
135        message: Option<OutgoingMessage>,
136    ) -> Option<HeartbeatRequest> {
137        let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
138            Some(Ok(message)) => Some(message),
139            Some(Err(e)) => {
140                error!(e; "Failed to encode mailbox messages");
141                return None;
142            }
143            None => None,
144        };
145
146        Some(HeartbeatRequest {
147            mailbox_message,
148            ..heartbeat_request.clone()
149        })
150    }
151
152    fn build_node_info(start_time_ms: u64, cpus: u32, memory_bytes: u64) -> Option<NodeInfo> {
153        let build_info = common_version::build_info();
154
155        Some(NodeInfo {
156            version: build_info.version.to_string(),
157            git_commit: build_info.commit_short.to_string(),
158            start_time_ms,
159            cpus,
160            memory_bytes,
161            hostname: hostname::get()
162                .unwrap_or_default()
163                .to_string_lossy()
164                .to_string(),
165        })
166    }
167
168    fn start_heartbeat_report(
169        &self,
170        req_sender: HeartbeatSender,
171        mut outgoing_rx: Receiver<OutgoingMessage>,
172    ) {
173        let report_interval = self.report_interval;
174        let start_time_ms = self.start_time_ms;
175        let self_peer = Some(Peer {
176            // The peer id doesn't make sense for frontend, so we just set it 0.
177            id: 0,
178            addr: self.peer_addr.clone(),
179        });
180        let cpus = self.resource_spec.cpus as u32;
181        let memory_bytes = self.resource_spec.memory.unwrap_or_default().as_bytes();
182
183        common_runtime::spawn_hb(async move {
184            let sleep = tokio::time::sleep(Duration::from_millis(0));
185            tokio::pin!(sleep);
186
187            let heartbeat_request = HeartbeatRequest {
188                peer: self_peer,
189                info: Self::build_node_info(start_time_ms, cpus, memory_bytes),
190                ..Default::default()
191            };
192
193            loop {
194                let req = tokio::select! {
195                    message = outgoing_rx.recv() => {
196                        if let Some(message) = message {
197                            Self::new_heartbeat_request(&heartbeat_request, Some(message))
198                        } else {
199                            warn!("Sender has been dropped, exiting the heartbeat loop");
200                            // Receives None that means Sender was dropped, we need to break the current loop
201                            break
202                        }
203                    }
204                    _ = &mut sleep => {
205                        sleep.as_mut().reset(Instant::now() + report_interval);
206                       Self::new_heartbeat_request(&heartbeat_request, None)
207                    }
208                };
209
210                if let Some(req) = req {
211                    if let Err(e) = req_sender.send(req.clone()).await {
212                        error!(e; "Failed to send heartbeat to metasrv");
213                        break;
214                    } else {
215                        HEARTBEAT_SENT_COUNT.inc();
216                        debug!("Send a heartbeat request to metasrv, content: {:?}", req);
217                    }
218                }
219            }
220        });
221    }
222
223    async fn handle_response(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()> {
224        self.resp_handler_executor
225            .handle(ctx)
226            .await
227            .context(error::HandleHeartbeatResponseSnafu)
228    }
229
230    async fn start_with_retry(&self, retry_interval: Duration) {
231        loop {
232            tokio::time::sleep(retry_interval).await;
233
234            info!("Try to re-establish the heartbeat connection to metasrv.");
235
236            if self.start().await.is_ok() {
237                break;
238            }
239        }
240    }
241}