1#[cfg(test)]
16mod tests;
17
18use std::sync::Arc;
19
20use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer};
21use common_meta::heartbeat::handler::{
22 HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
23};
24use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef, OutgoingMessage};
25use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
26use common_telemetry::{debug, error, info};
27use meta_client::client::{HeartbeatSender, HeartbeatStream, MetaClient};
28use servers::addrs;
29use servers::heartbeat_options::HeartbeatOptions;
30use snafu::ResultExt;
31use tokio::sync::mpsc;
32use tokio::sync::mpsc::Receiver;
33use tokio::time::{Duration, Instant};
34
35use crate::error;
36use crate::error::Result;
37use crate::frontend::FrontendOptions;
38use crate::metrics::{HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
39
40#[derive(Clone)]
42pub struct HeartbeatTask {
43 peer_addr: String,
44 meta_client: Arc<MetaClient>,
45 report_interval: u64,
46 retry_interval: u64,
47 resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
48 start_time_ms: u64,
49}
50
51impl HeartbeatTask {
52 pub fn new(
53 opts: &FrontendOptions,
54 meta_client: Arc<MetaClient>,
55 heartbeat_opts: HeartbeatOptions,
56 resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
57 ) -> Self {
58 HeartbeatTask {
59 peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
60 meta_client,
61 report_interval: heartbeat_opts.interval.as_millis() as u64,
62 retry_interval: heartbeat_opts.retry_interval.as_millis() as u64,
63 resp_handler_executor,
64 start_time_ms: common_time::util::current_time_millis() as u64,
65 }
66 }
67
68 pub async fn start(&self) -> Result<()> {
69 let (req_sender, resp_stream) = self
70 .meta_client
71 .heartbeat()
72 .await
73 .context(error::CreateMetaHeartbeatStreamSnafu)?;
74
75 info!("A heartbeat connection has been established with metasrv");
76
77 let (outgoing_tx, outgoing_rx) = mpsc::channel(16);
78 let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
79
80 self.start_handle_resp_stream(resp_stream, mailbox);
81
82 self.start_heartbeat_report(req_sender, outgoing_rx);
83
84 Ok(())
85 }
86
87 fn start_handle_resp_stream(&self, mut resp_stream: HeartbeatStream, mailbox: MailboxRef) {
88 let capture_self = self.clone();
89 let retry_interval = self.retry_interval;
90
91 let _handle = common_runtime::spawn_hb(async move {
92 loop {
93 match resp_stream.message().await {
94 Ok(Some(resp)) => {
95 debug!("Receiving heartbeat response: {:?}", resp);
96 let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), resp);
97 if let Err(e) = capture_self.handle_response(ctx).await {
98 error!(e; "Error while handling heartbeat response");
99 HEARTBEAT_RECV_COUNT
100 .with_label_values(&["processing_error"])
101 .inc();
102 } else {
103 HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
104 }
105 }
106 Ok(None) => break,
107 Err(e) => {
108 HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
109 error!(e; "Occur error while reading heartbeat response");
110 capture_self
111 .start_with_retry(Duration::from_millis(retry_interval))
112 .await;
113
114 break;
115 }
116 }
117 }
118 });
119 }
120
121 fn new_heartbeat_request(
122 heartbeat_request: &HeartbeatRequest,
123 message: Option<OutgoingMessage>,
124 ) -> Option<HeartbeatRequest> {
125 let mailbox_message = match message.map(outgoing_message_to_mailbox_message) {
126 Some(Ok(message)) => Some(message),
127 Some(Err(e)) => {
128 error!(e; "Failed to encode mailbox messages");
129 return None;
130 }
131 None => None,
132 };
133
134 Some(HeartbeatRequest {
135 mailbox_message,
136 ..heartbeat_request.clone()
137 })
138 }
139
140 fn build_node_info(start_time_ms: u64) -> Option<NodeInfo> {
141 let build_info = common_version::build_info();
142
143 Some(NodeInfo {
144 version: build_info.version.to_string(),
145 git_commit: build_info.commit_short.to_string(),
146 start_time_ms,
147 cpus: num_cpus::get() as u32,
148 })
149 }
150
151 fn start_heartbeat_report(
152 &self,
153 req_sender: HeartbeatSender,
154 mut outgoing_rx: Receiver<OutgoingMessage>,
155 ) {
156 let report_interval = self.report_interval;
157 let start_time_ms = self.start_time_ms;
158 let self_peer = Some(Peer {
159 id: 0,
161 addr: self.peer_addr.clone(),
162 });
163
164 common_runtime::spawn_hb(async move {
165 let sleep = tokio::time::sleep(Duration::from_millis(0));
166 tokio::pin!(sleep);
167
168 let heartbeat_request = HeartbeatRequest {
169 peer: self_peer,
170 info: Self::build_node_info(start_time_ms),
171 ..Default::default()
172 };
173
174 loop {
175 let req = tokio::select! {
176 message = outgoing_rx.recv() => {
177 if let Some(message) = message {
178 Self::new_heartbeat_request(&heartbeat_request, Some(message))
179 } else {
180 break
182 }
183 }
184 _ = &mut sleep => {
185 sleep.as_mut().reset(Instant::now() + Duration::from_millis(report_interval));
186 Self::new_heartbeat_request(&heartbeat_request, None)
187 }
188 };
189
190 if let Some(req) = req {
191 if let Err(e) = req_sender.send(req.clone()).await {
192 error!(e; "Failed to send heartbeat to metasrv");
193 break;
194 } else {
195 HEARTBEAT_SENT_COUNT.inc();
196 debug!("Send a heartbeat request to metasrv, content: {:?}", req);
197 }
198 }
199 }
200 });
201 }
202
203 async fn handle_response(&self, ctx: HeartbeatResponseHandlerContext) -> Result<()> {
204 self.resp_handler_executor
205 .handle(ctx)
206 .await
207 .context(error::HandleHeartbeatResponseSnafu)
208 }
209
210 async fn start_with_retry(&self, retry_interval: Duration) {
211 loop {
212 tokio::time::sleep(retry_interval).await;
213
214 info!("Try to re-establish the heartbeat connection to metasrv.");
215
216 if self.start().await.is_ok() {
217 break;
218 }
219 }
220 }
221}