1#[cfg(test)]
16mod tests;
17
18use std::sync::Arc;
19
20use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer};
21use common_meta::heartbeat::handler::{
22 HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
23};
24use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
25use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
26use common_telemetry::{debug, error, info, warn};
27use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
28use servers::addrs;
29use servers::heartbeat_options::HeartbeatOptions;
30use snafu::ResultExt;
31use tokio::sync::mpsc;
32use tokio::sync::mpsc::Receiver;
33use tokio::time::{Duration, Instant};
34
35use crate::error;
36use crate::error::Result;
37use crate::frontend::FrontendOptions;
38use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
39
/// Background task that keeps a heartbeat connection to metasrv alive.
///
/// Once started, it periodically sends [`HeartbeatRequest`]s describing this node and
/// dispatches every heartbeat response to the configured handler executor. The task is
/// cheap to clone; a clone is moved into the spawned response-handling task so that it can
/// re-establish the connection when the response stream ends.
#[derive(Clone)]
pub struct HeartbeatTask {
    /// Address reported to metasrv as this node's peer address.
    peer_addr: String,
    /// Client used to open the heartbeat stream to metasrv.
    meta_client: Arc<MetaClient>,
    /// Delay between two periodic heartbeat requests.
    report_interval: Duration,
    /// Delay slept before each attempt to re-establish a lost connection.
    retry_interval: Duration,
    /// Executor that handles every heartbeat response received from metasrv.
    resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
    /// Timestamp in milliseconds captured when the task was constructed; reported in the
    /// node info of every heartbeat.
    start_time_ms: u64,
}
50
51impl HeartbeatTask {
52 pub fn new(
53 opts: &FrontendOptions,
54 meta_client: Arc<MetaClient>,
55 heartbeat_opts: HeartbeatOptions,
56 resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
57 ) -> Self {
58 HeartbeatTask {
59 peer_addr: if let Some(internal) = &opts.internal_grpc {
63 addrs::resolve_addr(&internal.bind_addr, Some(&internal.server_addr))
64 } else {
65 addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr))
66 },
67 meta_client,
68 report_interval: heartbeat_opts.interval,
69 retry_interval: heartbeat_opts.retry_interval,
70 resp_handler_executor,
71 start_time_ms: common_time::util::current_time_millis() as u64,
72 }
73 }
74
75 pub async fn start(&self) -> Result<()> {
76 let (req_sender, resp_stream) = self
77 .meta_client
78 .heartbeat()
79 .await
80 .context(error::CreateMetaHeartbeatStreamSnafu)?;
81
82 info!("A heartbeat connection has been established with metasrv");
83
84 let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
85 let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
86
87 self.start_handle_resp_stream(resp_stream, mailbox);
88
89 self.start_heartbeat_report(req_sender, outgoing_rx);
90
91 Ok(())
92 }
93
94 fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream, mailbox: MailboxRef) {
95 let capture_self = self.clone();
96 let retry_interval = self.retry_interval;
97
98 let _handle = common_runtime::spawn_hb(async move {
99 loop {
100 match resp_stream.message().await {
101 Ok(Some(resp)) => {
102 debug!("Receiving heartbeat response: {:?}", resp);
103 let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
104 if let Err(e) = capture_self.handle_response(ctx).await {
105 error!(e; "Error while handling heartbeat response");
106 HEARTBEAT_RECV_COUNT
107 .with_label_values(&["processing_error"])
108 .inc();
109 } else {
110 HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
111 }
112 }
113 Ok(None) => {
114 warn!("Heartbeat response stream closed");
115 capture_self.start_with_retry(retry_interval).await;
116 break;
117 }
118 Err(e) => {
119 HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
120 error!(e; "Occur error while reading heartbeat response");
121 capture_self.start_with_retry(retry_interval).await;
122
123 break;
124 }
125 }
126 }
127 });
128 }
129
130 fn new_heartbeat_request(
131 heartbeat_request: &HeartbeatRequest,
132 message: Option<OutgoingMessage>,
133 ) -> Option<HeartbeatRequest> {
134 let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
135 Some(Ok(message)) => Some(message),
136 Some(Err(e)) => {
137 error!(e; "Failed to encode mailbox messages");
138 return None;
139 }
140 None => None,
141 };
142
143 Some(HeartbeatRequest {
144 mailbox_message,
145 ..heartbeat_request.clone()
146 })
147 }
148
149 fn build_node_info(start_time_ms: u64) -> Option<NodeInfo> {
150 let build_info = common_version::build_info();
151
152 Some(NodeInfo {
153 version: build_info.version.to_string(),
154 git_commit: build_info.commit_short.to_string(),
155 start_time_ms,
156 cpus: num_cpus::get() as u32,
157 })
158 }
159
160 fn start_heartbeat_report(
161 &self,
162 req_sender: HeartbeatSender,
163 mut outgoing_rx: Receiver<OutgoingMessage>,
164 ) {
165 let report_interval = self.report_interval;
166 let start_time_ms = self.start_time_ms;
167 let self_peer = Some(Peer {
168 id: 0,
170 addr: self.peer_addr.clone(),
171 });
172
173 common_runtime::spawn_hb(async move {
174 let sleep = tokio::time::sleep(Duration::from_millis(0));
175 tokio::pin!(sleep);
176
177 let heartbeat_request = HeartbeatRequest {
178 peer: self_peer,
179 info: Self::build_node_info(start_time_ms),
180 ..Default::default()
181 };
182
183 loop {
184 let req = tokio::select! {
185 message = outgoing_rx.recv() => {
186 if let Some(message) = message {
187 Self::new_heartbeat_request(&heartbeat_request, Some(message))
188 } else {
189 warn!("Sender has been dropped, exiting the heartbeat loop");
190 break
192 }
193 }
194 _ = &mut sleep => {
195 sleep.as_mut().reset(Instant::now() + report_interval);
196 Self::new_heartbeat_request(&heartbeat_request, None)
197 }
198 };
199
200 if let Some(req) = req {
201 if let Err(e) = req_sender.send(req.clone()).await {
202 error!(e; "Failed to send heartbeat to metasrv");
203 break;
204 } else {
205 HEARTBEAT_SENT_COUNT.inc();
206 debug!("Send a heartbeat request to metasrv, content: {:?}", req);
207 }
208 }
209 }
210 });
211 }
212
213 async fn handle_response(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()> {
214 self.resp_handler_executor
215 .handle(ctx)
216 .await
217 .context(error::HandleHeartbeatResponseSnafu)
218 }
219
220 async fn start_with_retry(&self, retry_interval: Duration) {
221 loop {
222 tokio::time::sleep(retry_interval).await;
223
224 info!("Try to re-establish the heartbeat connection to metasrv.");
225
226 if self.start().await.is_ok() {
227 break;
228 }
229 }
230 }
231}