flow/
heartbeat.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Send heartbeat from flownode to metasrv
16
17use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, Ordering};
19
20use api::v1::meta::{HeartbeatRequest, Peer};
21use common_error::ext::BoxedError;
22use common_meta::heartbeat::handler::{
23    HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
24};
25use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
26use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
27use common_meta::key::flow::flow_state::FlowStat;
28use common_stat::ResourceStatRef;
29use common_telemetry::{debug, error, info, warn};
30use greptime_proto::v1::meta::NodeInfo;
31use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
32use servers::addrs;
33use servers::heartbeat_options::HeartbeatOptions;
34use snafu::ResultExt;
35use tokio::sync::mpsc;
36use tokio::time::Duration;
37
38use crate::error::ExternalSnafu;
39use crate::utils::SizeReportSender;
40use crate::{Error, FlownodeOptions};
41
42async fn query_flow_state(
43    query_stat_size: &Option<SizeReportSender>,
44    timeout: Duration,
45) -> Option<FlowStat> {
46    if let Some(report_requester) = query_stat_size.as_ref() {
47        let ret = report_requester.query(timeout).await;
48        match ret {
49            Ok(latest) => Some(latest),
50            Err(err) => {
51                error!(err; "Failed to get query stat size");
52                None
53            }
54        }
55    } else {
56        None
57    }
58}
59
60/// The flownode heartbeat task which sending `[HeartbeatRequest]` to Metasrv periodically in background.
61#[derive(Clone)]
62pub struct HeartbeatTask {
63    node_id: u64,
64    node_epoch: u64,
65    peer_addr: String,
66    meta_client: Arc<MetaClient>,
67    report_interval: Duration,
68    retry_interval: Duration,
69    resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
70    running: Arc<AtomicBool>,
71    query_stat_size: Option<SizeReportSender>,
72    resource_stat: ResourceStatRef,
73}
74
75impl HeartbeatTask {
76    pub fn with_query_stat_size(mut self, query_stat_size: SizeReportSender) -> Self {
77        self.query_stat_size = Some(query_stat_size);
78        self
79    }
80
81    pub fn new(
82        opts: &FlownodeOptions,
83        meta_client: Arc<MetaClient>,
84        heartbeat_opts: HeartbeatOptions,
85        resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
86        resource_stat: ResourceStatRef,
87    ) -> Self {
88        Self {
89            node_id: opts.node_id.unwrap_or(0),
90            node_epoch: common_time::util::current_time_millis() as u64,
91            peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
92            meta_client,
93            report_interval: heartbeat_opts.interval,
94            retry_interval: heartbeat_opts.retry_interval,
95            resp_handler_executor,
96            running: Arc::new(AtomicBool::new(false)),
97            query_stat_size: None,
98            resource_stat,
99        }
100    }
101
102    pub async fn start(&self) -> Result<(), Error> {
103        if self
104            .running
105            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
106            .is_err()
107        {
108            warn!("Heartbeat task started multiple times");
109            return Ok(());
110        }
111
112        self.create_streams().await
113    }
114
115    async fn create_streams(&self) -> Result<(), Error> {
116        info!("Start to establish the heartbeat connection to metasrv.");
117        let (req_sender, resp_stream) = self
118            .meta_client
119            .heartbeat()
120            .await
121            .map_err(BoxedError::new)
122            .context(ExternalSnafu)?;
123
124        info!("Flownode's heartbeat connection has been established with metasrv");
125
126        let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
127        let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
128
129        self.start_handle_resp_stream(resp_stream, mailbox);
130
131        self.start_heartbeat_report(req_sender, outgoing_rx);
132
133        Ok(())
134    }
135
136    pub fn shutdown(&self) {
137        info!("Close heartbeat task for flownode");
138        if self
139            .running
140            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
141            .is_err()
142        {
143            warn!("Call close heartbeat task multiple times");
144        }
145    }
146
147    fn new_heartbeat_request(
148        heartbeat_request: &HeartbeatRequest,
149        message: Option<OutgoingMessage>,
150        latest_report: &Option<FlowStat>,
151        cpu_usage: i64,
152        memory_usage: i64,
153    ) -> Option<HeartbeatRequest> {
154        let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
155            Some(Ok(message)) => Some(message),
156            Some(Err(e)) => {
157                error!(e; "Failed to encode mailbox messages");
158                return None;
159            }
160            None => None,
161        };
162        let flow_stat = latest_report
163            .as_ref()
164            .map(|report| api::v1::meta::FlowStat {
165                flow_stat_size: report
166                    .state_size
167                    .iter()
168                    .map(|(k, v)| (*k, *v as u64))
169                    .collect(),
170                flow_last_exec_time_map: report
171                    .last_exec_time_map
172                    .iter()
173                    .map(|(k, v)| (*k, *v))
174                    .collect(),
175            });
176
177        let mut heartbeat_request = HeartbeatRequest {
178            mailbox_message,
179            flow_stat,
180            ..heartbeat_request.clone()
181        };
182
183        if let Some(info) = heartbeat_request.info.as_mut() {
184            info.cpu_usage_millicores = cpu_usage;
185            info.memory_usage_bytes = memory_usage;
186        }
187
188        Some(heartbeat_request)
189    }
190
191    #[allow(deprecated)]
192    fn build_node_info(
193        start_time_ms: u64,
194        total_cpu_millicores: i64,
195        total_memory_bytes: i64,
196    ) -> Option<NodeInfo> {
197        let build_info = common_version::build_info();
198        Some(NodeInfo {
199            version: build_info.version.to_string(),
200            git_commit: build_info.commit_short.to_string(),
201            start_time_ms,
202            total_cpu_millicores,
203            total_memory_bytes,
204            cpu_usage_millicores: 0,
205            memory_usage_bytes: 0,
206            // TODO(zyy17): Remove these deprecated fields when the deprecated fields are removed from the proto.
207            cpus: total_cpu_millicores as u32,
208            memory_bytes: total_memory_bytes as u64,
209            hostname: hostname::get()
210                .unwrap_or_default()
211                .to_string_lossy()
212                .to_string(),
213        })
214    }
215
216    fn start_heartbeat_report(
217        &self,
218        req_sender: HeartbeatSender,
219        mut outgoing_rx: mpsc::Receiver<OutgoingMessage>,
220    ) {
221        let report_interval = self.report_interval;
222        let node_epoch = self.node_epoch;
223        let self_peer = Some(Peer {
224            id: self.node_id,
225            addr: self.peer_addr.clone(),
226        });
227        let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores();
228        let total_memory_bytes = self.resource_stat.get_total_memory_bytes();
229        let resource_stat = self.resource_stat.clone();
230        let query_stat_size = self.query_stat_size.clone();
231
232        common_runtime::spawn_hb(async move {
233            // note that using interval will cause it to first immediately send
234            // a heartbeat
235            let mut interval = tokio::time::interval(report_interval);
236            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
237            let mut latest_report = None;
238
239            let heartbeat_request = HeartbeatRequest {
240                peer: self_peer,
241                node_epoch,
242                info: Self::build_node_info(node_epoch, total_cpu_millicores, total_memory_bytes),
243                ..Default::default()
244            };
245
246            loop {
247                let req = tokio::select! {
248                    message = outgoing_rx.recv() => {
249                        if let Some(message) = message {
250                            Self::new_heartbeat_request(&heartbeat_request, Some(message), &latest_report, 0, 0)
251                        } else {
252                            warn!("Sender has been dropped, exiting the heartbeat loop");
253                            // Receives None that means Sender was dropped, we need to break the current loop
254                            break
255                        }
256                    }
257                    _ = interval.tick() => {
258                        Self::new_heartbeat_request(&heartbeat_request, None, &latest_report, resource_stat.get_cpu_usage_millicores(), resource_stat.get_memory_usage_bytes())
259                    }
260                };
261
262                if let Some(req) = req {
263                    if let Err(e) = req_sender.send(req.clone()).await {
264                        error!(e; "Failed to send heartbeat to metasrv");
265                        break;
266                    } else {
267                        debug!("Send a heartbeat request to metasrv, content: {:?}", req);
268                    }
269                }
270                // after sending heartbeat, try to get the latest report
271                // TODO(discord9): consider a better place to update the size report
272                // set the timeout to half of the report interval so that it wouldn't delay heartbeat if something went horribly wrong
273                latest_report = query_flow_state(&query_stat_size, report_interval / 2).await;
274            }
275
276            info!("flownode heartbeat task stopped.");
277        });
278    }
279
280    fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream, mailbox: MailboxRef) {
281        let capture_self = self.clone();
282        let retry_interval = self.retry_interval;
283
284        let _handle = common_runtime::spawn_hb(async move {
285            loop {
286                match resp_stream.message().await {
287                    Ok(Some(resp)) => {
288                        debug!("Receiving heartbeat response: {:?}", resp);
289                        let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
290                        if let Err(e) = capture_self.handle_response(ctx).await {
291                            error!(e; "Error while handling heartbeat response");
292                        }
293                    }
294                    Ok(None) => {
295                        warn!("Heartbeat response stream closed");
296                        capture_self.start_with_retry(retry_interval).await;
297                        break;
298                    }
299                    Err(e) => {
300                        error!(e; "Occur error while reading heartbeat response");
301                        capture_self.start_with_retry(retry_interval).await;
302
303                        break;
304                    }
305                }
306            }
307        });
308    }
309
310    async fn handle_response(&self, ctx: HeartbeatResponseHandlerContext) -> Result<(), Error> {
311        self.resp_handler_executor
312            .handle(ctx)
313            .await
314            .map_err(BoxedError::new)
315            .context(ExternalSnafu)
316    }
317
318    async fn start_with_retry(&self, retry_interval: Duration) {
319        loop {
320            tokio::time::sleep(retry_interval).await;
321
322            info!("Try to re-establish the heartbeat connection to metasrv.");
323
324            if self.create_streams().await.is_ok() {
325                break;
326            }
327        }
328    }
329}