1#[cfg(test)]
16mod tests;
17
18use std::sync::Arc;
19
20use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer};
21use common_meta::datanode::EnvVars;
22use common_meta::heartbeat::handler::{
23 HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
24};
25use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
26use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
27use common_stat::ResourceStatRef;
28use common_telemetry::{debug, error, info, warn};
29use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
30use servers::addrs;
31use snafu::ResultExt;
32use tokio::sync::mpsc;
33use tokio::sync::mpsc::Receiver;
34use tokio::time::{Duration, Instant};
35
36use crate::error;
37use crate::error::Result;
38use crate::frontend::FrontendOptions;
39use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
40
41#[derive(Clone)]
43pub struct HeartbeatTask {
44 peer_addr: String,
45 meta_client: Arc<MetaClient>,
46 resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
47 start_time_ms: u64,
48 resource_stat: ResourceStatRef,
49 env_vars: EnvVars,
50}
51
52impl HeartbeatTask {
53 pub fn new(
54 peer_addr: String,
55 opts: &FrontendOptions,
56 meta_client: Arc<MetaClient>,
57 resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
58 resource_stat: ResourceStatRef,
59 ) -> Self {
60 HeartbeatTask {
61 peer_addr,
62 meta_client,
63 resp_handler_executor,
64 start_time_ms: common_time::util::current_time_millis() as u64,
65 resource_stat,
66 env_vars: EnvVars::from_config(&opts.heartbeat_env_vars),
67 }
68 }
69
70 pub async fn start(&self) -> Result<()> {
71 let (req_sender, resp_stream, config) = self
72 .meta_client
73 .heartbeat()
74 .await
75 .context(error::CreateMetaHeartbeatStreamSnafu)?;
76
77 info!("Heartbeat started with Metasrv config: {}", config);
78
79 let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
80 let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
81
82 self.start_handle_resp_stream(resp_stream, mailbox, config.retry_interval);
83
84 self.start_heartbeat_report(req_sender, outgoing_rx, config.interval);
85
86 Ok(())
87 }
88
89 fn start_handle_resp_stream(
90 &self,
91 mut resp_stream: HeartbeatStream,
92 mailbox: MailboxRef,
93 retry_interval: Duration,
94 ) {
95 let capture_self = self.clone();
96
97 let _handle = common_runtime::spawn_hb(async move {
98 loop {
99 match resp_stream.message().await {
100 Ok(Some(resp)) => {
101 debug!("Receiving heartbeat response: {:?}", resp);
102 if let Some(message) = &resp.mailbox_message {
103 info!("Received mailbox message: {message:?}");
104 }
105 let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
106 if let Err(e) = capture_self.handle_response(ctx).await {
107 error!(e; "Error while handling heartbeat response");
108 HEARTBEAT_RECV_COUNT
109 .with_label_values(&["processing_error"])
110 .inc();
111 } else {
112 HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
113 }
114 }
115 Ok(None) => {
116 warn!("Heartbeat response stream closed");
117 capture_self.start_with_retry(retry_interval).await;
118 break;
119 }
120 Err(e) => {
121 HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
122 error!(e; "Occur error while reading heartbeat response");
123 capture_self.start_with_retry(retry_interval).await;
124
125 break;
126 }
127 }
128 }
129 });
130 }
131
132 fn new_heartbeat_request(
133 heartbeat_request: &HeartbeatRequest,
134 message: Option<OutgoingMessage>,
135 cpu_usage: i64,
136 memory_usage: i64,
137 ) -> Option<HeartbeatRequest> {
138 let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
139 Some(Ok(message)) => Some(message),
140 Some(Err(e)) => {
141 error!(e; "Failed to encode mailbox messages");
142 return None;
143 }
144 None => None,
145 };
146
147 let mut heartbeat_request = HeartbeatRequest {
148 mailbox_message,
149 ..heartbeat_request.clone()
150 };
151
152 if let Some(info) = heartbeat_request.info.as_mut() {
153 info.memory_usage_bytes = memory_usage;
154 info.cpu_usage_millicores = cpu_usage;
155 }
156
157 Some(heartbeat_request)
158 }
159
160 #[allow(deprecated)]
161 fn build_node_info(
162 start_time_ms: u64,
163 total_cpu_millicores: i64,
164 total_memory_bytes: i64,
165 ) -> Option<NodeInfo> {
166 let build_info = common_version::build_info();
167
168 Some(NodeInfo {
169 version: build_info.version.to_string(),
170 git_commit: build_info.commit_short.to_string(),
171 start_time_ms,
172 total_cpu_millicores,
173 total_memory_bytes,
174 cpu_usage_millicores: 0,
175 memory_usage_bytes: 0,
176 cpus: total_cpu_millicores as u32,
178 memory_bytes: total_memory_bytes as u64,
179 hostname: hostname::get()
180 .unwrap_or_default()
181 .to_string_lossy()
182 .to_string(),
183 })
184 }
185
186 fn start_heartbeat_report(
187 &self,
188 req_sender: HeartbeatSender,
189 mut outgoing_rx: Receiver<OutgoingMessage>,
190 report_interval: Duration,
191 ) {
192 let start_time_ms = self.start_time_ms;
193 let self_peer = Some(Peer {
194 id: 0,
197 addr: self.peer_addr.clone(),
198 });
199 let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores();
200 let total_memory_bytes = self.resource_stat.get_total_memory_bytes();
201 let resource_stat = self.resource_stat.clone();
202 let env_vars = self.env_vars.clone();
203 common_runtime::spawn_hb(async move {
204 let sleep = tokio::time::sleep(Duration::from_millis(0));
205 tokio::pin!(sleep);
206
207 let mut extensions = std::collections::HashMap::new();
208 env_vars.into_extensions(&mut extensions);
209
210 let heartbeat_request = HeartbeatRequest {
211 peer: self_peer,
212 info: Self::build_node_info(
213 start_time_ms,
214 total_cpu_millicores,
215 total_memory_bytes,
216 ),
217 extensions,
218 ..Default::default()
219 };
220
221 loop {
222 let req = tokio::select! {
223 message = outgoing_rx.recv() => {
224 if let Some(message) = message {
225 Self::new_heartbeat_request(&heartbeat_request, Some(message), 0, 0)
226 } else {
227 warn!("Sender has been dropped, exiting the heartbeat loop");
228 break
230 }
231 }
232 _ = &mut sleep => {
233 sleep.as_mut().reset(Instant::now() + report_interval);
234 Self::new_heartbeat_request(&heartbeat_request, None, resource_stat.get_cpu_usage_millicores(), resource_stat.get_memory_usage_bytes())
235 }
236 };
237
238 if let Some(req) = req {
239 if let Err(e) = req_sender.send(req.clone()).await {
240 error!(e; "Failed to send heartbeat to metasrv");
241 break;
242 } else {
243 HEARTBEAT_SENT_COUNT.inc();
244 debug!("Send a heartbeat request to metasrv, content: {:?}", req);
245 }
246 }
247 }
248 });
249 }
250
251 async fn handle_response(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()> {
252 self.resp_handler_executor
253 .handle(ctx)
254 .await
255 .context(error::HandleHeartbeatResponseSnafu)
256 }
257
258 async fn start_with_retry(&self, retry_interval: Duration) {
259 loop {
260 tokio::time::sleep(retry_interval).await;
261
262 info!("Try to re-establish the heartbeat connection to metasrv.");
263
264 if self.start().await.is_ok() {
265 break;
266 }
267 }
268 }
269}
270
271pub(crate) fn frontend_peer_addr(opts: &FrontendOptions) -> String {
272 if let Some(internal) = &opts.internal_grpc {
276 addrs::resolve_addr(&internal.bind_addr, Some(&internal.server_addr))
277 } else {
278 addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr))
279 }
280}