1use std::collections::HashMap;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::sync::Arc;
18use std::time::Duration;
19
20use api::v1::meta::heartbeat_request::NodeWorkloads;
21use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
22use common_base::Plugins;
23use common_meta::cache_invalidator::CacheInvalidatorRef;
24use common_meta::datanode::REGION_STATISTIC_KEY;
25use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
26use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
27use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
28use common_meta::heartbeat::handler::{
29 HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
30};
31use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef};
32use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
33use common_telemetry::{debug, error, info, trace, warn};
34use common_workload::DatanodeWorkloadType;
35use meta_client::client::{HeartbeatSender, MetaClient};
36use meta_client::MetaClientRef;
37use servers::addrs;
38use snafu::ResultExt;
39use tokio::sync::{mpsc, Notify};
40use tokio::time::Instant;
41
42use self::handler::RegionHeartbeatResponseHandler;
43use crate::alive_keeper::{CountdownTaskHandlerExtRef, RegionAliveKeeper};
44use crate::config::DatanodeOptions;
45use crate::error::{self, MetaClientInitSnafu, Result};
46use crate::event_listener::RegionServerEventReceiver;
47use crate::metrics::{self, HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
48use crate::region_server::RegionServer;
49
50pub(crate) mod handler;
51pub(crate) mod task_tracker;
52
/// Sends heartbeats from this datanode to Metasrv and dispatches the responses
/// through a chain of response handlers.
pub struct HeartbeatTask {
    /// Id reported in the heartbeat `Peer` (`opts.node_id`, or 0 when unset).
    node_id: u64,
    /// Workload types advertised in every heartbeat request.
    workload_types: Vec<DatanodeWorkloadType>,
    /// Milliseconds timestamp taken when the task was created; also sent as `start_time_ms`.
    node_epoch: u64,
    /// Address advertised to Metasrv, resolved from the gRPC bind/server addresses.
    peer_addr: String,
    /// Shared run flag; cleared by `close` or on drop to stop the heartbeat loops.
    running: Arc<AtomicBool>,
    /// Client used to open (and re-open) the heartbeat stream.
    meta_client: MetaClientRef,
    /// Source of the region statistics attached to periodic heartbeats.
    region_server: RegionServer,
    /// Heartbeat period in milliseconds.
    interval: u64,
    /// Handler chain invoked for every heartbeat response.
    resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
    /// Keeper that is both a response handler and started alongside the heartbeat loop.
    region_alive_keeper: Arc<RegionAliveKeeper>,
}
66
67impl Drop for HeartbeatTask {
68 fn drop(&mut self) {
69 self.running.store(false, Ordering::Release);
70 }
71}
72
73impl HeartbeatTask {
74 pub async fn try_new(
76 opts: &DatanodeOptions,
77 region_server: RegionServer,
78 meta_client: MetaClientRef,
79 cache_invalidator: CacheInvalidatorRef,
80 plugins: Plugins,
81 ) -> Result<Self> {
82 let countdown_task_handler_ext = plugins.get::<CountdownTaskHandlerExtRef>();
83 let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
84 region_server.clone(),
85 countdown_task_handler_ext,
86 opts.heartbeat.interval.as_millis() as u64,
87 ));
88 let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![
89 region_alive_keeper.clone(),
90 Arc::new(ParseMailboxMessageHandler),
91 Arc::new(RegionHeartbeatResponseHandler::new(region_server.clone())),
92 Arc::new(InvalidateCacheHandler::new(cache_invalidator)),
93 ]));
94
95 Ok(Self {
96 node_id: opts.node_id.unwrap_or(0),
97 workload_types: opts.workload_types.clone(),
98 node_epoch: common_time::util::current_time_millis() as u64,
100 peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
101 running: Arc::new(AtomicBool::new(false)),
102 meta_client,
103 region_server,
104 interval: opts.heartbeat.interval.as_millis() as u64,
105 resp_handler_executor,
106 region_alive_keeper,
107 })
108 }
109
110 pub async fn create_streams(
111 meta_client: &MetaClient,
112 running: Arc<AtomicBool>,
113 handler_executor: HeartbeatResponseHandlerExecutorRef,
114 mailbox: MailboxRef,
115 mut notify: Option<Arc<Notify>>,
116 quit_signal: Arc<Notify>,
117 ) -> Result<HeartbeatSender> {
118 let client_id = meta_client.id();
119 let (tx, mut rx) = meta_client.heartbeat().await.context(MetaClientInitSnafu)?;
120
121 let mut last_received_lease = Instant::now();
122
123 let _handle = common_runtime::spawn_hb(async move {
124 while let Some(res) = rx.message().await.unwrap_or_else(|e| {
125 error!(e; "Error while reading heartbeat response");
126 None
127 }) {
128 if let Some(msg) = res.mailbox_message.as_ref() {
129 info!("Received mailbox message: {msg:?}, meta_client id: {client_id:?}");
130 }
131 if let Some(lease) = res.region_lease.as_ref() {
132 metrics::LAST_RECEIVED_HEARTBEAT_ELAPSED
133 .set(last_received_lease.elapsed().as_millis() as i64);
134 last_received_lease = Instant::now();
136
137 let mut leader_region_lease_count = 0;
138 let mut follower_region_lease_count = 0;
139 for lease in &lease.regions {
140 match lease.role() {
141 RegionRole::Leader | RegionRole::DowngradingLeader => {
142 leader_region_lease_count += 1
143 }
144 RegionRole::Follower => follower_region_lease_count += 1,
145 }
146 }
147
148 metrics::HEARTBEAT_REGION_LEASES
149 .with_label_values(&["leader"])
150 .set(leader_region_lease_count);
151 metrics::HEARTBEAT_REGION_LEASES
152 .with_label_values(&["follower"])
153 .set(follower_region_lease_count);
154 }
155 let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), res);
156 if let Err(e) = Self::handle_response(ctx, handler_executor.clone()).await {
157 error!(e; "Error while handling heartbeat response");
158 }
159 if let Some(notify) = notify.take() {
160 notify.notify_one();
161 }
162 if !running.load(Ordering::Acquire) {
163 info!("Heartbeat task shutdown");
164 }
165 }
166 quit_signal.notify_one();
167 info!("Heartbeat handling loop exit.");
168 });
169 Ok(tx)
170 }
171
172 async fn handle_response(
173 ctx: HeartbeatResponseHandlerContext,
174 handler_executor: HeartbeatResponseHandlerExecutorRef,
175 ) -> Result<()> {
176 trace!("Heartbeat response: {:?}", ctx.response);
177 handler_executor
178 .handle(ctx)
179 .await
180 .context(error::HandleHeartbeatResponseSnafu)
181 }
182
183 pub async fn start(
185 &self,
186 event_receiver: RegionServerEventReceiver,
187 notify: Option<Arc<Notify>>,
188 ) -> Result<()> {
189 let running = self.running.clone();
190 if running
191 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
192 .is_err()
193 {
194 warn!("Heartbeat task started multiple times");
195 return Ok(());
196 }
197 let interval = self.interval;
198 let node_id = self.node_id;
199 let node_epoch = self.node_epoch;
200 let addr = &self.peer_addr;
201 info!("Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}.");
202
203 let meta_client = self.meta_client.clone();
204 let region_server_clone = self.region_server.clone();
205
206 let handler_executor = self.resp_handler_executor.clone();
207
208 let (outgoing_tx, mut outgoing_rx) = mpsc::channel(16);
209 let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
210
211 let quit_signal = Arc::new(Notify::new());
212
213 let mut tx = Self::create_streams(
214 &meta_client,
215 running.clone(),
216 handler_executor.clone(),
217 mailbox.clone(),
218 notify,
219 quit_signal.clone(),
220 )
221 .await?;
222
223 let self_peer = Some(Peer {
224 id: node_id,
225 addr: addr.clone(),
226 });
227 let epoch = self.region_alive_keeper.epoch();
228 let workload_types = self.workload_types.clone();
229
230 self.region_alive_keeper.start(Some(event_receiver)).await?;
231 let mut last_sent = Instant::now();
232
233 common_runtime::spawn_hb(async move {
234 let sleep = tokio::time::sleep(Duration::from_millis(0));
235 tokio::pin!(sleep);
236
237 let build_info = common_version::build_info();
238 let heartbeat_request = HeartbeatRequest {
239 peer: self_peer,
240 node_epoch,
241 info: Some(NodeInfo {
242 version: build_info.version.to_string(),
243 git_commit: build_info.commit_short.to_string(),
244 start_time_ms: node_epoch,
245 cpus: num_cpus::get() as u32,
246 }),
247 node_workloads: Some(NodeWorkloads::Datanode(DatanodeWorkloads {
248 types: workload_types.iter().map(|w| w.to_i32()).collect(),
249 })),
250 ..Default::default()
251 };
252
253 loop {
254 if !running.load(Ordering::Relaxed) {
255 info!("shutdown heartbeat task");
256 break;
257 }
258 let req = tokio::select! {
259 message = outgoing_rx.recv() => {
260 if let Some(message) = message {
261 match outgoing_message_to_mailbox_message(message) {
262 Ok(message) => {
263 let req = HeartbeatRequest {
264 mailbox_message: Some(message),
265 ..heartbeat_request.clone()
266 };
267 HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
268 Some(req)
269 }
270 Err(e) => {
271 error!(e; "Failed to encode mailbox messages!");
272 HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
273 None
274 }
275 }
276 } else {
277 None
278 }
279 }
280 _ = &mut sleep => {
281 let region_stats = Self::load_region_stats(®ion_server_clone);
282 let now = Instant::now();
283 let duration_since_epoch = (now - epoch).as_millis() as u64;
284 let req = HeartbeatRequest {
285 region_stats,
286 duration_since_epoch,
287 ..heartbeat_request.clone()
288 };
289 sleep.as_mut().reset(now + Duration::from_millis(interval));
290 Some(req)
291 }
292 _ = quit_signal.notified() => {
294 let req = HeartbeatRequest::default();
295 Some(req)
296 }
297 };
298 if let Some(req) = req {
299 metrics::LAST_SENT_HEARTBEAT_ELAPSED
300 .set(last_sent.elapsed().as_millis() as i64);
301 last_sent = Instant::now();
303 debug!("Sending heartbeat request: {:?}", req);
304 if let Err(e) = tx.send(req).await {
305 error!(e; "Failed to send heartbeat to metasrv");
306 match Self::create_streams(
307 &meta_client,
308 running.clone(),
309 handler_executor.clone(),
310 mailbox.clone(),
311 None,
312 quit_signal.clone(),
313 )
314 .await
315 {
316 Ok(new_tx) => {
317 info!("Reconnected to metasrv");
318 tx = new_tx;
319 sleep.as_mut().reset(Instant::now());
321 }
322 Err(e) => {
323 sleep.as_mut().reset(
327 Instant::now()
328 + Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS),
329 );
330 error!(e; "Failed to reconnect to metasrv!");
331 }
332 }
333 } else {
334 HEARTBEAT_SENT_COUNT.inc();
335 }
336 }
337 }
338 });
339
340 Ok(())
341 }
342
343 fn load_region_stats(region_server: &RegionServer) -> Vec<RegionStat> {
344 region_server
345 .reportable_regions()
346 .into_iter()
347 .map(|stat| {
348 let region_stat = region_server
349 .region_statistic(stat.region_id)
350 .unwrap_or_default();
351 let mut extensions = HashMap::new();
352 if let Some(serialized) = region_stat.serialize_to_vec() {
353 extensions.insert(REGION_STATISTIC_KEY.to_string(), serialized);
354 }
355
356 RegionStat {
357 region_id: stat.region_id.as_u64(),
358 engine: stat.engine,
359 role: RegionRole::from(stat.role).into(),
360 rcus: 0,
362 wcus: 0,
363 approximate_bytes: region_stat.estimated_disk_size() as i64,
364 extensions,
365 }
366 })
367 .collect()
368 }
369
370 pub fn close(&self) -> Result<()> {
371 let running = self.running.clone();
372 if running
373 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
374 .is_err()
375 {
376 warn!("Call close heartbeat task multiple times");
377 }
378
379 Ok(())
380 }
381}