frontend/heartbeat.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(test)]
16mod tests;
17
18use std::sync::Arc;
19
20use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer};
21use common_meta::heartbeat::handler::{
22    HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
23};
24use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
25use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
26use common_telemetry::{debug, error, info, warn};
27use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
28use servers::addrs;
29use servers::heartbeat_options::HeartbeatOptions;
30use snafu::ResultExt;
31use tokio::sync::mpsc;
32use tokio::sync::mpsc::Receiver;
33use tokio::time::{Duration, Instant};
34
35use crate::error;
36use crate::error::Result;
37use crate::frontend::FrontendOptions;
38use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
39
40/// The frontend heartbeat task which sending `[HeartbeatRequest]` to Metasrv periodically in background.
41#[derive(Clone)]
42pub struct HeartbeatTask {
43    peer_addr: String,
44    meta_client: Arc<MetaClient>,
45    report_interval: Duration,
46    retry_interval: Duration,
47    resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
48    start_time_ms: u64,
49}
50
51impl HeartbeatTask {
52    pub fn new(
53        opts: &FrontendOptions,
54        meta_client: Arc<MetaClient>,
55        heartbeat_opts: HeartbeatOptions,
56        resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
57    ) -> Self {
58        HeartbeatTask {
59            // if internal grpc is configured, use its address as the peer address
60            // otherwise use the public grpc address, because peer address only promises to be reachable
61            // by other components, it doesn't matter whether it's internal or external
62            peer_addr: if let Some(internal) = &opts.internal_grpc {
63                addrs::resolve_addr(&internal.bind_addr, Some(&internal.server_addr))
64            } else {
65                addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr))
66            },
67            meta_client,
68            report_interval: heartbeat_opts.interval,
69            retry_interval: heartbeat_opts.retry_interval,
70            resp_handler_executor,
71            start_time_ms: common_time::util::current_time_millis() as u64,
72        }
73    }
74
75    pub async fn start(&self) -> Result<()> {
76        let (req_sender, resp_stream) = self
77            .meta_client
78            .heartbeat()
79            .await
80            .context(error::CreateMetaHeartbeatStreamSnafu)?;
81
82        info!("A heartbeat connection has been established with metasrv");
83
84        let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
85        let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
86
87        self.start_handle_resp_stream(resp_stream, mailbox);
88
89        self.start_heartbeat_report(req_sender, outgoing_rx);
90
91        Ok(())
92    }
93
94    fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream, mailbox: MailboxRef) {
95        let capture_self = self.clone();
96        let retry_interval = self.retry_interval;
97
98        let _handle = common_runtime::spawn_hb(async move {
99            loop {
100                match resp_stream.message().await {
101                    Ok(Some(resp)) => {
102                        debug!("Receiving heartbeat response: {:?}", resp);
103                        let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
104                        if let Err(e) = capture_self.handle_response(ctx).await {
105                            error!(e; "Error while handling heartbeat response");
106                            HEARTBEAT_RECV_COUNT
107                                .with_label_values(&["processing_error"])
108                                .inc();
109                        } else {
110                            HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
111                        }
112                    }
113                    Ok(None) => {
114                        warn!("Heartbeat response stream closed");
115                        capture_self.start_with_retry(retry_interval).await;
116                        break;
117                    }
118                    Err(e) => {
119                        HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
120                        error!(e; "Occur error while reading heartbeat response");
121                        capture_self.start_with_retry(retry_interval).await;
122
123                        break;
124                    }
125                }
126            }
127        });
128    }
129
130    fn new_heartbeat_request(
131        heartbeat_request: &HeartbeatRequest,
132        message: Option<OutgoingMessage>,
133    ) -> Option<HeartbeatRequest> {
134        let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
135            Some(Ok(message)) => Some(message),
136            Some(Err(e)) => {
137                error!(e; "Failed to encode mailbox messages");
138                return None;
139            }
140            None => None,
141        };
142
143        Some(HeartbeatRequest {
144            mailbox_message,
145            ..heartbeat_request.clone()
146        })
147    }
148
149    fn build_node_info(start_time_ms: u64) -> Option<NodeInfo> {
150        let build_info = common_version::build_info();
151
152        Some(NodeInfo {
153            version: build_info.version.to_string(),
154            git_commit: build_info.commit_short.to_string(),
155            start_time_ms,
156            cpus: num_cpus::get() as u32,
157        })
158    }
159
160    fn start_heartbeat_report(
161        &self,
162        req_sender: HeartbeatSender,
163        mut outgoing_rx: Receiver<OutgoingMessage>,
164    ) {
165        let report_interval = self.report_interval;
166        let start_time_ms = self.start_time_ms;
167        let self_peer = Some(Peer {
168            // The peer id doesn't make sense for frontend, so we just set it 0.
169            id: 0,
170            addr: self.peer_addr.clone(),
171        });
172
173        common_runtime::spawn_hb(async move {
174            let sleep = tokio::time::sleep(Duration::from_millis(0));
175            tokio::pin!(sleep);
176
177            let heartbeat_request = HeartbeatRequest {
178                peer: self_peer,
179                info: Self::build_node_info(start_time_ms),
180                ..Default::default()
181            };
182
183            loop {
184                let req = tokio::select! {
185                    message = outgoing_rx.recv() => {
186                        if let Some(message) = message {
187                            Self::new_heartbeat_request(&heartbeat_request, Some(message))
188                        } else {
189                            warn!("Sender has been dropped, exiting the heartbeat loop");
190                            // Receives None that means Sender was dropped, we need to break the current loop
191                            break
192                        }
193                    }
194                    _ = &mut sleep => {
195                        sleep.as_mut().reset(Instant::now() + report_interval);
196                       Self::new_heartbeat_request(&heartbeat_request, None)
197                    }
198                };
199
200                if let Some(req) = req {
201                    if let Err(e) = req_sender.send(req.clone()).await {
202                        error!(e; "Failed to send heartbeat to metasrv");
203                        break;
204                    } else {
205                        HEARTBEAT_SENT_COUNT.inc();
206                        debug!("Send a heartbeat request to metasrv, content: {:?}", req);
207                    }
208                }
209            }
210        });
211    }
212
213    async fn handle_response(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()> {
214        self.resp_handler_executor
215            .handle(ctx)
216            .await
217            .context(error::HandleHeartbeatResponseSnafu)
218    }
219
220    async fn start_with_retry(&self, retry_interval: Duration) {
221        loop {
222            tokio::time::sleep(retry_interval).await;
223
224            info!("Try to re-establish the heartbeat connection to metasrv.");
225
226            if self.start().await.is_ok() {
227                break;
228            }
229        }
230    }
231}