1#[cfg(test)]
16mod tests;
17
18use std::sync::Arc;
19
20use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer};
21use common_config::utils::ResourceSpec;
22use common_meta::heartbeat::handler::{
23 HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
24};
25use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
26use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
27use common_telemetry::{debug, error, info, warn};
28use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
29use servers::addrs;
30use servers::heartbeat_options::HeartbeatOptions;
31use snafu::ResultExt;
32use tokio::sync::mpsc;
33use tokio::sync::mpsc::Receiver;
34use tokio::time::{Duration, Instant};
35
36use crate::error;
37use crate::error::Result;
38use crate::frontend::FrontendOptions;
39use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
40
/// Background heartbeat task that reports this node to metasrv and processes
/// the responses it sends back.
///
/// The task is `Clone` so that the spawned response loop can hold its own copy
/// and re-establish the connection when the stream ends.
#[derive(Clone)]
pub struct HeartbeatTask {
    /// Address placed in the `Peer` of every heartbeat request. Resolved in
    /// `new` from the internal gRPC options when present, otherwise from the
    /// public gRPC options.
    peer_addr: String,
    /// Client used to open the heartbeat stream to metasrv.
    meta_client: Arc<MetaClient>,
    /// Delay between two periodic heartbeat requests.
    report_interval: Duration,
    /// Delay slept before each attempt to re-establish the heartbeat stream.
    retry_interval: Duration,
    /// Executor every received heartbeat response is handed to.
    resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
    /// Time, in milliseconds, captured when the task was constructed; sent as
    /// `NodeInfo::start_time_ms`.
    start_time_ms: u64,
    /// Source of the `cpus` and `memory_bytes` values sent in `NodeInfo`.
    /// `new` initialises it to the type's default value.
    resource_spec: ResourceSpec,
}
52
53impl HeartbeatTask {
54 pub fn new(
55 opts: &FrontendOptions,
56 meta_client: Arc<MetaClient>,
57 heartbeat_opts: HeartbeatOptions,
58 resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
59 ) -> Self {
60 HeartbeatTask {
61 peer_addr: if let Some(internal) = &opts.internal_grpc {
65 addrs::resolve_addr(&internal.bind_addr, Some(&internal.server_addr))
66 } else {
67 addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr))
68 },
69 meta_client,
70 report_interval: heartbeat_opts.interval,
71 retry_interval: heartbeat_opts.retry_interval,
72 resp_handler_executor,
73 start_time_ms: common_time::util::current_time_millis() as u64,
74 resource_spec: Default::default(),
75 }
76 }
77
78 pub async fn start(&self) -> Result<()> {
79 let (req_sender, resp_stream) = self
80 .meta_client
81 .heartbeat()
82 .await
83 .context(error::CreateMetaHeartbeatStreamSnafu)?;
84
85 info!("A heartbeat connection has been established with metasrv");
86
87 let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
88 let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
89
90 self.start_handle_resp_stream(resp_stream, mailbox);
91
92 self.start_heartbeat_report(req_sender, outgoing_rx);
93
94 Ok(())
95 }
96
97 fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream, mailbox: MailboxRef) {
98 let capture_self = self.clone();
99 let retry_interval = self.retry_interval;
100
101 let _handle = common_runtime::spawn_hb(async move {
102 loop {
103 match resp_stream.message().await {
104 Ok(Some(resp)) => {
105 debug!("Receiving heartbeat response: {:?}", resp);
106 let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
107 if let Err(e) = capture_self.handle_response(ctx).await {
108 error!(e; "Error while handling heartbeat response");
109 HEARTBEAT_RECV_COUNT
110 .with_label_values(&["processing_error"])
111 .inc();
112 } else {
113 HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
114 }
115 }
116 Ok(None) => {
117 warn!("Heartbeat response stream closed");
118 capture_self.start_with_retry(retry_interval).await;
119 break;
120 }
121 Err(e) => {
122 HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
123 error!(e; "Occur error while reading heartbeat response");
124 capture_self.start_with_retry(retry_interval).await;
125
126 break;
127 }
128 }
129 }
130 });
131 }
132
133 fn new_heartbeat_request(
134 heartbeat_request: &HeartbeatRequest,
135 message: Option<OutgoingMessage>,
136 ) -> Option<HeartbeatRequest> {
137 let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
138 Some(Ok(message)) => Some(message),
139 Some(Err(e)) => {
140 error!(e; "Failed to encode mailbox messages");
141 return None;
142 }
143 None => None,
144 };
145
146 Some(HeartbeatRequest {
147 mailbox_message,
148 ..heartbeat_request.clone()
149 })
150 }
151
152 fn build_node_info(start_time_ms: u64, cpus: u32, memory_bytes: u64) -> Option<NodeInfo> {
153 let build_info = common_version::build_info();
154
155 Some(NodeInfo {
156 version: build_info.version.to_string(),
157 git_commit: build_info.commit_short.to_string(),
158 start_time_ms,
159 cpus,
160 memory_bytes,
161 })
162 }
163
164 fn start_heartbeat_report(
165 &self,
166 req_sender: HeartbeatSender,
167 mut outgoing_rx: Receiver<OutgoingMessage>,
168 ) {
169 let report_interval = self.report_interval;
170 let start_time_ms = self.start_time_ms;
171 let self_peer = Some(Peer {
172 id: 0,
174 addr: self.peer_addr.clone(),
175 });
176 let cpus = self.resource_spec.cpus as u32;
177 let memory_bytes = self.resource_spec.memory.unwrap_or_default().as_bytes();
178
179 common_runtime::spawn_hb(async move {
180 let sleep = tokio::time::sleep(Duration::from_millis(0));
181 tokio::pin!(sleep);
182
183 let heartbeat_request = HeartbeatRequest {
184 peer: self_peer,
185 info: Self::build_node_info(start_time_ms, cpus, memory_bytes),
186 ..Default::default()
187 };
188
189 loop {
190 let req = tokio::select! {
191 message = outgoing_rx.recv() => {
192 if let Some(message) = message {
193 Self::new_heartbeat_request(&heartbeat_request, Some(message))
194 } else {
195 warn!("Sender has been dropped, exiting the heartbeat loop");
196 break
198 }
199 }
200 _ = &mut sleep => {
201 sleep.as_mut().reset(Instant::now() + report_interval);
202 Self::new_heartbeat_request(&heartbeat_request, None)
203 }
204 };
205
206 if let Some(req) = req {
207 if let Err(e) = req_sender.send(req.clone()).await {
208 error!(e; "Failed to send heartbeat to metasrv");
209 break;
210 } else {
211 HEARTBEAT_SENT_COUNT.inc();
212 debug!("Send a heartbeat request to metasrv, content: {:?}", req);
213 }
214 }
215 }
216 });
217 }
218
219 async fn handle_response(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()> {
220 self.resp_handler_executor
221 .handle(ctx)
222 .await
223 .context(error::HandleHeartbeatResponseSnafu)
224 }
225
226 async fn start_with_retry(&self, retry_interval: Duration) {
227 loop {
228 tokio::time::sleep(retry_interval).await;
229
230 info!("Try to re-establish the heartbeat connection to metasrv.");
231
232 if self.start().await.is_ok() {
233 break;
234 }
235 }
236 }
237}