flow/
heartbeat.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Send heartbeat from flownode to metasrv
16
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::sync::Arc;
19
20use api::v1::meta::{HeartbeatRequest, Peer};
21use common_error::ext::BoxedError;
22use common_meta::heartbeat::handler::{
23    HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
24};
25use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
26use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
27use common_meta::key::flow::flow_state::FlowStat;
28use common_telemetry::{debug, error, info, warn};
29use greptime_proto::v1::meta::NodeInfo;
30use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
31use servers::addrs;
32use servers::heartbeat_options::HeartbeatOptions;
33use snafu::ResultExt;
34use tokio::sync::mpsc;
35use tokio::time::Duration;
36
37use crate::error::ExternalSnafu;
38use crate::utils::SizeReportSender;
39use crate::{Error, FlownodeOptions};
40
41async fn query_flow_state(
42    query_stat_size: &Option<SizeReportSender>,
43    timeout: Duration,
44) -> Option<FlowStat> {
45    if let Some(report_requester) = query_stat_size.as_ref() {
46        let ret = report_requester.query(timeout).await;
47        match ret {
48            Ok(latest) => Some(latest),
49            Err(err) => {
50                error!(err; "Failed to get query stat size");
51                None
52            }
53        }
54    } else {
55        None
56    }
57}
58
/// The flownode heartbeat task, which periodically sends [`HeartbeatRequest`]s to Metasrv
/// in the background.
///
/// The task is `Clone`; the `running` flag is behind an `Arc`, so clones share it.
#[derive(Clone)]
pub struct HeartbeatTask {
    /// Id reported as the peer id in every heartbeat (`0` when the options carry no node id).
    node_id: u64,
    /// Millisecond timestamp taken when the task is constructed; sent as the node epoch
    /// and as the node's start time.
    node_epoch: u64,
    /// Address reported as the peer address, resolved from the gRPC options.
    peer_addr: String,
    /// Client used to open the heartbeat stream to Metasrv.
    meta_client: Arc<MetaClient>,
    /// Period between two periodic heartbeats; half of it is used as the timeout
    /// of the flow stat query.
    report_interval: Duration,
    /// Delay before each attempt to re-establish a broken heartbeat connection.
    retry_interval: Duration,
    /// Executor that processes every heartbeat response received from Metasrv.
    resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
    /// Set to `true` by `start` and back to `false` by `shutdown`.
    running: Arc<AtomicBool>,
    /// Optional requester used to query the latest flow stat after each loop iteration.
    query_stat_size: Option<SizeReportSender>,
}
72
73impl HeartbeatTask {
74    pub fn with_query_stat_size(mut self, query_stat_size: SizeReportSender) -> Self {
75        self.query_stat_size = Some(query_stat_size);
76        self
77    }
78    pub fn new(
79        opts: &FlownodeOptions,
80        meta_client: Arc<MetaClient>,
81        heartbeat_opts: HeartbeatOptions,
82        resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
83    ) -> Self {
84        Self {
85            node_id: opts.node_id.unwrap_or(0),
86            node_epoch: common_time::util::current_time_millis() as u64,
87            peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
88            meta_client,
89            report_interval: heartbeat_opts.interval,
90            retry_interval: heartbeat_opts.retry_interval,
91            resp_handler_executor,
92            running: Arc::new(AtomicBool::new(false)),
93            query_stat_size: None,
94        }
95    }
96
97    pub async fn start(&self) -> Result<(), Error> {
98        if self
99            .running
100            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
101            .is_err()
102        {
103            warn!("Heartbeat task started multiple times");
104            return Ok(());
105        }
106
107        self.create_streams().await
108    }
109
110    async fn create_streams(&self) -> Result<(), Error> {
111        info!("Start to establish the heartbeat connection to metasrv.");
112        let (req_sender, resp_stream) = self
113            .meta_client
114            .heartbeat()
115            .await
116            .map_err(BoxedError::new)
117            .context(ExternalSnafu)?;
118
119        info!("Flownode's heartbeat connection has been established with metasrv");
120
121        let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
122        let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
123
124        self.start_handle_resp_stream(resp_stream, mailbox);
125
126        self.start_heartbeat_report(req_sender, outgoing_rx);
127
128        Ok(())
129    }
130
131    pub fn shutdown(&self) {
132        info!("Close heartbeat task for flownode");
133        if self
134            .running
135            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
136            .is_err()
137        {
138            warn!("Call close heartbeat task multiple times");
139        }
140    }
141
142    fn new_heartbeat_request(
143        heartbeat_request: &HeartbeatRequest,
144        message: Option<OutgoingMessage>,
145        latest_report: &Option<FlowStat>,
146    ) -> Option<HeartbeatRequest> {
147        let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
148            Some(Ok(message)) => Some(message),
149            Some(Err(e)) => {
150                error!(e; "Failed to encode mailbox messages");
151                return None;
152            }
153            None => None,
154        };
155        let flow_stat = latest_report
156            .as_ref()
157            .map(|report| api::v1::meta::FlowStat {
158                flow_stat_size: report
159                    .state_size
160                    .iter()
161                    .map(|(k, v)| (*k, *v as u64))
162                    .collect(),
163                flow_last_exec_time_map: report
164                    .last_exec_time_map
165                    .iter()
166                    .map(|(k, v)| (*k, *v))
167                    .collect(),
168            });
169
170        Some(HeartbeatRequest {
171            mailbox_message,
172            flow_stat,
173            ..heartbeat_request.clone()
174        })
175    }
176
177    fn build_node_info(start_time_ms: u64) -> Option<NodeInfo> {
178        let build_info = common_version::build_info();
179        Some(NodeInfo {
180            version: build_info.version.to_string(),
181            git_commit: build_info.commit_short.to_string(),
182            start_time_ms,
183            cpus: num_cpus::get() as u32,
184        })
185    }
186
187    fn start_heartbeat_report(
188        &self,
189        req_sender: HeartbeatSender,
190        mut outgoing_rx: mpsc::Receiver<OutgoingMessage>,
191    ) {
192        let report_interval = self.report_interval;
193        let node_epoch = self.node_epoch;
194        let self_peer = Some(Peer {
195            id: self.node_id,
196            addr: self.peer_addr.clone(),
197        });
198
199        let query_stat_size = self.query_stat_size.clone();
200
201        common_runtime::spawn_hb(async move {
202            // note that using interval will cause it to first immediately send
203            // a heartbeat
204            let mut interval = tokio::time::interval(report_interval);
205            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
206            let mut latest_report = None;
207
208            let heartbeat_request = HeartbeatRequest {
209                peer: self_peer,
210                node_epoch,
211                info: Self::build_node_info(node_epoch),
212                ..Default::default()
213            };
214
215            loop {
216                let req = tokio::select! {
217                    message = outgoing_rx.recv() => {
218                        if let Some(message) = message {
219                            Self::new_heartbeat_request(&heartbeat_request, Some(message), &latest_report)
220                        } else {
221                            // Receives None that means Sender was dropped, we need to break the current loop
222                            break
223                        }
224                    }
225                    _ = interval.tick() => {
226                        Self::new_heartbeat_request(&heartbeat_request, None, &latest_report)
227                    }
228                };
229
230                if let Some(req) = req {
231                    if let Err(e) = req_sender.send(req.clone()).await {
232                        error!(e; "Failed to send heartbeat to metasrv");
233                        break;
234                    } else {
235                        debug!("Send a heartbeat request to metasrv, content: {:?}", req);
236                    }
237                }
238                // after sending heartbeat, try to get the latest report
239                // TODO(discord9): consider a better place to update the size report
240                // set the timeout to half of the report interval so that it wouldn't delay heartbeat if something went horribly wrong
241                latest_report = query_flow_state(&query_stat_size, report_interval / 2).await;
242            }
243
244            info!("flownode heartbeat task stopped.");
245        });
246    }
247
248    fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream, mailbox: MailboxRef) {
249        let capture_self = self.clone();
250        let retry_interval = self.retry_interval;
251
252        let _handle = common_runtime::spawn_hb(async move {
253            loop {
254                match resp_stream.message().await {
255                    Ok(Some(resp)) => {
256                        debug!("Receiving heartbeat response: {:?}", resp);
257                        let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
258                        if let Err(e) = capture_self.handle_response(ctx).await {
259                            error!(e; "Error while handling heartbeat response");
260                        }
261                    }
262                    Ok(None) => break,
263                    Err(e) => {
264                        error!(e; "Occur error while reading heartbeat response");
265                        capture_self.start_with_retry(retry_interval).await;
266
267                        break;
268                    }
269                }
270            }
271        });
272    }
273
274    async fn handle_response(&self, ctx: HeartbeatResponseHandlerContext) -> Result<(), Error> {
275        self.resp_handler_executor
276            .handle(ctx)
277            .await
278            .map_err(BoxedError::new)
279            .context(ExternalSnafu)
280    }
281
282    async fn start_with_retry(&self, retry_interval: Duration) {
283        loop {
284            tokio::time::sleep(retry_interval).await;
285
286            info!("Try to re-establish the heartbeat connection to metasrv.");
287
288            if self.create_streams().await.is_ok() {
289                break;
290            }
291        }
292    }
293}