1use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, Ordering};
19
20use api::v1::meta::{HeartbeatRequest, Peer};
21use common_error::ext::BoxedError;
22use common_meta::heartbeat::handler::{
23 HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
24};
25use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
26use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
27use common_meta::key::flow::flow_state::FlowStat;
28use common_stat::ResourceStatRef;
29use common_telemetry::{debug, error, info, warn};
30use greptime_proto::v1::meta::NodeInfo;
31use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
32use servers::addrs;
33use servers::heartbeat_options::HeartbeatOptions;
34use snafu::ResultExt;
35use tokio::sync::mpsc;
36use tokio::time::Duration;
37
38use crate::error::ExternalSnafu;
39use crate::utils::SizeReportSender;
40use crate::{Error, FlownodeOptions};
41
42async fn query_flow_state(
43 query_stat_size: &Option<SizeReportSender>,
44 timeout: Duration,
45) -> Option<FlowStat> {
46 if let Some(report_requester) = query_stat_size.as_ref() {
47 let ret = report_requester.query(timeout).await;
48 match ret {
49 Ok(latest) => Some(latest),
50 Err(err) => {
51 error!(err; "Failed to get query stat size");
52 None
53 }
54 }
55 } else {
56 None
57 }
58}
59
60#[derive(Clone)]
62pub struct HeartbeatTask {
63 node_id: u64,
64 node_epoch: u64,
65 peer_addr: String,
66 meta_client: Arc<MetaClient>,
67 report_interval: Duration,
68 retry_interval: Duration,
69 resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
70 running: Arc<AtomicBool>,
71 query_stat_size: Option<SizeReportSender>,
72 resource_stat: ResourceStatRef,
73}
74
75impl HeartbeatTask {
76 pub fn with_query_stat_size(mut self, query_stat_size: SizeReportSender) -> Self {
77 self.query_stat_size = Some(query_stat_size);
78 self
79 }
80
81 pub fn new(
82 opts: &FlownodeOptions,
83 meta_client: Arc<MetaClient>,
84 heartbeat_opts: HeartbeatOptions,
85 resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
86 resource_stat: ResourceStatRef,
87 ) -> Self {
88 Self {
89 node_id: opts.node_id.unwrap_or(0),
90 node_epoch: common_time::util::current_time_millis() as u64,
91 peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
92 meta_client,
93 report_interval: heartbeat_opts.interval,
94 retry_interval: heartbeat_opts.retry_interval,
95 resp_handler_executor,
96 running: Arc::new(AtomicBool::new(false)),
97 query_stat_size: None,
98 resource_stat,
99 }
100 }
101
102 pub async fn start(&self) -> Result<(), Error> {
103 if self
104 .running
105 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
106 .is_err()
107 {
108 warn!("Heartbeat task started multiple times");
109 return Ok(());
110 }
111
112 self.create_streams().await
113 }
114
115 async fn create_streams(&self) -> Result<(), Error> {
116 info!("Start to establish the heartbeat connection to metasrv.");
117 let (req_sender, resp_stream) = self
118 .meta_client
119 .heartbeat()
120 .await
121 .map_err(BoxedError::new)
122 .context(ExternalSnafu)?;
123
124 info!("Flownode's heartbeat connection has been established with metasrv");
125
126 let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
127 let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
128
129 self.start_handle_resp_stream(resp_stream, mailbox);
130
131 self.start_heartbeat_report(req_sender, outgoing_rx);
132
133 Ok(())
134 }
135
136 pub fn shutdown(&self) {
137 info!("Close heartbeat task for flownode");
138 if self
139 .running
140 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
141 .is_err()
142 {
143 warn!("Call close heartbeat task multiple times");
144 }
145 }
146
147 fn new_heartbeat_request(
148 heartbeat_request: &HeartbeatRequest,
149 message: Option<OutgoingMessage>,
150 latest_report: &Option<FlowStat>,
151 cpu_usage: i64,
152 memory_usage: i64,
153 ) -> Option<HeartbeatRequest> {
154 let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
155 Some(Ok(message)) => Some(message),
156 Some(Err(e)) => {
157 error!(e; "Failed to encode mailbox messages");
158 return None;
159 }
160 None => None,
161 };
162 let flow_stat = latest_report
163 .as_ref()
164 .map(|report| api::v1::meta::FlowStat {
165 flow_stat_size: report
166 .state_size
167 .iter()
168 .map(|(k, v)| (*k, *v as u64))
169 .collect(),
170 flow_last_exec_time_map: report
171 .last_exec_time_map
172 .iter()
173 .map(|(k, v)| (*k, *v))
174 .collect(),
175 });
176
177 let mut heartbeat_request = HeartbeatRequest {
178 mailbox_message,
179 flow_stat,
180 ..heartbeat_request.clone()
181 };
182
183 if let Some(info) = heartbeat_request.info.as_mut() {
184 info.cpu_usage_millicores = cpu_usage;
185 info.memory_usage_bytes = memory_usage;
186 }
187
188 Some(heartbeat_request)
189 }
190
191 #[allow(deprecated)]
192 fn build_node_info(
193 start_time_ms: u64,
194 total_cpu_millicores: i64,
195 total_memory_bytes: i64,
196 ) -> Option<NodeInfo> {
197 let build_info = common_version::build_info();
198 Some(NodeInfo {
199 version: build_info.version.to_string(),
200 git_commit: build_info.commit_short.to_string(),
201 start_time_ms,
202 total_cpu_millicores,
203 total_memory_bytes,
204 cpu_usage_millicores: 0,
205 memory_usage_bytes: 0,
206 cpus: total_cpu_millicores as u32,
208 memory_bytes: total_memory_bytes as u64,
209 hostname: hostname::get()
210 .unwrap_or_default()
211 .to_string_lossy()
212 .to_string(),
213 })
214 }
215
216 fn start_heartbeat_report(
217 &self,
218 req_sender: HeartbeatSender,
219 mut outgoing_rx: mpsc::Receiver<OutgoingMessage>,
220 ) {
221 let report_interval = self.report_interval;
222 let node_epoch = self.node_epoch;
223 let self_peer = Some(Peer {
224 id: self.node_id,
225 addr: self.peer_addr.clone(),
226 });
227 let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores();
228 let total_memory_bytes = self.resource_stat.get_total_memory_bytes();
229 let resource_stat = self.resource_stat.clone();
230 let query_stat_size = self.query_stat_size.clone();
231
232 common_runtime::spawn_hb(async move {
233 let mut interval = tokio::time::interval(report_interval);
236 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
237 let mut latest_report = None;
238
239 let heartbeat_request = HeartbeatRequest {
240 peer: self_peer,
241 node_epoch,
242 info: Self::build_node_info(node_epoch, total_cpu_millicores, total_memory_bytes),
243 ..Default::default()
244 };
245
246 loop {
247 let req = tokio::select! {
248 message = outgoing_rx.recv() => {
249 if let Some(message) = message {
250 Self::new_heartbeat_request(&heartbeat_request, Some(message), &latest_report, 0, 0)
251 } else {
252 warn!("Sender has been dropped, exiting the heartbeat loop");
253 break
255 }
256 }
257 _ = interval.tick() => {
258 Self::new_heartbeat_request(&heartbeat_request, None, &latest_report, resource_stat.get_cpu_usage_millicores(), resource_stat.get_memory_usage_bytes())
259 }
260 };
261
262 if let Some(req) = req {
263 if let Err(e) = req_sender.send(req.clone()).await {
264 error!(e; "Failed to send heartbeat to metasrv");
265 break;
266 } else {
267 debug!("Send a heartbeat request to metasrv, content: {:?}", req);
268 }
269 }
270 latest_report = query_flow_state(&query_stat_size, report_interval / 2).await;
274 }
275
276 info!("flownode heartbeat task stopped.");
277 });
278 }
279
280 fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream, mailbox: MailboxRef) {
281 let capture_self = self.clone();
282 let retry_interval = self.retry_interval;
283
284 let _handle = common_runtime::spawn_hb(async move {
285 loop {
286 match resp_stream.message().await {
287 Ok(Some(resp)) => {
288 debug!("Receiving heartbeat response: {:?}", resp);
289 let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
290 if let Err(e) = capture_self.handle_response(ctx).await {
291 error!(e; "Error while handling heartbeat response");
292 }
293 }
294 Ok(None) => {
295 warn!("Heartbeat response stream closed");
296 capture_self.start_with_retry(retry_interval).await;
297 break;
298 }
299 Err(e) => {
300 error!(e; "Occur error while reading heartbeat response");
301 capture_self.start_with_retry(retry_interval).await;
302
303 break;
304 }
305 }
306 }
307 });
308 }
309
310 async fn handle_response(&self, ctx: HeartbeatResponseHandlerContext) -> Result<(), Error> {
311 self.resp_handler_executor
312 .handle(ctx)
313 .await
314 .map_err(BoxedError::new)
315 .context(ExternalSnafu)
316 }
317
318 async fn start_with_retry(&self, retry_interval: Duration) {
319 loop {
320 tokio::time::sleep(retry_interval).await;
321
322 info!("Try to re-establish the heartbeat connection to metasrv.");
323
324 if self.create_streams().await.is_ok() {
325 break;
326 }
327 }
328 }
329}