flow/
heartbeat.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Send heartbeat from flownode to metasrv
16
17use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, Ordering};
19
20use api::v1::meta::{HeartbeatRequest, Peer};
21use common_config::utils::ResourceSpec;
22use common_error::ext::BoxedError;
23use common_meta::heartbeat::handler::{
24    HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
25};
26use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
27use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
28use common_meta::key::flow::flow_state::FlowStat;
29use common_telemetry::{debug, error, info, warn};
30use greptime_proto::v1::meta::NodeInfo;
31use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
32use servers::addrs;
33use servers::heartbeat_options::HeartbeatOptions;
34use snafu::ResultExt;
35use tokio::sync::mpsc;
36use tokio::time::Duration;
37
38use crate::error::ExternalSnafu;
39use crate::utils::SizeReportSender;
40use crate::{Error, FlownodeOptions};
41
42async fn query_flow_state(
43    query_stat_size: &Option<SizeReportSender>,
44    timeout: Duration,
45) -> Option<FlowStat> {
46    if let Some(report_requester) = query_stat_size.as_ref() {
47        let ret = report_requester.query(timeout).await;
48        match ret {
49            Ok(latest) => Some(latest),
50            Err(err) => {
51                error!(err; "Failed to get query stat size");
52                None
53            }
54        }
55    } else {
56        None
57    }
58}
59
60/// The flownode heartbeat task which sending `[HeartbeatRequest]` to Metasrv periodically in background.
61#[derive(Clone)]
62pub struct HeartbeatTask {
63    node_id: u64,
64    node_epoch: u64,
65    peer_addr: String,
66    meta_client: Arc<MetaClient>,
67    report_interval: Duration,
68    retry_interval: Duration,
69    resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
70    running: Arc<AtomicBool>,
71    query_stat_size: Option<SizeReportSender>,
72    resource_spec: ResourceSpec,
73}
74
75impl HeartbeatTask {
76    pub fn with_query_stat_size(mut self, query_stat_size: SizeReportSender) -> Self {
77        self.query_stat_size = Some(query_stat_size);
78        self
79    }
80    pub fn new(
81        opts: &FlownodeOptions,
82        meta_client: Arc<MetaClient>,
83        heartbeat_opts: HeartbeatOptions,
84        resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
85    ) -> Self {
86        Self {
87            node_id: opts.node_id.unwrap_or(0),
88            node_epoch: common_time::util::current_time_millis() as u64,
89            peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
90            meta_client,
91            report_interval: heartbeat_opts.interval,
92            retry_interval: heartbeat_opts.retry_interval,
93            resp_handler_executor,
94            running: Arc::new(AtomicBool::new(false)),
95            query_stat_size: None,
96            resource_spec: Default::default(),
97        }
98    }
99
100    pub async fn start(&self) -> Result<(), Error> {
101        if self
102            .running
103            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
104            .is_err()
105        {
106            warn!("Heartbeat task started multiple times");
107            return Ok(());
108        }
109
110        self.create_streams().await
111    }
112
113    async fn create_streams(&self) -> Result<(), Error> {
114        info!("Start to establish the heartbeat connection to metasrv.");
115        let (req_sender, resp_stream) = self
116            .meta_client
117            .heartbeat()
118            .await
119            .map_err(BoxedError::new)
120            .context(ExternalSnafu)?;
121
122        info!("Flownode's heartbeat connection has been established with metasrv");
123
124        let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
125        let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
126
127        self.start_handle_resp_stream(resp_stream, mailbox);
128
129        self.start_heartbeat_report(req_sender, outgoing_rx);
130
131        Ok(())
132    }
133
134    pub fn shutdown(&self) {
135        info!("Close heartbeat task for flownode");
136        if self
137            .running
138            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
139            .is_err()
140        {
141            warn!("Call close heartbeat task multiple times");
142        }
143    }
144
145    fn new_heartbeat_request(
146        heartbeat_request: &HeartbeatRequest,
147        message: Option<OutgoingMessage>,
148        latest_report: &Option<FlowStat>,
149    ) -> Option<HeartbeatRequest> {
150        let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
151            Some(Ok(message)) => Some(message),
152            Some(Err(e)) => {
153                error!(e; "Failed to encode mailbox messages");
154                return None;
155            }
156            None => None,
157        };
158        let flow_stat = latest_report
159            .as_ref()
160            .map(|report| api::v1::meta::FlowStat {
161                flow_stat_size: report
162                    .state_size
163                    .iter()
164                    .map(|(k, v)| (*k, *v as u64))
165                    .collect(),
166                flow_last_exec_time_map: report
167                    .last_exec_time_map
168                    .iter()
169                    .map(|(k, v)| (*k, *v))
170                    .collect(),
171            });
172
173        Some(HeartbeatRequest {
174            mailbox_message,
175            flow_stat,
176            ..heartbeat_request.clone()
177        })
178    }
179
180    fn build_node_info(start_time_ms: u64, cpus: u32, memory_bytes: u64) -> Option<NodeInfo> {
181        let build_info = common_version::build_info();
182        Some(NodeInfo {
183            version: build_info.version.to_string(),
184            git_commit: build_info.commit_short.to_string(),
185            start_time_ms,
186            cpus,
187            memory_bytes,
188        })
189    }
190
191    fn start_heartbeat_report(
192        &self,
193        req_sender: HeartbeatSender,
194        mut outgoing_rx: mpsc::Receiver<OutgoingMessage>,
195    ) {
196        let report_interval = self.report_interval;
197        let node_epoch = self.node_epoch;
198        let self_peer = Some(Peer {
199            id: self.node_id,
200            addr: self.peer_addr.clone(),
201        });
202        let cpus = self.resource_spec.cpus as u32;
203        let memory_bytes = self.resource_spec.memory.unwrap_or_default().as_bytes();
204
205        let query_stat_size = self.query_stat_size.clone();
206
207        common_runtime::spawn_hb(async move {
208            // note that using interval will cause it to first immediately send
209            // a heartbeat
210            let mut interval = tokio::time::interval(report_interval);
211            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
212            let mut latest_report = None;
213
214            let heartbeat_request = HeartbeatRequest {
215                peer: self_peer,
216                node_epoch,
217                info: Self::build_node_info(node_epoch, cpus, memory_bytes),
218                ..Default::default()
219            };
220
221            loop {
222                let req = tokio::select! {
223                    message = outgoing_rx.recv() => {
224                        if let Some(message) = message {
225                            Self::new_heartbeat_request(&heartbeat_request, Some(message), &latest_report)
226                        } else {
227                            warn!("Sender has been dropped, exiting the heartbeat loop");
228                            // Receives None that means Sender was dropped, we need to break the current loop
229                            break
230                        }
231                    }
232                    _ = interval.tick() => {
233                        Self::new_heartbeat_request(&heartbeat_request, None, &latest_report)
234                    }
235                };
236
237                if let Some(req) = req {
238                    if let Err(e) = req_sender.send(req.clone()).await {
239                        error!(e; "Failed to send heartbeat to metasrv");
240                        break;
241                    } else {
242                        debug!("Send a heartbeat request to metasrv, content: {:?}", req);
243                    }
244                }
245                // after sending heartbeat, try to get the latest report
246                // TODO(discord9): consider a better place to update the size report
247                // set the timeout to half of the report interval so that it wouldn't delay heartbeat if something went horribly wrong
248                latest_report = query_flow_state(&query_stat_size, report_interval / 2).await;
249            }
250
251            info!("flownode heartbeat task stopped.");
252        });
253    }
254
255    fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream, mailbox: MailboxRef) {
256        let capture_self = self.clone();
257        let retry_interval = self.retry_interval;
258
259        let _handle = common_runtime::spawn_hb(async move {
260            loop {
261                match resp_stream.message().await {
262                    Ok(Some(resp)) => {
263                        debug!("Receiving heartbeat response: {:?}", resp);
264                        let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
265                        if let Err(e) = capture_self.handle_response(ctx).await {
266                            error!(e; "Error while handling heartbeat response");
267                        }
268                    }
269                    Ok(None) => {
270                        warn!("Heartbeat response stream closed");
271                        capture_self.start_with_retry(retry_interval).await;
272                        break;
273                    }
274                    Err(e) => {
275                        error!(e; "Occur error while reading heartbeat response");
276                        capture_self.start_with_retry(retry_interval).await;
277
278                        break;
279                    }
280                }
281            }
282        });
283    }
284
285    async fn handle_response(&self, ctx: HeartbeatResponseHandlerContext) -> Result<(), Error> {
286        self.resp_handler_executor
287            .handle(ctx)
288            .await
289            .map_err(BoxedError::new)
290            .context(ExternalSnafu)
291    }
292
293    async fn start_with_retry(&self, retry_interval: Duration) {
294        loop {
295            tokio::time::sleep(retry_interval).await;
296
297            info!("Try to re-establish the heartbeat connection to metasrv.");
298
299            if self.create_streams().await.is_ok() {
300                break;
301            }
302        }
303    }
304}