flow/
heartbeat.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Send heartbeat from flownode to metasrv
16
17use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, Ordering};
19
20use api::v1::meta::{HeartbeatRequest, Peer};
21use common_error::ext::BoxedError;
22use common_meta::heartbeat::handler::{
23    HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
24};
25use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
26use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
27use common_meta::key::flow::flow_state::FlowStat;
28use common_stat::ResourceStatRef;
29use common_telemetry::{debug, error, info, warn};
30use greptime_proto::v1::meta::NodeInfo;
31use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
32use servers::addrs;
33use snafu::ResultExt;
34use tokio::sync::mpsc;
35use tokio::time::Duration;
36
37use crate::error::ExternalSnafu;
38use crate::utils::SizeReportSender;
39use crate::{Error, FlownodeOptions};
40
41async fn query_flow_state(
42    query_stat_size: &Option<SizeReportSender>,
43    timeout: Duration,
44) -> Option<FlowStat> {
45    if let Some(report_requester) = query_stat_size.as_ref() {
46        let ret = report_requester.query(timeout).await;
47        match ret {
48            Ok(latest) => Some(latest),
49            Err(err) => {
50                error!(err; "Failed to get query stat size");
51                None
52            }
53        }
54    } else {
55        None
56    }
57}
58
/// The flownode heartbeat task, which sends [`HeartbeatRequest`]s to Metasrv
/// periodically in the background and dispatches the responses it receives
/// to the configured response handler executor.
#[derive(Clone)]
pub struct HeartbeatTask {
    /// Id reported as the peer id in every heartbeat; `0` when the options
    /// do not specify a node id.
    node_id: u64,
    /// Timestamp in milliseconds taken when the task was constructed. Sent as
    /// the request's `node_epoch` and as the node info's start time.
    node_epoch: u64,
    /// Address reported as the peer address, resolved from the gRPC bind and
    /// server addresses in the options.
    peer_addr: String,
    /// Client used to open the heartbeat stream to Metasrv.
    meta_client: Arc<MetaClient>,
    /// Executor that handles each heartbeat response.
    resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
    /// Set to `true` by `start` and back to `false` by `shutdown`; shared
    /// between clones of the task.
    running: Arc<AtomicBool>,
    /// Optional handle used to query the latest flow statistics that are
    /// attached to heartbeats.
    query_stat_size: Option<SizeReportSender>,
    /// Source of the total and current CPU/memory figures reported in the
    /// node info.
    resource_stat: ResourceStatRef,
}
71
72impl HeartbeatTask {
73    pub fn with_query_stat_size(mut self, query_stat_size: SizeReportSender) -> Self {
74        self.query_stat_size = Some(query_stat_size);
75        self
76    }
77
78    pub fn new(
79        opts: &FlownodeOptions,
80        meta_client: Arc<MetaClient>,
81        resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
82        resource_stat: ResourceStatRef,
83    ) -> Self {
84        Self {
85            node_id: opts.node_id.unwrap_or(0),
86            node_epoch: common_time::util::current_time_millis() as u64,
87            peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
88            meta_client,
89            resp_handler_executor,
90            running: Arc::new(AtomicBool::new(false)),
91            query_stat_size: None,
92            resource_stat,
93        }
94    }
95
96    pub async fn start(&self) -> Result<(), Error> {
97        if self
98            .running
99            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
100            .is_err()
101        {
102            warn!("Heartbeat task started multiple times");
103            return Ok(());
104        }
105
106        self.create_streams().await
107    }
108
109    async fn create_streams(&self) -> Result<(), Error> {
110        info!("Establishing heartbeat connection to Metasrv...");
111
112        let (req_sender, resp_stream, config) = self
113            .meta_client
114            .heartbeat()
115            .await
116            .map_err(BoxedError::new)
117            .context(ExternalSnafu)?;
118
119        info!(
120            "Heartbeat started for flownode {}, Metasrv config: {}",
121            self.node_id, config
122        );
123
124        let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
125        let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
126
127        self.start_handle_resp_stream(resp_stream, mailbox, config.retry_interval);
128
129        self.start_heartbeat_report(req_sender, outgoing_rx, config.interval);
130
131        Ok(())
132    }
133
134    pub fn shutdown(&self) {
135        info!("Close heartbeat task for flownode");
136        if self
137            .running
138            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
139            .is_err()
140        {
141            warn!("Call close heartbeat task multiple times");
142        }
143    }
144
145    fn new_heartbeat_request(
146        heartbeat_request: &HeartbeatRequest,
147        message: Option<OutgoingMessage>,
148        latest_report: &Option<FlowStat>,
149        cpu_usage: i64,
150        memory_usage: i64,
151    ) -> Option<HeartbeatRequest> {
152        let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
153            Some(Ok(message)) => Some(message),
154            Some(Err(e)) => {
155                error!(e; "Failed to encode mailbox messages");
156                return None;
157            }
158            None => None,
159        };
160        let flow_stat = latest_report
161            .as_ref()
162            .map(|report| api::v1::meta::FlowStat {
163                flow_stat_size: report
164                    .state_size
165                    .iter()
166                    .map(|(k, v)| (*k, *v as u64))
167                    .collect(),
168                flow_last_exec_time_map: report
169                    .last_exec_time_map
170                    .iter()
171                    .map(|(k, v)| (*k, *v))
172                    .collect(),
173            });
174
175        let mut heartbeat_request = HeartbeatRequest {
176            mailbox_message,
177            flow_stat,
178            ..heartbeat_request.clone()
179        };
180
181        if let Some(info) = heartbeat_request.info.as_mut() {
182            info.cpu_usage_millicores = cpu_usage;
183            info.memory_usage_bytes = memory_usage;
184        }
185
186        Some(heartbeat_request)
187    }
188
189    #[allow(deprecated)]
190    fn build_node_info(
191        start_time_ms: u64,
192        total_cpu_millicores: i64,
193        total_memory_bytes: i64,
194    ) -> Option<NodeInfo> {
195        let build_info = common_version::build_info();
196        Some(NodeInfo {
197            version: build_info.version.to_string(),
198            git_commit: build_info.commit_short.to_string(),
199            start_time_ms,
200            total_cpu_millicores,
201            total_memory_bytes,
202            cpu_usage_millicores: 0,
203            memory_usage_bytes: 0,
204            // TODO(zyy17): Remove these deprecated fields when the deprecated fields are removed from the proto.
205            cpus: total_cpu_millicores as u32,
206            memory_bytes: total_memory_bytes as u64,
207            hostname: hostname::get()
208                .unwrap_or_default()
209                .to_string_lossy()
210                .to_string(),
211        })
212    }
213
214    fn start_heartbeat_report(
215        &self,
216        req_sender: HeartbeatSender,
217        mut outgoing_rx: mpsc::Receiver<OutgoingMessage>,
218        report_interval: Duration,
219    ) {
220        let node_epoch = self.node_epoch;
221        let self_peer = Some(Peer {
222            id: self.node_id,
223            addr: self.peer_addr.clone(),
224        });
225        let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores();
226        let total_memory_bytes = self.resource_stat.get_total_memory_bytes();
227        let resource_stat = self.resource_stat.clone();
228        let query_stat_size = self.query_stat_size.clone();
229
230        common_runtime::spawn_hb(async move {
231            // note that using interval will cause it to first immediately send
232            // a heartbeat
233            let mut interval = tokio::time::interval(report_interval);
234            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
235            let mut latest_report = None;
236
237            let heartbeat_request = HeartbeatRequest {
238                peer: self_peer,
239                node_epoch,
240                info: Self::build_node_info(node_epoch, total_cpu_millicores, total_memory_bytes),
241                ..Default::default()
242            };
243
244            loop {
245                let req = tokio::select! {
246                    message = outgoing_rx.recv() => {
247                        if let Some(message) = message {
248                            Self::new_heartbeat_request(&heartbeat_request, Some(message), &latest_report, 0, 0)
249                        } else {
250                            warn!("Sender has been dropped, exiting the heartbeat loop");
251                            // Receives None that means Sender was dropped, we need to break the current loop
252                            break
253                        }
254                    }
255                    _ = interval.tick() => {
256                        Self::new_heartbeat_request(&heartbeat_request, None, &latest_report, resource_stat.get_cpu_usage_millicores(), resource_stat.get_memory_usage_bytes())
257                    }
258                };
259
260                if let Some(req) = req {
261                    if let Err(e) = req_sender.send(req.clone()).await {
262                        error!(e; "Failed to send heartbeat to metasrv");
263                        break;
264                    } else {
265                        debug!("Send a heartbeat request to metasrv, content: {:?}", req);
266                    }
267                }
268                // after sending heartbeat, try to get the latest report
269                // TODO(discord9): consider a better place to update the size report
270                // set the timeout to half of the report interval so that it wouldn't delay heartbeat if something went horribly wrong
271                latest_report = query_flow_state(&query_stat_size, report_interval / 2).await;
272            }
273
274            info!("flownode heartbeat task stopped.");
275        });
276    }
277
278    fn start_handle_resp_stream(
279        &self,
280        mut resp_stream: HeartbeatStream,
281        mailbox: MailboxRef,
282        retry_interval: Duration,
283    ) {
284        let capture_self = self.clone();
285
286        let _handle = common_runtime::spawn_hb(async move {
287            loop {
288                match resp_stream.message().await {
289                    Ok(Some(resp)) => {
290                        debug!("Receiving heartbeat response: {:?}", resp);
291                        let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
292                        if let Err(e) = capture_self.handle_response(ctx).await {
293                            error!(e; "Error while handling heartbeat response");
294                        }
295                    }
296                    Ok(None) => {
297                        warn!("Heartbeat response stream closed");
298                        capture_self.start_with_retry(retry_interval).await;
299                        break;
300                    }
301                    Err(e) => {
302                        error!(e; "Occur error while reading heartbeat response");
303                        capture_self.start_with_retry(retry_interval).await;
304
305                        break;
306                    }
307                }
308            }
309        });
310    }
311
312    async fn handle_response(&self, ctx: HeartbeatResponseHandlerContext) -> Result<(), Error> {
313        self.resp_handler_executor
314            .handle(ctx)
315            .await
316            .map_err(BoxedError::new)
317            .context(ExternalSnafu)
318    }
319
320    async fn start_with_retry(&self, retry_interval: Duration) {
321        loop {
322            tokio::time::sleep(retry_interval).await;
323
324            info!("Try to re-establish the heartbeat connection to metasrv.");
325
326            if self.create_streams().await.is_ok() {
327                break;
328            }
329        }
330    }
331}