1#[cfg(test)]
16mod tests;
17
18use std::sync::Arc;
19
20use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer};
21use common_meta::heartbeat::handler::{
22 HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
23};
24use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
25use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
26use common_stat::ResourceStatRef;
27use common_telemetry::{debug, error, info, warn};
28use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
29use servers::addrs;
30use snafu::ResultExt;
31use tokio::sync::mpsc;
32use tokio::sync::mpsc::Receiver;
33use tokio::time::{Duration, Instant};
34
35use crate::error;
36use crate::error::Result;
37use crate::frontend::FrontendOptions;
38use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
39
40#[derive(Clone)]
42pub struct HeartbeatTask {
43 peer_addr: String,
44 meta_client: Arc<MetaClient>,
45 report_interval: Duration,
46 retry_interval: Duration,
47 resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
48 start_time_ms: u64,
49 resource_stat: ResourceStatRef,
50}
51
52impl HeartbeatTask {
53 pub fn new(
54 opts: &FrontendOptions,
55 meta_client: Arc<MetaClient>,
56 resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
57 resource_stat: ResourceStatRef,
58 ) -> Self {
59 HeartbeatTask {
60 peer_addr: if let Some(internal) = &opts.internal_grpc {
64 addrs::resolve_addr(&internal.bind_addr, Some(&internal.server_addr))
65 } else {
66 addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr))
67 },
68 meta_client,
69 report_interval: opts.heartbeat.interval,
70 retry_interval: opts.heartbeat.retry_interval,
71 resp_handler_executor,
72 start_time_ms: common_time::util::current_time_millis() as u64,
73 resource_stat,
74 }
75 }
76
77 pub async fn start(&self) -> Result<()> {
78 let (req_sender, resp_stream) = self
79 .meta_client
80 .heartbeat()
81 .await
82 .context(error::CreateMetaHeartbeatStreamSnafu)?;
83
84 info!("A heartbeat connection has been established with metasrv");
85
86 let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
87 let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
88
89 self.start_handle_resp_stream(resp_stream, mailbox);
90
91 self.start_heartbeat_report(req_sender, outgoing_rx);
92
93 Ok(())
94 }
95
96 fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream, mailbox: MailboxRef) {
97 let capture_self = self.clone();
98 let retry_interval = self.retry_interval;
99
100 let _handle = common_runtime::spawn_hb(async move {
101 loop {
102 match resp_stream.message().await {
103 Ok(Some(resp)) => {
104 debug!("Receiving heartbeat response: {:?}", resp);
105 if let Some(message) = &resp.mailbox_message {
106 info!("Received mailbox message: {message:?}");
107 }
108 let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
109 if let Err(e) = capture_self.handle_response(ctx).await {
110 error!(e; "Error while handling heartbeat response");
111 HEARTBEAT_RECV_COUNT
112 .with_label_values(&["processing_error"])
113 .inc();
114 } else {
115 HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
116 }
117 }
118 Ok(None) => {
119 warn!("Heartbeat response stream closed");
120 capture_self.start_with_retry(retry_interval).await;
121 break;
122 }
123 Err(e) => {
124 HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
125 error!(e; "Occur error while reading heartbeat response");
126 capture_self.start_with_retry(retry_interval).await;
127
128 break;
129 }
130 }
131 }
132 });
133 }
134
135 fn new_heartbeat_request(
136 heartbeat_request: &HeartbeatRequest,
137 message: Option<OutgoingMessage>,
138 cpu_usage: i64,
139 memory_usage: i64,
140 ) -> Option<HeartbeatRequest> {
141 let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
142 Some(Ok(message)) => Some(message),
143 Some(Err(e)) => {
144 error!(e; "Failed to encode mailbox messages");
145 return None;
146 }
147 None => None,
148 };
149
150 let mut heartbeat_request = HeartbeatRequest {
151 mailbox_message,
152 ..heartbeat_request.clone()
153 };
154
155 if let Some(info) = heartbeat_request.info.as_mut() {
156 info.memory_usage_bytes = memory_usage;
157 info.cpu_usage_millicores = cpu_usage;
158 }
159
160 Some(heartbeat_request)
161 }
162
163 #[allow(deprecated)]
164 fn build_node_info(
165 start_time_ms: u64,
166 total_cpu_millicores: i64,
167 total_memory_bytes: i64,
168 ) -> Option<NodeInfo> {
169 let build_info = common_version::build_info();
170
171 Some(NodeInfo {
172 version: build_info.version.to_string(),
173 git_commit: build_info.commit_short.to_string(),
174 start_time_ms,
175 total_cpu_millicores,
176 total_memory_bytes,
177 cpu_usage_millicores: 0,
178 memory_usage_bytes: 0,
179 cpus: total_cpu_millicores as u32,
181 memory_bytes: total_memory_bytes as u64,
182 hostname: hostname::get()
183 .unwrap_or_default()
184 .to_string_lossy()
185 .to_string(),
186 })
187 }
188
189 fn start_heartbeat_report(
190 &self,
191 req_sender: HeartbeatSender,
192 mut outgoing_rx: Receiver<OutgoingMessage>,
193 ) {
194 let report_interval = self.report_interval;
195 let start_time_ms = self.start_time_ms;
196 let self_peer = Some(Peer {
197 id: 0,
200 addr: self.peer_addr.clone(),
201 });
202 let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores();
203 let total_memory_bytes = self.resource_stat.get_total_memory_bytes();
204 let resource_stat = self.resource_stat.clone();
205 common_runtime::spawn_hb(async move {
206 let sleep = tokio::time::sleep(Duration::from_millis(0));
207 tokio::pin!(sleep);
208
209 let heartbeat_request = HeartbeatRequest {
210 peer: self_peer,
211 info: Self::build_node_info(
212 start_time_ms,
213 total_cpu_millicores,
214 total_memory_bytes,
215 ),
216 ..Default::default()
217 };
218
219 loop {
220 let req = tokio::select! {
221 message = outgoing_rx.recv() => {
222 if let Some(message) = message {
223 Self::new_heartbeat_request(&heartbeat_request, Some(message), 0, 0)
224 } else {
225 warn!("Sender has been dropped, exiting the heartbeat loop");
226 break
228 }
229 }
230 _ = &mut sleep => {
231 sleep.as_mut().reset(Instant::now() + report_interval);
232 Self::new_heartbeat_request(&heartbeat_request, None, resource_stat.get_cpu_usage_millicores(), resource_stat.get_memory_usage_bytes())
233 }
234 };
235
236 if let Some(req) = req {
237 if let Err(e) = req_sender.send(req.clone()).await {
238 error!(e; "Failed to send heartbeat to metasrv");
239 break;
240 } else {
241 HEARTBEAT_SENT_COUNT.inc();
242 debug!("Send a heartbeat request to metasrv, content: {:?}", req);
243 }
244 }
245 }
246 });
247 }
248
249 async fn handle_response(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()> {
250 self.resp_handler_executor
251 .handle(ctx)
252 .await
253 .context(error::HandleHeartbeatResponseSnafu)
254 }
255
256 async fn start_with_retry(&self, retry_interval: Duration) {
257 loop {
258 tokio::time::sleep(retry_interval).await;
259
260 info!("Try to re-establish the heartbeat connection to metasrv.");
261
262 if self.start().await.is_ok() {
263 break;
264 }
265 }
266 }
267}