1use std::sync::Arc;
18use std::sync::atomic::{AtomicBool, Ordering};
19
20use api::v1::meta::{HeartbeatRequest, Peer};
21use common_config::utils::ResourceSpec;
22use common_error::ext::BoxedError;
23use common_meta::heartbeat::handler::{
24 HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
25};
26use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
27use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
28use common_meta::key::flow::flow_state::FlowStat;
29use common_telemetry::{debug, error, info, warn};
30use greptime_proto::v1::meta::NodeInfo;
31use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
32use servers::addrs;
33use servers::heartbeat_options::HeartbeatOptions;
34use snafu::ResultExt;
35use tokio::sync::mpsc;
36use tokio::time::Duration;
37
38use crate::error::ExternalSnafu;
39use crate::utils::SizeReportSender;
40use crate::{Error, FlownodeOptions};
41
42async fn query_flow_state(
43 query_stat_size: &Option<SizeReportSender>,
44 timeout: Duration,
45) -> Option<FlowStat> {
46 if let Some(report_requester) = query_stat_size.as_ref() {
47 let ret = report_requester.query(timeout).await;
48 match ret {
49 Ok(latest) => Some(latest),
50 Err(err) => {
51 error!(err; "Failed to get query stat size");
52 None
53 }
54 }
55 } else {
56 None
57 }
58}
59
60#[derive(Clone)]
62pub struct HeartbeatTask {
63 node_id: u64,
64 node_epoch: u64,
65 peer_addr: String,
66 meta_client: Arc<MetaClient>,
67 report_interval: Duration,
68 retry_interval: Duration,
69 resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
70 running: Arc<AtomicBool>,
71 query_stat_size: Option<SizeReportSender>,
72 resource_spec: ResourceSpec,
73}
74
75impl HeartbeatTask {
76 pub fn with_query_stat_size(mut self, query_stat_size: SizeReportSender) -> Self {
77 self.query_stat_size = Some(query_stat_size);
78 self
79 }
80 pub fn new(
81 opts: &FlownodeOptions,
82 meta_client: Arc<MetaClient>,
83 heartbeat_opts: HeartbeatOptions,
84 resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
85 ) -> Self {
86 Self {
87 node_id: opts.node_id.unwrap_or(0),
88 node_epoch: common_time::util::current_time_millis() as u64,
89 peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
90 meta_client,
91 report_interval: heartbeat_opts.interval,
92 retry_interval: heartbeat_opts.retry_interval,
93 resp_handler_executor,
94 running: Arc::new(AtomicBool::new(false)),
95 query_stat_size: None,
96 resource_spec: Default::default(),
97 }
98 }
99
100 pub async fn start(&self) -> Result<(), Error> {
101 if self
102 .running
103 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
104 .is_err()
105 {
106 warn!("Heartbeat task started multiple times");
107 return Ok(());
108 }
109
110 self.create_streams().await
111 }
112
113 async fn create_streams(&self) -> Result<(), Error> {
114 info!("Start to establish the heartbeat connection to metasrv.");
115 let (req_sender, resp_stream) = self
116 .meta_client
117 .heartbeat()
118 .await
119 .map_err(BoxedError::new)
120 .context(ExternalSnafu)?;
121
122 info!("Flownode's heartbeat connection has been established with metasrv");
123
124 let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
125 let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
126
127 self.start_handle_resp_stream(resp_stream, mailbox);
128
129 self.start_heartbeat_report(req_sender, outgoing_rx);
130
131 Ok(())
132 }
133
134 pub fn shutdown(&self) {
135 info!("Close heartbeat task for flownode");
136 if self
137 .running
138 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
139 .is_err()
140 {
141 warn!("Call close heartbeat task multiple times");
142 }
143 }
144
145 fn new_heartbeat_request(
146 heartbeat_request: &HeartbeatRequest,
147 message: Option<OutgoingMessage>,
148 latest_report: &Option<FlowStat>,
149 ) -> Option<HeartbeatRequest> {
150 let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
151 Some(Ok(message)) => Some(message),
152 Some(Err(e)) => {
153 error!(e; "Failed to encode mailbox messages");
154 return None;
155 }
156 None => None,
157 };
158 let flow_stat = latest_report
159 .as_ref()
160 .map(|report| api::v1::meta::FlowStat {
161 flow_stat_size: report
162 .state_size
163 .iter()
164 .map(|(k, v)| (*k, *v as u64))
165 .collect(),
166 flow_last_exec_time_map: report
167 .last_exec_time_map
168 .iter()
169 .map(|(k, v)| (*k, *v))
170 .collect(),
171 });
172
173 Some(HeartbeatRequest {
174 mailbox_message,
175 flow_stat,
176 ..heartbeat_request.clone()
177 })
178 }
179
180 fn build_node_info(start_time_ms: u64, cpus: u32, memory_bytes: u64) -> Option<NodeInfo> {
181 let build_info = common_version::build_info();
182 Some(NodeInfo {
183 version: build_info.version.to_string(),
184 git_commit: build_info.commit_short.to_string(),
185 start_time_ms,
186 cpus,
187 memory_bytes,
188 })
189 }
190
191 fn start_heartbeat_report(
192 &self,
193 req_sender: HeartbeatSender,
194 mut outgoing_rx: mpsc::Receiver<OutgoingMessage>,
195 ) {
196 let report_interval = self.report_interval;
197 let node_epoch = self.node_epoch;
198 let self_peer = Some(Peer {
199 id: self.node_id,
200 addr: self.peer_addr.clone(),
201 });
202 let cpus = self.resource_spec.cpus as u32;
203 let memory_bytes = self.resource_spec.memory.unwrap_or_default().as_bytes();
204
205 let query_stat_size = self.query_stat_size.clone();
206
207 common_runtime::spawn_hb(async move {
208 let mut interval = tokio::time::interval(report_interval);
211 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
212 let mut latest_report = None;
213
214 let heartbeat_request = HeartbeatRequest {
215 peer: self_peer,
216 node_epoch,
217 info: Self::build_node_info(node_epoch, cpus, memory_bytes),
218 ..Default::default()
219 };
220
221 loop {
222 let req = tokio::select! {
223 message = outgoing_rx.recv() => {
224 if let Some(message) = message {
225 Self::new_heartbeat_request(&heartbeat_request, Some(message), &latest_report)
226 } else {
227 warn!("Sender has been dropped, exiting the heartbeat loop");
228 break
230 }
231 }
232 _ = interval.tick() => {
233 Self::new_heartbeat_request(&heartbeat_request, None, &latest_report)
234 }
235 };
236
237 if let Some(req) = req {
238 if let Err(e) = req_sender.send(req.clone()).await {
239 error!(e; "Failed to send heartbeat to metasrv");
240 break;
241 } else {
242 debug!("Send a heartbeat request to metasrv, content: {:?}", req);
243 }
244 }
245 latest_report = query_flow_state(&query_stat_size, report_interval / 2).await;
249 }
250
251 info!("flownode heartbeat task stopped.");
252 });
253 }
254
255 fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream, mailbox: MailboxRef) {
256 let capture_self = self.clone();
257 let retry_interval = self.retry_interval;
258
259 let _handle = common_runtime::spawn_hb(async move {
260 loop {
261 match resp_stream.message().await {
262 Ok(Some(resp)) => {
263 debug!("Receiving heartbeat response: {:?}", resp);
264 let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
265 if let Err(e) = capture_self.handle_response(ctx).await {
266 error!(e; "Error while handling heartbeat response");
267 }
268 }
269 Ok(None) => {
270 warn!("Heartbeat response stream closed");
271 capture_self.start_with_retry(retry_interval).await;
272 break;
273 }
274 Err(e) => {
275 error!(e; "Occur error while reading heartbeat response");
276 capture_self.start_with_retry(retry_interval).await;
277
278 break;
279 }
280 }
281 }
282 });
283 }
284
285 async fn handle_response(&self, ctx: HeartbeatResponseHandlerContext) -> Result<(), Error> {
286 self.resp_handler_executor
287 .handle(ctx)
288 .await
289 .map_err(BoxedError::new)
290 .context(ExternalSnafu)
291 }
292
293 async fn start_with_retry(&self, retry_interval: Duration) {
294 loop {
295 tokio::time::sleep(retry_interval).await;
296
297 info!("Try to re-establish the heartbeat connection to metasrv.");
298
299 if self.create_streams().await.is_ok() {
300 break;
301 }
302 }
303 }
304}