1use std::collections::HashMap;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::sync::Arc;
18use std::time::Duration;
19
20use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
21use common_base::Plugins;
22use common_meta::cache_invalidator::CacheInvalidatorRef;
23use common_meta::datanode::REGION_STATISTIC_KEY;
24use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
25use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
26use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
27use common_meta::heartbeat::handler::{
28 HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
29};
30use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef};
31use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
32use common_telemetry::{debug, error, info, trace, warn};
33use meta_client::client::{HeartbeatSender, MetaClient};
34use meta_client::MetaClientRef;
35use servers::addrs;
36use snafu::ResultExt;
37use tokio::sync::{mpsc, Notify};
38use tokio::time::Instant;
39
40use self::handler::RegionHeartbeatResponseHandler;
41use crate::alive_keeper::{CountdownTaskHandlerExtRef, RegionAliveKeeper};
42use crate::config::DatanodeOptions;
43use crate::error::{self, MetaClientInitSnafu, Result};
44use crate::event_listener::RegionServerEventReceiver;
45use crate::metrics::{self, HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
46use crate::region_server::RegionServer;
47
48pub(crate) mod handler;
49pub(crate) mod task_tracker;
50
/// State of the datanode's heartbeat with metasrv.
///
/// The task is inert after construction; `start` spawns the background loops
/// and `close` (or dropping the task) asks them to stop.
pub struct HeartbeatTask {
    /// Id sent as this node's peer id; `0` when the options carry no node id.
    node_id: u64,
    /// Wall-clock time in milliseconds taken when the task was constructed.
    /// Sent both as the node epoch and as the node's start time.
    node_epoch: u64,
    /// Address sent as this node's peer address, resolved from the gRPC
    /// bind/server addresses in the options.
    peer_addr: String,
    /// Shared flag: set by `start`, cleared by `close` and on drop, and polled
    /// by the spawned background tasks.
    running: Arc<AtomicBool>,
    /// Client used to open (and re-open) the heartbeat stream.
    meta_client: MetaClientRef,
    /// Source of the region statistics attached to periodic heartbeats.
    region_server: RegionServer,
    /// Interval between periodic heartbeats, in milliseconds.
    interval: u64,
    /// Executor that every received heartbeat response is passed to.
    resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
    /// Started together with the task; it is also registered as the first
    /// response handler and supplies the epoch used for `duration_since_epoch`.
    region_alive_keeper: Arc<RegionAliveKeeper>,
}
63
64impl Drop for HeartbeatTask {
65 fn drop(&mut self) {
66 self.running.store(false, Ordering::Release);
67 }
68}
69
70impl HeartbeatTask {
71 pub async fn try_new(
73 opts: &DatanodeOptions,
74 region_server: RegionServer,
75 meta_client: MetaClientRef,
76 cache_invalidator: CacheInvalidatorRef,
77 plugins: Plugins,
78 ) -> Result<Self> {
79 let countdown_task_handler_ext = plugins.get::<CountdownTaskHandlerExtRef>();
80 let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
81 region_server.clone(),
82 countdown_task_handler_ext,
83 opts.heartbeat.interval.as_millis() as u64,
84 ));
85 let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![
86 region_alive_keeper.clone(),
87 Arc::new(ParseMailboxMessageHandler),
88 Arc::new(RegionHeartbeatResponseHandler::new(region_server.clone())),
89 Arc::new(InvalidateCacheHandler::new(cache_invalidator)),
90 ]));
91
92 Ok(Self {
93 node_id: opts.node_id.unwrap_or(0),
94 node_epoch: common_time::util::current_time_millis() as u64,
96 peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
97 running: Arc::new(AtomicBool::new(false)),
98 meta_client,
99 region_server,
100 interval: opts.heartbeat.interval.as_millis() as u64,
101 resp_handler_executor,
102 region_alive_keeper,
103 })
104 }
105
106 pub async fn create_streams(
107 meta_client: &MetaClient,
108 running: Arc<AtomicBool>,
109 handler_executor: HeartbeatResponseHandlerExecutorRef,
110 mailbox: MailboxRef,
111 mut notify: Option<Arc<Notify>>,
112 quit_signal: Arc<Notify>,
113 ) -> Result<HeartbeatSender> {
114 let client_id = meta_client.id();
115 let (tx, mut rx) = meta_client.heartbeat().await.context(MetaClientInitSnafu)?;
116
117 let mut last_received_lease = Instant::now();
118
119 let _handle = common_runtime::spawn_hb(async move {
120 while let Some(res) = rx.message().await.unwrap_or_else(|e| {
121 error!(e; "Error while reading heartbeat response");
122 None
123 }) {
124 if let Some(msg) = res.mailbox_message.as_ref() {
125 info!("Received mailbox message: {msg:?}, meta_client id: {client_id:?}");
126 }
127 if let Some(lease) = res.region_lease.as_ref() {
128 metrics::LAST_RECEIVED_HEARTBEAT_ELAPSED
129 .set(last_received_lease.elapsed().as_millis() as i64);
130 last_received_lease = Instant::now();
132
133 let mut leader_region_lease_count = 0;
134 let mut follower_region_lease_count = 0;
135 for lease in &lease.regions {
136 match lease.role() {
137 RegionRole::Leader | RegionRole::DowngradingLeader => {
138 leader_region_lease_count += 1
139 }
140 RegionRole::Follower => follower_region_lease_count += 1,
141 }
142 }
143
144 metrics::HEARTBEAT_REGION_LEASES
145 .with_label_values(&["leader"])
146 .set(leader_region_lease_count);
147 metrics::HEARTBEAT_REGION_LEASES
148 .with_label_values(&["follower"])
149 .set(follower_region_lease_count);
150 }
151 let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), res);
152 if let Err(e) = Self::handle_response(ctx, handler_executor.clone()).await {
153 error!(e; "Error while handling heartbeat response");
154 }
155 if let Some(notify) = notify.take() {
156 notify.notify_one();
157 }
158 if !running.load(Ordering::Acquire) {
159 info!("Heartbeat task shutdown");
160 }
161 }
162 quit_signal.notify_one();
163 info!("Heartbeat handling loop exit.");
164 });
165 Ok(tx)
166 }
167
168 async fn handle_response(
169 ctx: HeartbeatResponseHandlerContext,
170 handler_executor: HeartbeatResponseHandlerExecutorRef,
171 ) -> Result<()> {
172 trace!("Heartbeat response: {:?}", ctx.response);
173 handler_executor
174 .handle(ctx)
175 .await
176 .context(error::HandleHeartbeatResponseSnafu)
177 }
178
179 pub async fn start(
181 &self,
182 event_receiver: RegionServerEventReceiver,
183 notify: Option<Arc<Notify>>,
184 ) -> Result<()> {
185 let running = self.running.clone();
186 if running
187 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
188 .is_err()
189 {
190 warn!("Heartbeat task started multiple times");
191 return Ok(());
192 }
193 let interval = self.interval;
194 let node_id = self.node_id;
195 let node_epoch = self.node_epoch;
196 let addr = &self.peer_addr;
197 info!("Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}.");
198
199 let meta_client = self.meta_client.clone();
200 let region_server_clone = self.region_server.clone();
201
202 let handler_executor = self.resp_handler_executor.clone();
203
204 let (outgoing_tx, mut outgoing_rx) = mpsc::channel(16);
205 let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
206
207 let quit_signal = Arc::new(Notify::new());
208
209 let mut tx = Self::create_streams(
210 &meta_client,
211 running.clone(),
212 handler_executor.clone(),
213 mailbox.clone(),
214 notify,
215 quit_signal.clone(),
216 )
217 .await?;
218
219 let self_peer = Some(Peer {
220 id: node_id,
221 addr: addr.clone(),
222 });
223 let epoch = self.region_alive_keeper.epoch();
224
225 self.region_alive_keeper.start(Some(event_receiver)).await?;
226 let mut last_sent = Instant::now();
227
228 common_runtime::spawn_hb(async move {
229 let sleep = tokio::time::sleep(Duration::from_millis(0));
230 tokio::pin!(sleep);
231
232 let build_info = common_version::build_info();
233 let heartbeat_request = HeartbeatRequest {
234 peer: self_peer,
235 node_epoch,
236 info: Some(NodeInfo {
237 version: build_info.version.to_string(),
238 git_commit: build_info.commit_short.to_string(),
239 start_time_ms: node_epoch,
240 cpus: num_cpus::get() as u32,
241 }),
242 ..Default::default()
243 };
244
245 loop {
246 if !running.load(Ordering::Relaxed) {
247 info!("shutdown heartbeat task");
248 break;
249 }
250 let req = tokio::select! {
251 message = outgoing_rx.recv() => {
252 if let Some(message) = message {
253 match outgoing_message_to_mailbox_message(message) {
254 Ok(message) => {
255 let req = HeartbeatRequest {
256 mailbox_message: Some(message),
257 ..heartbeat_request.clone()
258 };
259 HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
260 Some(req)
261 }
262 Err(e) => {
263 error!(e; "Failed to encode mailbox messages!");
264 HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
265 None
266 }
267 }
268 } else {
269 None
270 }
271 }
272 _ = &mut sleep => {
273 let region_stats = Self::load_region_stats(®ion_server_clone);
274 let now = Instant::now();
275 let duration_since_epoch = (now - epoch).as_millis() as u64;
276 let req = HeartbeatRequest {
277 region_stats,
278 duration_since_epoch,
279 ..heartbeat_request.clone()
280 };
281 sleep.as_mut().reset(now + Duration::from_millis(interval));
282 Some(req)
283 }
284 _ = quit_signal.notified() => {
286 let req = HeartbeatRequest::default();
287 Some(req)
288 }
289 };
290 if let Some(req) = req {
291 metrics::LAST_SENT_HEARTBEAT_ELAPSED
292 .set(last_sent.elapsed().as_millis() as i64);
293 last_sent = Instant::now();
295 debug!("Sending heartbeat request: {:?}", req);
296 if let Err(e) = tx.send(req).await {
297 error!(e; "Failed to send heartbeat to metasrv");
298 match Self::create_streams(
299 &meta_client,
300 running.clone(),
301 handler_executor.clone(),
302 mailbox.clone(),
303 None,
304 quit_signal.clone(),
305 )
306 .await
307 {
308 Ok(new_tx) => {
309 info!("Reconnected to metasrv");
310 tx = new_tx;
311 sleep.as_mut().reset(Instant::now());
313 }
314 Err(e) => {
315 sleep.as_mut().reset(
319 Instant::now()
320 + Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS),
321 );
322 error!(e; "Failed to reconnect to metasrv!");
323 }
324 }
325 } else {
326 HEARTBEAT_SENT_COUNT.inc();
327 }
328 }
329 }
330 });
331
332 Ok(())
333 }
334
335 fn load_region_stats(region_server: &RegionServer) -> Vec<RegionStat> {
336 region_server
337 .reportable_regions()
338 .into_iter()
339 .map(|stat| {
340 let region_stat = region_server
341 .region_statistic(stat.region_id)
342 .unwrap_or_default();
343 let mut extensions = HashMap::new();
344 if let Some(serialized) = region_stat.serialize_to_vec() {
345 extensions.insert(REGION_STATISTIC_KEY.to_string(), serialized);
346 }
347
348 RegionStat {
349 region_id: stat.region_id.as_u64(),
350 engine: stat.engine,
351 role: RegionRole::from(stat.role).into(),
352 rcus: 0,
354 wcus: 0,
355 approximate_bytes: region_stat.estimated_disk_size() as i64,
356 extensions,
357 }
358 })
359 .collect()
360 }
361
362 pub fn close(&self) -> Result<()> {
363 let running = self.running.clone();
364 if running
365 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
366 .is_err()
367 {
368 warn!("Call close heartbeat task multiple times");
369 }
370
371 Ok(())
372 }
373}