1#[cfg(test)]
16mod tests;
17
18use std::sync::Arc;
19
20use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer};
21use common_config::utils::ResourceSpec;
22use common_meta::heartbeat::handler::{
23 HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
24};
25use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
26use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
27use common_telemetry::{debug, error, info, warn};
28use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
29use servers::addrs;
30use servers::heartbeat_options::HeartbeatOptions;
31use snafu::ResultExt;
32use tokio::sync::mpsc;
33use tokio::sync::mpsc::Receiver;
34use tokio::time::{Duration, Instant};
35
36use crate::error;
37use crate::error::Result;
38use crate::frontend::FrontendOptions;
39use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
40
41#[derive(Clone)]
43pub struct HeartbeatTask {
44 peer_addr: String,
45 meta_client: Arc<MetaClient>,
46 report_interval: Duration,
47 retry_interval: Duration,
48 resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
49 start_time_ms: u64,
50 resource_spec: ResourceSpec,
51}
52
53impl HeartbeatTask {
54 pub fn new(
55 opts: &FrontendOptions,
56 meta_client: Arc<MetaClient>,
57 heartbeat_opts: HeartbeatOptions,
58 resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
59 ) -> Self {
60 HeartbeatTask {
61 peer_addr: if let Some(internal) = &opts.internal_grpc {
65 addrs::resolve_addr(&internal.bind_addr, Some(&internal.server_addr))
66 } else {
67 addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr))
68 },
69 meta_client,
70 report_interval: heartbeat_opts.interval,
71 retry_interval: heartbeat_opts.retry_interval,
72 resp_handler_executor,
73 start_time_ms: common_time::util::current_time_millis() as u64,
74 resource_spec: Default::default(),
75 }
76 }
77
78 pub async fn start(&self) -> Result<()> {
79 let (req_sender, resp_stream) = self
80 .meta_client
81 .heartbeat()
82 .await
83 .context(error::CreateMetaHeartbeatStreamSnafu)?;
84
85 info!("A heartbeat connection has been established with metasrv");
86
87 let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
88 let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
89
90 self.start_handle_resp_stream(resp_stream, mailbox);
91
92 self.start_heartbeat_report(req_sender, outgoing_rx);
93
94 Ok(())
95 }
96
97 fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream, mailbox: MailboxRef) {
98 let capture_self = self.clone();
99 let retry_interval = self.retry_interval;
100
101 let _handle = common_runtime::spawn_hb(async move {
102 loop {
103 match resp_stream.message().await {
104 Ok(Some(resp)) => {
105 debug!("Receiving heartbeat response: {:?}", resp);
106 let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
107 if let Err(e) = capture_self.handle_response(ctx).await {
108 error!(e; "Error while handling heartbeat response");
109 HEARTBEAT_RECV_COUNT
110 .with_label_values(&["processing_error"])
111 .inc();
112 } else {
113 HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
114 }
115 }
116 Ok(None) => {
117 warn!("Heartbeat response stream closed");
118 capture_self.start_with_retry(retry_interval).await;
119 break;
120 }
121 Err(e) => {
122 HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
123 error!(e; "Occur error while reading heartbeat response");
124 capture_self.start_with_retry(retry_interval).await;
125
126 break;
127 }
128 }
129 }
130 });
131 }
132
133 fn new_heartbeat_request(
134 heartbeat_request: &HeartbeatRequest,
135 message: Option<OutgoingMessage>,
136 ) -> Option<HeartbeatRequest> {
137 let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
138 Some(Ok(message)) => Some(message),
139 Some(Err(e)) => {
140 error!(e; "Failed to encode mailbox messages");
141 return None;
142 }
143 None => None,
144 };
145
146 Some(HeartbeatRequest {
147 mailbox_message,
148 ..heartbeat_request.clone()
149 })
150 }
151
152 fn build_node_info(start_time_ms: u64, cpus: u32, memory_bytes: u64) -> Option<NodeInfo> {
153 let build_info = common_version::build_info();
154
155 Some(NodeInfo {
156 version: build_info.version.to_string(),
157 git_commit: build_info.commit_short.to_string(),
158 start_time_ms,
159 cpus,
160 memory_bytes,
161 hostname: hostname::get()
162 .unwrap_or_default()
163 .to_string_lossy()
164 .to_string(),
165 })
166 }
167
168 fn start_heartbeat_report(
169 &self,
170 req_sender: HeartbeatSender,
171 mut outgoing_rx: Receiver<OutgoingMessage>,
172 ) {
173 let report_interval = self.report_interval;
174 let start_time_ms = self.start_time_ms;
175 let self_peer = Some(Peer {
176 id: 0,
178 addr: self.peer_addr.clone(),
179 });
180 let cpus = self.resource_spec.cpus as u32;
181 let memory_bytes = self.resource_spec.memory.unwrap_or_default().as_bytes();
182
183 common_runtime::spawn_hb(async move {
184 let sleep = tokio::time::sleep(Duration::from_millis(0));
185 tokio::pin!(sleep);
186
187 let heartbeat_request = HeartbeatRequest {
188 peer: self_peer,
189 info: Self::build_node_info(start_time_ms, cpus, memory_bytes),
190 ..Default::default()
191 };
192
193 loop {
194 let req = tokio::select! {
195 message = outgoing_rx.recv() => {
196 if let Some(message) = message {
197 Self::new_heartbeat_request(&heartbeat_request, Some(message))
198 } else {
199 warn!("Sender has been dropped, exiting the heartbeat loop");
200 break
202 }
203 }
204 _ = &mut sleep => {
205 sleep.as_mut().reset(Instant::now() + report_interval);
206 Self::new_heartbeat_request(&heartbeat_request, None)
207 }
208 };
209
210 if let Some(req) = req {
211 if let Err(e) = req_sender.send(req.clone()).await {
212 error!(e; "Failed to send heartbeat to metasrv");
213 break;
214 } else {
215 HEARTBEAT_SENT_COUNT.inc();
216 debug!("Send a heartbeat request to metasrv, content: {:?}", req);
217 }
218 }
219 }
220 });
221 }
222
223 async fn handle_response(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()> {
224 self.resp_handler_executor
225 .handle(ctx)
226 .await
227 .context(error::HandleHeartbeatResponseSnafu)
228 }
229
230 async fn start_with_retry(&self, retry_interval: Duration) {
231 loop {
232 tokio::time::sleep(retry_interval).await;
233
234 info!("Try to re-establish the heartbeat connection to metasrv.");
235
236 if self.start().await.is_ok() {
237 break;
238 }
239 }
240 }
241}