1#[cfg(test)]
16mod tests;
17
18use std::sync::Arc;
19
20use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer};
21use common_meta::heartbeat::handler::{
22 HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
23};
24use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
25use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
26use common_stat::ResourceStatRef;
27use common_telemetry::{debug, error, info, warn};
28use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
29use servers::addrs;
30use servers::heartbeat_options::HeartbeatOptions;
31use snafu::ResultExt;
32use tokio::sync::mpsc;
33use tokio::sync::mpsc::Receiver;
34use tokio::time::{Duration, Instant};
35
36use crate::error;
37use crate::error::Result;
38use crate::frontend::FrontendOptions;
39use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
40
41#[derive(Clone)]
43pub struct HeartbeatTask {
44 peer_addr: String,
45 meta_client: Arc<MetaClient>,
46 report_interval: Duration,
47 retry_interval: Duration,
48 resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
49 start_time_ms: u64,
50 resource_stat: ResourceStatRef,
51}
52
53impl HeartbeatTask {
54 pub fn new(
55 opts: &FrontendOptions,
56 meta_client: Arc<MetaClient>,
57 heartbeat_opts: HeartbeatOptions,
58 resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
59 resource_stat: ResourceStatRef,
60 ) -> Self {
61 HeartbeatTask {
62 peer_addr: if let Some(internal) = &opts.internal_grpc {
66 addrs::resolve_addr(&internal.bind_addr, Some(&internal.server_addr))
67 } else {
68 addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr))
69 },
70 meta_client,
71 report_interval: heartbeat_opts.interval,
72 retry_interval: heartbeat_opts.retry_interval,
73 resp_handler_executor,
74 start_time_ms: common_time::util::current_time_millis() as u64,
75 resource_stat,
76 }
77 }
78
79 pub async fn start(&self) -> Result<()> {
80 let (req_sender, resp_stream) = self
81 .meta_client
82 .heartbeat()
83 .await
84 .context(error::CreateMetaHeartbeatStreamSnafu)?;
85
86 info!("A heartbeat connection has been established with metasrv");
87
88 let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
89 let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
90
91 self.start_handle_resp_stream(resp_stream, mailbox);
92
93 self.start_heartbeat_report(req_sender, outgoing_rx);
94
95 Ok(())
96 }
97
98 fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream, mailbox: MailboxRef) {
99 let capture_self = self.clone();
100 let retry_interval = self.retry_interval;
101
102 let _handle = common_runtime::spawn_hb(async move {
103 loop {
104 match resp_stream.message().await {
105 Ok(Some(resp)) => {
106 debug!("Receiving heartbeat response: {:?}", resp);
107 if let Some(message) = &resp.mailbox_message {
108 info!("Received mailbox message: {message:?}");
109 }
110 let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
111 if let Err(e) = capture_self.handle_response(ctx).await {
112 error!(e; "Error while handling heartbeat response");
113 HEARTBEAT_RECV_COUNT
114 .with_label_values(&["processing_error"])
115 .inc();
116 } else {
117 HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
118 }
119 }
120 Ok(None) => {
121 warn!("Heartbeat response stream closed");
122 capture_self.start_with_retry(retry_interval).await;
123 break;
124 }
125 Err(e) => {
126 HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
127 error!(e; "Occur error while reading heartbeat response");
128 capture_self.start_with_retry(retry_interval).await;
129
130 break;
131 }
132 }
133 }
134 });
135 }
136
137 fn new_heartbeat_request(
138 heartbeat_request: &HeartbeatRequest,
139 message: Option<OutgoingMessage>,
140 cpu_usage: i64,
141 memory_usage: i64,
142 ) -> Option<HeartbeatRequest> {
143 let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
144 Some(Ok(message)) => Some(message),
145 Some(Err(e)) => {
146 error!(e; "Failed to encode mailbox messages");
147 return None;
148 }
149 None => None,
150 };
151
152 let mut heartbeat_request = HeartbeatRequest {
153 mailbox_message,
154 ..heartbeat_request.clone()
155 };
156
157 if let Some(info) = heartbeat_request.info.as_mut() {
158 info.memory_usage_bytes = memory_usage;
159 info.cpu_usage_millicores = cpu_usage;
160 }
161
162 Some(heartbeat_request)
163 }
164
165 #[allow(deprecated)]
166 fn build_node_info(
167 start_time_ms: u64,
168 total_cpu_millicores: i64,
169 total_memory_bytes: i64,
170 ) -> Option<NodeInfo> {
171 let build_info = common_version::build_info();
172
173 Some(NodeInfo {
174 version: build_info.version.to_string(),
175 git_commit: build_info.commit_short.to_string(),
176 start_time_ms,
177 total_cpu_millicores,
178 total_memory_bytes,
179 cpu_usage_millicores: 0,
180 memory_usage_bytes: 0,
181 cpus: total_cpu_millicores as u32,
183 memory_bytes: total_memory_bytes as u64,
184 hostname: hostname::get()
185 .unwrap_or_default()
186 .to_string_lossy()
187 .to_string(),
188 })
189 }
190
191 fn start_heartbeat_report(
192 &self,
193 req_sender: HeartbeatSender,
194 mut outgoing_rx: Receiver<OutgoingMessage>,
195 ) {
196 let report_interval = self.report_interval;
197 let start_time_ms = self.start_time_ms;
198 let self_peer = Some(Peer {
199 id: 0,
201 addr: self.peer_addr.clone(),
202 });
203 let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores();
204 let total_memory_bytes = self.resource_stat.get_total_memory_bytes();
205 let resource_stat = self.resource_stat.clone();
206 common_runtime::spawn_hb(async move {
207 let sleep = tokio::time::sleep(Duration::from_millis(0));
208 tokio::pin!(sleep);
209
210 let heartbeat_request = HeartbeatRequest {
211 peer: self_peer,
212 info: Self::build_node_info(
213 start_time_ms,
214 total_cpu_millicores,
215 total_memory_bytes,
216 ),
217 ..Default::default()
218 };
219
220 loop {
221 let req = tokio::select! {
222 message = outgoing_rx.recv() => {
223 if let Some(message) = message {
224 Self::new_heartbeat_request(&heartbeat_request, Some(message), 0, 0)
225 } else {
226 warn!("Sender has been dropped, exiting the heartbeat loop");
227 break
229 }
230 }
231 _ = &mut sleep => {
232 sleep.as_mut().reset(Instant::now() + report_interval);
233 Self::new_heartbeat_request(&heartbeat_request, None, resource_stat.get_cpu_usage_millicores(), resource_stat.get_memory_usage_bytes())
234 }
235 };
236
237 if let Some(req) = req {
238 if let Err(e) = req_sender.send(req.clone()).await {
239 error!(e; "Failed to send heartbeat to metasrv");
240 break;
241 } else {
242 HEARTBEAT_SENT_COUNT.inc();
243 debug!("Send a heartbeat request to metasrv, content: {:?}", req);
244 }
245 }
246 }
247 });
248 }
249
250 async fn handle_response(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()> {
251 self.resp_handler_executor
252 .handle(ctx)
253 .await
254 .context(error::HandleHeartbeatResponseSnafu)
255 }
256
257 async fn start_with_retry(&self, retry_interval: Duration) {
258 loop {
259 tokio::time::sleep(retry_interval).await;
260
261 info!("Try to re-establish the heartbeat connection to metasrv.");
262
263 if self.start().await.is_ok() {
264 break;
265 }
266 }
267 }
268}