1use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, Ordering};
19
20use api::v1::meta::{HeartbeatRequest, Peer};
21use common_error::ext::BoxedError;
22use common_meta::heartbeat::handler::{
23 HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
24};
25use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
26use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
27use common_meta::key::flow::flow_state::FlowStat;
28use common_stat::ResourceStatRef;
29use common_telemetry::{debug, error, info, warn};
30use greptime_proto::v1::meta::NodeInfo;
31use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
32use servers::addrs;
33use snafu::ResultExt;
34use tokio::sync::mpsc;
35use tokio::time::Duration;
36
37use crate::error::ExternalSnafu;
38use crate::utils::SizeReportSender;
39use crate::{Error, FlownodeOptions};
40
41async fn query_flow_state(
42 query_stat_size: &Option<SizeReportSender>,
43 timeout: Duration,
44) -> Option<FlowStat> {
45 if let Some(report_requester) = query_stat_size.as_ref() {
46 let ret = report_requester.query(timeout).await;
47 match ret {
48 Ok(latest) => Some(latest),
49 Err(err) => {
50 error!(err; "Failed to get query stat size");
51 None
52 }
53 }
54 } else {
55 None
56 }
57}
58
59#[derive(Clone)]
61pub struct HeartbeatTask {
62 node_id: u64,
63 node_epoch: u64,
64 peer_addr: String,
65 meta_client: Arc<MetaClient>,
66 resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
67 running: Arc<AtomicBool>,
68 query_stat_size: Option<SizeReportSender>,
69 resource_stat: ResourceStatRef,
70}
71
72impl HeartbeatTask {
73 pub fn with_query_stat_size(mut self, query_stat_size: SizeReportSender) -> Self {
74 self.query_stat_size = Some(query_stat_size);
75 self
76 }
77
78 pub fn new(
79 opts: &FlownodeOptions,
80 meta_client: Arc<MetaClient>,
81 resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
82 resource_stat: ResourceStatRef,
83 ) -> Self {
84 Self {
85 node_id: opts.node_id.unwrap_or(0),
86 node_epoch: common_time::util::current_time_millis() as u64,
87 peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
88 meta_client,
89 resp_handler_executor,
90 running: Arc::new(AtomicBool::new(false)),
91 query_stat_size: None,
92 resource_stat,
93 }
94 }
95
96 pub async fn start(&self) -> Result<(), Error> {
97 if self
98 .running
99 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
100 .is_err()
101 {
102 warn!("Heartbeat task started multiple times");
103 return Ok(());
104 }
105
106 self.create_streams().await
107 }
108
109 async fn create_streams(&self) -> Result<(), Error> {
110 info!("Establishing heartbeat connection to Metasrv...");
111
112 let (req_sender, resp_stream, config) = self
113 .meta_client
114 .heartbeat()
115 .await
116 .map_err(BoxedError::new)
117 .context(ExternalSnafu)?;
118
119 info!(
120 "Heartbeat started for flownode {}, Metasrv config: {}",
121 self.node_id, config
122 );
123
124 let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
125 let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
126
127 self.start_handle_resp_stream(resp_stream, mailbox, config.retry_interval);
128
129 self.start_heartbeat_report(req_sender, outgoing_rx, config.interval);
130
131 Ok(())
132 }
133
134 pub fn shutdown(&self) {
135 info!("Close heartbeat task for flownode");
136 if self
137 .running
138 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
139 .is_err()
140 {
141 warn!("Call close heartbeat task multiple times");
142 }
143 }
144
145 fn new_heartbeat_request(
146 heartbeat_request: &HeartbeatRequest,
147 message: Option<OutgoingMessage>,
148 latest_report: &Option<FlowStat>,
149 cpu_usage: i64,
150 memory_usage: i64,
151 ) -> Option<HeartbeatRequest> {
152 let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
153 Some(Ok(message)) => Some(message),
154 Some(Err(e)) => {
155 error!(e; "Failed to encode mailbox messages");
156 return None;
157 }
158 None => None,
159 };
160 let flow_stat = latest_report
161 .as_ref()
162 .map(|report| api::v1::meta::FlowStat {
163 flow_stat_size: report
164 .state_size
165 .iter()
166 .map(|(k, v)| (*k, *v as u64))
167 .collect(),
168 flow_last_exec_time_map: report
169 .last_exec_time_map
170 .iter()
171 .map(|(k, v)| (*k, *v))
172 .collect(),
173 });
174
175 let mut heartbeat_request = HeartbeatRequest {
176 mailbox_message,
177 flow_stat,
178 ..heartbeat_request.clone()
179 };
180
181 if let Some(info) = heartbeat_request.info.as_mut() {
182 info.cpu_usage_millicores = cpu_usage;
183 info.memory_usage_bytes = memory_usage;
184 }
185
186 Some(heartbeat_request)
187 }
188
189 #[allow(deprecated)]
190 fn build_node_info(
191 start_time_ms: u64,
192 total_cpu_millicores: i64,
193 total_memory_bytes: i64,
194 ) -> Option<NodeInfo> {
195 let build_info = common_version::build_info();
196 Some(NodeInfo {
197 version: build_info.version.to_string(),
198 git_commit: build_info.commit_short.to_string(),
199 start_time_ms,
200 total_cpu_millicores,
201 total_memory_bytes,
202 cpu_usage_millicores: 0,
203 memory_usage_bytes: 0,
204 cpus: total_cpu_millicores as u32,
206 memory_bytes: total_memory_bytes as u64,
207 hostname: hostname::get()
208 .unwrap_or_default()
209 .to_string_lossy()
210 .to_string(),
211 })
212 }
213
214 fn start_heartbeat_report(
215 &self,
216 req_sender: HeartbeatSender,
217 mut outgoing_rx: mpsc::Receiver<OutgoingMessage>,
218 report_interval: Duration,
219 ) {
220 let node_epoch = self.node_epoch;
221 let self_peer = Some(Peer {
222 id: self.node_id,
223 addr: self.peer_addr.clone(),
224 });
225 let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores();
226 let total_memory_bytes = self.resource_stat.get_total_memory_bytes();
227 let resource_stat = self.resource_stat.clone();
228 let query_stat_size = self.query_stat_size.clone();
229
230 common_runtime::spawn_hb(async move {
231 let mut interval = tokio::time::interval(report_interval);
234 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
235 let mut latest_report = None;
236
237 let heartbeat_request = HeartbeatRequest {
238 peer: self_peer,
239 node_epoch,
240 info: Self::build_node_info(node_epoch, total_cpu_millicores, total_memory_bytes),
241 ..Default::default()
242 };
243
244 loop {
245 let req = tokio::select! {
246 message = outgoing_rx.recv() => {
247 if let Some(message) = message {
248 Self::new_heartbeat_request(&heartbeat_request, Some(message), &latest_report, 0, 0)
249 } else {
250 warn!("Sender has been dropped, exiting the heartbeat loop");
251 break
253 }
254 }
255 _ = interval.tick() => {
256 Self::new_heartbeat_request(&heartbeat_request, None, &latest_report, resource_stat.get_cpu_usage_millicores(), resource_stat.get_memory_usage_bytes())
257 }
258 };
259
260 if let Some(req) = req {
261 if let Err(e) = req_sender.send(req.clone()).await {
262 error!(e; "Failed to send heartbeat to metasrv");
263 break;
264 } else {
265 debug!("Send a heartbeat request to metasrv, content: {:?}", req);
266 }
267 }
268 latest_report = query_flow_state(&query_stat_size, report_interval / 2).await;
272 }
273
274 info!("flownode heartbeat task stopped.");
275 });
276 }
277
278 fn start_handle_resp_stream(
279 &self,
280 mut resp_stream: HeartbeatStream,
281 mailbox: MailboxRef,
282 retry_interval: Duration,
283 ) {
284 let capture_self = self.clone();
285
286 let _handle = common_runtime::spawn_hb(async move {
287 loop {
288 match resp_stream.message().await {
289 Ok(Some(resp)) => {
290 debug!("Receiving heartbeat response: {:?}", resp);
291 let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
292 if let Err(e) = capture_self.handle_response(ctx).await {
293 error!(e; "Error while handling heartbeat response");
294 }
295 }
296 Ok(None) => {
297 warn!("Heartbeat response stream closed");
298 capture_self.start_with_retry(retry_interval).await;
299 break;
300 }
301 Err(e) => {
302 error!(e; "Occur error while reading heartbeat response");
303 capture_self.start_with_retry(retry_interval).await;
304
305 break;
306 }
307 }
308 }
309 });
310 }
311
312 async fn handle_response(&self, ctx: HeartbeatResponseHandlerContext) -> Result<(), Error> {
313 self.resp_handler_executor
314 .handle(ctx)
315 .await
316 .map_err(BoxedError::new)
317 .context(ExternalSnafu)
318 }
319
320 async fn start_with_retry(&self, retry_interval: Duration) {
321 loop {
322 tokio::time::sleep(retry_interval).await;
323
324 info!("Try to re-establish the heartbeat connection to metasrv.");
325
326 if self.create_streams().await.is_ok() {
327 break;
328 }
329 }
330 }
331}