1use std::sync::atomic::{AtomicBool, Ordering};
18use std::sync::Arc;
19
20use api::v1::meta::{HeartbeatRequest, Peer};
21use common_error::ext::BoxedError;
22use common_meta::heartbeat::handler::{
23 HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
24};
25use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
26use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
27use common_meta::key::flow::flow_state::FlowStat;
28use common_telemetry::{debug, error, info, warn};
29use greptime_proto::v1::meta::NodeInfo;
30use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
31use servers::addrs;
32use servers::heartbeat_options::HeartbeatOptions;
33use snafu::ResultExt;
34use tokio::sync::mpsc;
35use tokio::time::Duration;
36
37use crate::error::ExternalSnafu;
38use crate::utils::SizeReportSender;
39use crate::{Error, FlownodeOptions};
40
41async fn query_flow_state(
42 query_stat_size: &Option<SizeReportSender>,
43 timeout: Duration,
44) -> Option<FlowStat> {
45 if let Some(report_requester) = query_stat_size.as_ref() {
46 let ret = report_requester.query(timeout).await;
47 match ret {
48 Ok(latest) => Some(latest),
49 Err(err) => {
50 error!(err; "Failed to get query stat size");
51 None
52 }
53 }
54 } else {
55 None
56 }
57}
58
59#[derive(Clone)]
61pub struct HeartbeatTask {
62 node_id: u64,
63 node_epoch: u64,
64 peer_addr: String,
65 meta_client: Arc<MetaClient>,
66 report_interval: Duration,
67 retry_interval: Duration,
68 resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
69 running: Arc<AtomicBool>,
70 query_stat_size: Option<SizeReportSender>,
71}
72
73impl HeartbeatTask {
74 pub fn with_query_stat_size(mut self, query_stat_size: SizeReportSender) -> Self {
75 self.query_stat_size = Some(query_stat_size);
76 self
77 }
78 pub fn new(
79 opts: &FlownodeOptions,
80 meta_client: Arc<MetaClient>,
81 heartbeat_opts: HeartbeatOptions,
82 resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
83 ) -> Self {
84 Self {
85 node_id: opts.node_id.unwrap_or(0),
86 node_epoch: common_time::util::current_time_millis() as u64,
87 peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
88 meta_client,
89 report_interval: heartbeat_opts.interval,
90 retry_interval: heartbeat_opts.retry_interval,
91 resp_handler_executor,
92 running: Arc::new(AtomicBool::new(false)),
93 query_stat_size: None,
94 }
95 }
96
97 pub async fn start(&self) -> Result<(), Error> {
98 if self
99 .running
100 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
101 .is_err()
102 {
103 warn!("Heartbeat task started multiple times");
104 return Ok(());
105 }
106
107 self.create_streams().await
108 }
109
110 async fn create_streams(&self) -> Result<(), Error> {
111 info!("Start to establish the heartbeat connection to metasrv.");
112 let (req_sender, resp_stream) = self
113 .meta_client
114 .heartbeat()
115 .await
116 .map_err(BoxedError::new)
117 .context(ExternalSnafu)?;
118
119 info!("Flownode's heartbeat connection has been established with metasrv");
120
121 let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
122 let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
123
124 self.start_handle_resp_stream(resp_stream, mailbox);
125
126 self.start_heartbeat_report(req_sender, outgoing_rx);
127
128 Ok(())
129 }
130
131 pub fn shutdown(&self) {
132 info!("Close heartbeat task for flownode");
133 if self
134 .running
135 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
136 .is_err()
137 {
138 warn!("Call close heartbeat task multiple times");
139 }
140 }
141
142 fn new_heartbeat_request(
143 heartbeat_request: &HeartbeatRequest,
144 message: Option<OutgoingMessage>,
145 latest_report: &Option<FlowStat>,
146 ) -> Option<HeartbeatRequest> {
147 let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
148 Some(Ok(message)) => Some(message),
149 Some(Err(e)) => {
150 error!(e; "Failed to encode mailbox messages");
151 return None;
152 }
153 None => None,
154 };
155 let flow_stat = latest_report
156 .as_ref()
157 .map(|report| api::v1::meta::FlowStat {
158 flow_stat_size: report
159 .state_size
160 .iter()
161 .map(|(k, v)| (*k, *v as u64))
162 .collect(),
163 flow_last_exec_time_map: report
164 .last_exec_time_map
165 .iter()
166 .map(|(k, v)| (*k, *v))
167 .collect(),
168 });
169
170 Some(HeartbeatRequest {
171 mailbox_message,
172 flow_stat,
173 ..heartbeat_request.clone()
174 })
175 }
176
177 fn build_node_info(start_time_ms: u64) -> Option<NodeInfo> {
178 let build_info = common_version::build_info();
179 Some(NodeInfo {
180 version: build_info.version.to_string(),
181 git_commit: build_info.commit_short.to_string(),
182 start_time_ms,
183 cpus: num_cpus::get() as u32,
184 })
185 }
186
187 fn start_heartbeat_report(
188 &self,
189 req_sender: HeartbeatSender,
190 mut outgoing_rx: mpsc::Receiver<OutgoingMessage>,
191 ) {
192 let report_interval = self.report_interval;
193 let node_epoch = self.node_epoch;
194 let self_peer = Some(Peer {
195 id: self.node_id,
196 addr: self.peer_addr.clone(),
197 });
198
199 let query_stat_size = self.query_stat_size.clone();
200
201 common_runtime::spawn_hb(async move {
202 let mut interval = tokio::time::interval(report_interval);
205 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
206 let mut latest_report = None;
207
208 let heartbeat_request = HeartbeatRequest {
209 peer: self_peer,
210 node_epoch,
211 info: Self::build_node_info(node_epoch),
212 ..Default::default()
213 };
214
215 loop {
216 let req = tokio::select! {
217 message = outgoing_rx.recv() => {
218 if let Some(message) = message {
219 Self::new_heartbeat_request(&heartbeat_request, Some(message), &latest_report)
220 } else {
221 break
223 }
224 }
225 _ = interval.tick() => {
226 Self::new_heartbeat_request(&heartbeat_request, None, &latest_report)
227 }
228 };
229
230 if let Some(req) = req {
231 if let Err(e) = req_sender.send(req.clone()).await {
232 error!(e; "Failed to send heartbeat to metasrv");
233 break;
234 } else {
235 debug!("Send a heartbeat request to metasrv, content: {:?}", req);
236 }
237 }
238 latest_report = query_flow_state(&query_stat_size, report_interval / 2).await;
242 }
243
244 info!("flownode heartbeat task stopped.");
245 });
246 }
247
248 fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream, mailbox: MailboxRef) {
249 let capture_self = self.clone();
250 let retry_interval = self.retry_interval;
251
252 let _handle = common_runtime::spawn_hb(async move {
253 loop {
254 match resp_stream.message().await {
255 Ok(Some(resp)) => {
256 debug!("Receiving heartbeat response: {:?}", resp);
257 let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
258 if let Err(e) = capture_self.handle_response(ctx).await {
259 error!(e; "Error while handling heartbeat response");
260 }
261 }
262 Ok(None) => break,
263 Err(e) => {
264 error!(e; "Occur error while reading heartbeat response");
265 capture_self.start_with_retry(retry_interval).await;
266
267 break;
268 }
269 }
270 }
271 });
272 }
273
274 async fn handle_response(&self, ctx: HeartbeatResponseHandlerContext) -> Result<(), Error> {
275 self.resp_handler_executor
276 .handle(ctx)
277 .await
278 .map_err(BoxedError::new)
279 .context(ExternalSnafu)
280 }
281
282 async fn start_with_retry(&self, retry_interval: Duration) {
283 loop {
284 tokio::time::sleep(retry_interval).await;
285
286 info!("Try to re-establish the heartbeat connection to metasrv.");
287
288 if self.create_streams().await.is_ok() {
289 break;
290 }
291 }
292 }
293}