frontend/
heartbeat.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(test)]
16mod tests;
17
18use std::sync::Arc;
19
20use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer};
21use common_config::utils::ResourceSpec;
22use common_meta::heartbeat::handler::{
23    HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
24};
25use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
26use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
27use common_telemetry::{debug, error, info, warn};
28use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
29use servers::addrs;
30use servers::heartbeat_options::HeartbeatOptions;
31use snafu::ResultExt;
32use tokio::sync::mpsc;
33use tokio::sync::mpsc::Receiver;
34use tokio::time::{Duration, Instant};
35
36use crate::error;
37use crate::error::Result;
38use crate::frontend::FrontendOptions;
39use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
40
41/// The frontend heartbeat task which sending `[HeartbeatRequest]` to Metasrv periodically in background.
42#[derive(Clone)]
43pub struct HeartbeatTask {
44    peer_addr: String,
45    meta_client: Arc<MetaClient>,
46    report_interval: Duration,
47    retry_interval: Duration,
48    resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
49    start_time_ms: u64,
50    resource_spec: ResourceSpec,
51}
52
53impl HeartbeatTask {
54    pub fn new(
55        opts: &FrontendOptions,
56        meta_client: Arc<MetaClient>,
57        heartbeat_opts: HeartbeatOptions,
58        resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
59    ) -> Self {
60        HeartbeatTask {
61            // if internal grpc is configured, use its address as the peer address
62            // otherwise use the public grpc address, because peer address only promises to be reachable
63            // by other components, it doesn't matter whether it's internal or external
64            peer_addr: if let Some(internal) = &opts.internal_grpc {
65                addrs::resolve_addr(&internal.bind_addr, Some(&internal.server_addr))
66            } else {
67                addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr))
68            },
69            meta_client,
70            report_interval: heartbeat_opts.interval,
71            retry_interval: heartbeat_opts.retry_interval,
72            resp_handler_executor,
73            start_time_ms: common_time::util::current_time_millis() as u64,
74            resource_spec: Default::default(),
75        }
76    }
77
78    pub async fn start(&self) -> Result<()> {
79        let (req_sender, resp_stream) = self
80            .meta_client
81            .heartbeat()
82            .await
83            .context(error::CreateMetaHeartbeatStreamSnafu)?;
84
85        info!("A heartbeat connection has been established with metasrv");
86
87        let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
88        let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
89
90        self.start_handle_resp_stream(resp_stream, mailbox);
91
92        self.start_heartbeat_report(req_sender, outgoing_rx);
93
94        Ok(())
95    }
96
97    fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream, mailbox: MailboxRef) {
98        let capture_self = self.clone();
99        let retry_interval = self.retry_interval;
100
101        let _handle = common_runtime::spawn_hb(async move {
102            loop {
103                match resp_stream.message().await {
104                    Ok(Some(resp)) => {
105                        debug!("Receiving heartbeat response: {:?}", resp);
106                        let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
107                        if let Err(e) = capture_self.handle_response(ctx).await {
108                            error!(e; "Error while handling heartbeat response");
109                            HEARTBEAT_RECV_COUNT
110                                .with_label_values(&["processing_error"])
111                                .inc();
112                        } else {
113                            HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
114                        }
115                    }
116                    Ok(None) => {
117                        warn!("Heartbeat response stream closed");
118                        capture_self.start_with_retry(retry_interval).await;
119                        break;
120                    }
121                    Err(e) => {
122                        HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
123                        error!(e; "Occur error while reading heartbeat response");
124                        capture_self.start_with_retry(retry_interval).await;
125
126                        break;
127                    }
128                }
129            }
130        });
131    }
132
133    fn new_heartbeat_request(
134        heartbeat_request: &HeartbeatRequest,
135        message: Option<OutgoingMessage>,
136    ) -> Option<HeartbeatRequest> {
137        let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
138            Some(Ok(message)) => Some(message),
139            Some(Err(e)) => {
140                error!(e; "Failed to encode mailbox messages");
141                return None;
142            }
143            None => None,
144        };
145
146        Some(HeartbeatRequest {
147            mailbox_message,
148            ..heartbeat_request.clone()
149        })
150    }
151
152    fn build_node_info(start_time_ms: u64, cpus: u32, memory_bytes: u64) -> Option<NodeInfo> {
153        let build_info = common_version::build_info();
154
155        Some(NodeInfo {
156            version: build_info.version.to_string(),
157            git_commit: build_info.commit_short.to_string(),
158            start_time_ms,
159            cpus,
160            memory_bytes,
161        })
162    }
163
164    fn start_heartbeat_report(
165        &self,
166        req_sender: HeartbeatSender,
167        mut outgoing_rx: Receiver<OutgoingMessage>,
168    ) {
169        let report_interval = self.report_interval;
170        let start_time_ms = self.start_time_ms;
171        let self_peer = Some(Peer {
172            // The peer id doesn't make sense for frontend, so we just set it 0.
173            id: 0,
174            addr: self.peer_addr.clone(),
175        });
176        let cpus = self.resource_spec.cpus as u32;
177        let memory_bytes = self.resource_spec.memory.unwrap_or_default().as_bytes();
178
179        common_runtime::spawn_hb(async move {
180            let sleep = tokio::time::sleep(Duration::from_millis(0));
181            tokio::pin!(sleep);
182
183            let heartbeat_request = HeartbeatRequest {
184                peer: self_peer,
185                info: Self::build_node_info(start_time_ms, cpus, memory_bytes),
186                ..Default::default()
187            };
188
189            loop {
190                let req = tokio::select! {
191                    message = outgoing_rx.recv() => {
192                        if let Some(message) = message {
193                            Self::new_heartbeat_request(&heartbeat_request, Some(message))
194                        } else {
195                            warn!("Sender has been dropped, exiting the heartbeat loop");
196                            // Receives None that means Sender was dropped, we need to break the current loop
197                            break
198                        }
199                    }
200                    _ = &mut sleep => {
201                        sleep.as_mut().reset(Instant::now() + report_interval);
202                       Self::new_heartbeat_request(&heartbeat_request, None)
203                    }
204                };
205
206                if let Some(req) = req {
207                    if let Err(e) = req_sender.send(req.clone()).await {
208                        error!(e; "Failed to send heartbeat to metasrv");
209                        break;
210                    } else {
211                        HEARTBEAT_SENT_COUNT.inc();
212                        debug!("Send a heartbeat request to metasrv, content: {:?}", req);
213                    }
214                }
215            }
216        });
217    }
218
219    async fn handle_response(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()> {
220        self.resp_handler_executor
221            .handle(ctx)
222            .await
223            .context(error::HandleHeartbeatResponseSnafu)
224    }
225
226    async fn start_with_retry(&self, retry_interval: Duration) {
227        loop {
228            tokio::time::sleep(retry_interval).await;
229
230            info!("Try to re-establish the heartbeat connection to metasrv.");
231
232            if self.start().await.is_ok() {
233                break;
234            }
235        }
236    }
237}