Skip to main content

frontend/
heartbeat.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#[cfg(test)]
16mod tests;
17
18use std::sync::Arc;
19
20use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer};
21use common_meta::datanode::EnvVars;
22use common_meta::heartbeat::handler::{
23    HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
24};
25use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
26use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
27use common_stat::ResourceStatRef;
28use common_telemetry::{debug, error, info, warn};
29use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
30use servers::addrs;
31use snafu::ResultExt;
32use tokio::sync::mpsc;
33use tokio::sync::mpsc::Receiver;
34use tokio::time::{Duration, Instant};
35
36use crate::error;
37use crate::error::Result;
38use crate::frontend::FrontendOptions;
39use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
40
41/// The frontend heartbeat task which sending `[HeartbeatRequest]` to Metasrv periodically in background.
42#[derive(Clone)]
43pub struct HeartbeatTask {
44    peer_addr: String,
45    meta_client: Arc<MetaClient>,
46    resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
47    start_time_ms: u64,
48    resource_stat: ResourceStatRef,
49    env_vars: EnvVars,
50}
51
52impl HeartbeatTask {
53    pub fn new(
54        peer_addr: String,
55        opts: &FrontendOptions,
56        meta_client: Arc<MetaClient>,
57        resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
58        resource_stat: ResourceStatRef,
59    ) -> Self {
60        HeartbeatTask {
61            peer_addr,
62            meta_client,
63            resp_handler_executor,
64            start_time_ms: common_time::util::current_time_millis() as u64,
65            resource_stat,
66            env_vars: EnvVars::from_config(&opts.heartbeat_env_vars),
67        }
68    }
69
70    pub async fn start(&self) -> Result<()> {
71        let (req_sender, resp_stream, config) = self
72            .meta_client
73            .heartbeat()
74            .await
75            .context(error::CreateMetaHeartbeatStreamSnafu)?;
76
77        info!("Heartbeat started with Metasrv config: {}", config);
78
79        let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
80        let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
81
82        self.start_handle_resp_stream(resp_stream, mailbox, config.retry_interval);
83
84        self.start_heartbeat_report(req_sender, outgoing_rx, config.interval);
85
86        Ok(())
87    }
88
89    fn start_handle_resp_stream(
90        &self,
91        mut resp_stream: HeartbeatStream,
92        mailbox: MailboxRef,
93        retry_interval: Duration,
94    ) {
95        let capture_self = self.clone();
96
97        let _handle = common_runtime::spawn_hb(async move {
98            loop {
99                match resp_stream.message().await {
100                    Ok(Some(resp)) => {
101                        debug!("Receiving heartbeat response: {:?}", resp);
102                        if let Some(message) = &resp.mailbox_message {
103                            info!("Received mailbox message: {message:?}");
104                        }
105                        let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
106                        if let Err(e) = capture_self.handle_response(ctx).await {
107                            error!(e; "Error while handling heartbeat response");
108                            HEARTBEAT_RECV_COUNT
109                                .with_label_values(&["processing_error"])
110                                .inc();
111                        } else {
112                            HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
113                        }
114                    }
115                    Ok(None) => {
116                        warn!("Heartbeat response stream closed");
117                        capture_self.start_with_retry(retry_interval).await;
118                        break;
119                    }
120                    Err(e) => {
121                        HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
122                        error!(e; "Occur error while reading heartbeat response");
123                        capture_self.start_with_retry(retry_interval).await;
124
125                        break;
126                    }
127                }
128            }
129        });
130    }
131
132    fn new_heartbeat_request(
133        heartbeat_request: &HeartbeatRequest,
134        message: Option<OutgoingMessage>,
135        cpu_usage: i64,
136        memory_usage: i64,
137    ) -> Option<HeartbeatRequest> {
138        let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
139            Some(Ok(message)) => Some(message),
140            Some(Err(e)) => {
141                error!(e; "Failed to encode mailbox messages");
142                return None;
143            }
144            None => None,
145        };
146
147        let mut heartbeat_request = HeartbeatRequest {
148            mailbox_message,
149            ..heartbeat_request.clone()
150        };
151
152        if let Some(info) = heartbeat_request.info.as_mut() {
153            info.memory_usage_bytes = memory_usage;
154            info.cpu_usage_millicores = cpu_usage;
155        }
156
157        Some(heartbeat_request)
158    }
159
160    #[allow(deprecated)]
161    fn build_node_info(
162        start_time_ms: u64,
163        total_cpu_millicores: i64,
164        total_memory_bytes: i64,
165    ) -> Option<NodeInfo> {
166        let build_info = common_version::build_info();
167
168        Some(NodeInfo {
169            version: build_info.version.to_string(),
170            git_commit: build_info.commit_short.to_string(),
171            start_time_ms,
172            total_cpu_millicores,
173            total_memory_bytes,
174            cpu_usage_millicores: 0,
175            memory_usage_bytes: 0,
176            // TODO(zyy17): Remove these deprecated fields when the deprecated fields are removed from the proto.
177            cpus: total_cpu_millicores as u32,
178            memory_bytes: total_memory_bytes as u64,
179            hostname: hostname::get()
180                .unwrap_or_default()
181                .to_string_lossy()
182                .to_string(),
183        })
184    }
185
186    fn start_heartbeat_report(
187        &self,
188        req_sender: HeartbeatSender,
189        mut outgoing_rx: Receiver<OutgoingMessage>,
190        report_interval: Duration,
191    ) {
192        let start_time_ms = self.start_time_ms;
193        let self_peer = Some(Peer {
194            // The node id will be actually calculated from its address (by hashing the address
195            // string) in the metasrv. So it can be set to 0 here, as a placeholder.
196            id: 0,
197            addr: self.peer_addr.clone(),
198        });
199        let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores();
200        let total_memory_bytes = self.resource_stat.get_total_memory_bytes();
201        let resource_stat = self.resource_stat.clone();
202        let env_vars = self.env_vars.clone();
203        common_runtime::spawn_hb(async move {
204            let sleep = tokio::time::sleep(Duration::from_millis(0));
205            tokio::pin!(sleep);
206
207            let mut extensions = std::collections::HashMap::new();
208            env_vars.into_extensions(&mut extensions);
209
210            let heartbeat_request = HeartbeatRequest {
211                peer: self_peer,
212                info: Self::build_node_info(
213                    start_time_ms,
214                    total_cpu_millicores,
215                    total_memory_bytes,
216                ),
217                extensions,
218                ..Default::default()
219            };
220
221            loop {
222                let req = tokio::select! {
223                    message = outgoing_rx.recv() => {
224                        if let Some(message) = message {
225                            Self::new_heartbeat_request(&heartbeat_request, Some(message), 0, 0)
226                        } else {
227                            warn!("Sender has been dropped, exiting the heartbeat loop");
228                            // Receives None that means Sender was dropped, we need to break the current loop
229                            break
230                        }
231                    }
232                    _ = &mut sleep => {
233                       sleep.as_mut().reset(Instant::now() + report_interval);
234                       Self::new_heartbeat_request(&heartbeat_request, None, resource_stat.get_cpu_usage_millicores(), resource_stat.get_memory_usage_bytes())
235                    }
236                };
237
238                if let Some(req) = req {
239                    if let Err(e) = req_sender.send(req.clone()).await {
240                        error!(e; "Failed to send heartbeat to metasrv");
241                        break;
242                    } else {
243                        HEARTBEAT_SENT_COUNT.inc();
244                        debug!("Send a heartbeat request to metasrv, content: {:?}", req);
245                    }
246                }
247            }
248        });
249    }
250
251    async fn handle_response(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()> {
252        self.resp_handler_executor
253            .handle(ctx)
254            .await
255            .context(error::HandleHeartbeatResponseSnafu)
256    }
257
258    async fn start_with_retry(&self, retry_interval: Duration) {
259        loop {
260            tokio::time::sleep(retry_interval).await;
261
262            info!("Try to re-establish the heartbeat connection to metasrv.");
263
264            if self.start().await.is_ok() {
265                break;
266            }
267        }
268    }
269}
270
271pub(crate) fn frontend_peer_addr(opts: &FrontendOptions) -> String {
272    // if internal grpc is configured, use its address as the peer address
273    // otherwise use the public grpc address, because peer address only promises to be reachable
274    // by other components, it doesn't matter whether it's internal or external
275    if let Some(internal) = &opts.internal_grpc {
276        addrs::resolve_addr(&internal.bind_addr, Some(&internal.server_addr))
277    } else {
278        addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr))
279    }
280}