1use std::collections::HashMap;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::time::Duration;
19
20use api::v1::meta::heartbeat_request::NodeWorkloads;
21use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
22use common_base::Plugins;
23use common_meta::cache_invalidator::CacheInvalidatorRef;
24use common_meta::datanode::REGION_STATISTIC_KEY;
25use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
26use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
27use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
28use common_meta::heartbeat::handler::{
29 HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
30};
31use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef};
32use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
33use common_stat::ResourceStatRef;
34use common_telemetry::{debug, error, info, trace, warn};
35use common_workload::DatanodeWorkloadType;
36use meta_client::MetaClientRef;
37use meta_client::client::{HeartbeatSender, MetaClient};
38use servers::addrs;
39use snafu::{OptionExt as _, ResultExt};
40use tokio::sync::{Notify, mpsc};
41use tokio::time::Instant;
42
43use self::handler::RegionHeartbeatResponseHandler;
44use crate::alive_keeper::{CountdownTaskHandlerExtRef, RegionAliveKeeper};
45use crate::config::DatanodeOptions;
46use crate::error::{self, MetaClientInitSnafu, RegionEngineNotFoundSnafu, Result};
47use crate::event_listener::RegionServerEventReceiver;
48use crate::metrics::{self, HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
49use crate::region_server::RegionServer;
50
51pub(crate) mod handler;
52pub(crate) mod task_tracker;
53
/// Heartbeat task of a datanode: reports the node's state to Metasrv and dispatches the
/// responses it gets back to the registered response handlers.
pub struct HeartbeatTask {
    /// Id of this node; `try_new` falls back to `0` when the options carry no node id.
    node_id: u64,
    /// Workload types advertised to Metasrv in the heartbeat requests.
    workload_types: Vec<DatanodeWorkloadType>,
    /// Wall-clock time in milliseconds captured when the task was constructed.
    node_epoch: u64,
    /// Address reported to Metasrv as this peer's address.
    peer_addr: String,
    /// Set by `start`, cleared by `close` and on drop; polled by the spawned loops.
    running: Arc<AtomicBool>,
    /// Client used to open the heartbeat stream.
    meta_client: MetaClientRef,
    /// Source of the region and topic statistics attached to heartbeats.
    region_server: RegionServer,
    /// Heartbeat interval in milliseconds.
    interval: u64,
    /// Executor that runs the response handlers for every heartbeat response.
    resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
    /// Alive keeper started together with the task; also one of the response handlers.
    region_alive_keeper: Arc<RegionAliveKeeper>,
    /// Provider of the CPU / memory figures reported in the node info.
    resource_stat: ResourceStatRef,
}
68
69impl Drop for HeartbeatTask {
70 fn drop(&mut self) {
71 self.running.store(false, Ordering::Release);
72 }
73}
74
75impl HeartbeatTask {
76 pub async fn try_new(
78 opts: &DatanodeOptions,
79 region_server: RegionServer,
80 meta_client: MetaClientRef,
81 cache_invalidator: CacheInvalidatorRef,
82 plugins: Plugins,
83 resource_stat: ResourceStatRef,
84 ) -> Result<Self> {
85 let countdown_task_handler_ext = plugins.get::<CountdownTaskHandlerExtRef>();
86 let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
87 region_server.clone(),
88 countdown_task_handler_ext,
89 opts.heartbeat.interval.as_millis() as u64,
90 ));
91 let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![
92 region_alive_keeper.clone(),
93 Arc::new(ParseMailboxMessageHandler),
94 Arc::new(
95 RegionHeartbeatResponseHandler::new(region_server.clone())
96 .with_open_region_parallelism(opts.init_regions_parallelism),
97 ),
98 Arc::new(InvalidateCacheHandler::new(cache_invalidator)),
99 ]));
100
101 Ok(Self {
102 node_id: opts.node_id.unwrap_or(0),
103 workload_types: opts.workload_types.clone(),
104 node_epoch: common_time::util::current_time_millis() as u64,
106 peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
107 running: Arc::new(AtomicBool::new(false)),
108 meta_client,
109 region_server,
110 interval: opts.heartbeat.interval.as_millis() as u64,
111 resp_handler_executor,
112 region_alive_keeper,
113 resource_stat,
114 })
115 }
116
117 pub async fn create_streams(
118 meta_client: &MetaClient,
119 running: Arc<AtomicBool>,
120 handler_executor: HeartbeatResponseHandlerExecutorRef,
121 mailbox: MailboxRef,
122 mut notify: Option<Arc<Notify>>,
123 quit_signal: Arc<Notify>,
124 ) -> Result<HeartbeatSender> {
125 let client_id = meta_client.id();
126 let (tx, mut rx) = meta_client.heartbeat().await.context(MetaClientInitSnafu)?;
127
128 let mut last_received_lease = Instant::now();
129
130 let _handle = common_runtime::spawn_hb(async move {
131 while let Some(res) = rx.message().await.unwrap_or_else(|e| {
132 error!(e; "Error while reading heartbeat response");
133 None
134 }) {
135 if let Some(msg) = res.mailbox_message.as_ref() {
136 info!("Received mailbox message: {msg:?}, meta_client id: {client_id:?}");
137 }
138 if let Some(lease) = res.region_lease.as_ref() {
139 metrics::LAST_RECEIVED_HEARTBEAT_ELAPSED
140 .set(last_received_lease.elapsed().as_millis() as i64);
141 last_received_lease = Instant::now();
143
144 let mut leader_region_lease_count = 0;
145 let mut follower_region_lease_count = 0;
146 for lease in &lease.regions {
147 match lease.role() {
148 RegionRole::Leader | RegionRole::DowngradingLeader => {
149 leader_region_lease_count += 1
150 }
151 RegionRole::Follower => follower_region_lease_count += 1,
152 }
153 }
154
155 metrics::HEARTBEAT_REGION_LEASES
156 .with_label_values(&["leader"])
157 .set(leader_region_lease_count);
158 metrics::HEARTBEAT_REGION_LEASES
159 .with_label_values(&["follower"])
160 .set(follower_region_lease_count);
161 }
162 let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), res);
163 if let Err(e) = Self::handle_response(ctx, handler_executor.clone()).await {
164 error!(e; "Error while handling heartbeat response");
165 }
166 if let Some(notify) = notify.take() {
167 notify.notify_one();
168 }
169 if !running.load(Ordering::Acquire) {
170 info!("Heartbeat task shutdown");
171 }
172 }
173 quit_signal.notify_one();
174 info!("Heartbeat handling loop exit.");
175 });
176 Ok(tx)
177 }
178
179 async fn handle_response(
180 ctx: HeartbeatResponseHandlerContext,
181 handler_executor: HeartbeatResponseHandlerExecutorRef,
182 ) -> Result<()> {
183 trace!("Heartbeat response: {:?}", ctx.response);
184 handler_executor
185 .handle(ctx)
186 .await
187 .context(error::HandleHeartbeatResponseSnafu)
188 }
189
190 #[allow(deprecated)]
191 pub async fn start(
193 &self,
194 event_receiver: RegionServerEventReceiver,
195 notify: Option<Arc<Notify>>,
196 ) -> Result<()> {
197 let running = self.running.clone();
198 if running
199 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
200 .is_err()
201 {
202 warn!("Heartbeat task started multiple times");
203 return Ok(());
204 }
205 let interval = self.interval;
206 let node_id = self.node_id;
207 let node_epoch = self.node_epoch;
208 let addr = &self.peer_addr;
209 info!(
210 "Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}."
211 );
212
213 let meta_client = self.meta_client.clone();
214 let region_server_clone = self.region_server.clone();
215
216 let handler_executor = self.resp_handler_executor.clone();
217
218 let (outgoing_tx, mut outgoing_rx) = mpsc::channel(16);
219 let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
220
221 let quit_signal = Arc::new(Notify::new());
222
223 let mut tx = Self::create_streams(
224 &meta_client,
225 running.clone(),
226 handler_executor.clone(),
227 mailbox.clone(),
228 notify,
229 quit_signal.clone(),
230 )
231 .await?;
232
233 let self_peer = Some(Peer {
234 id: node_id,
235 addr: addr.clone(),
236 });
237 let epoch = self.region_alive_keeper.epoch();
238 let workload_types = self.workload_types.clone();
239
240 self.region_alive_keeper.start(Some(event_receiver)).await?;
241 let mut last_sent = Instant::now();
242 let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores();
243 let total_memory_bytes = self.resource_stat.get_total_memory_bytes();
244 let resource_stat = self.resource_stat.clone();
245 let gc_limiter = self
246 .region_server
247 .mito_engine()
248 .context(RegionEngineNotFoundSnafu { name: "mito" })?
249 .gc_limiter();
250
251 common_runtime::spawn_hb(async move {
252 let sleep = tokio::time::sleep(Duration::from_millis(0));
253 tokio::pin!(sleep);
254
255 let build_info = common_version::build_info();
256
257 let heartbeat_request = HeartbeatRequest {
258 peer: self_peer,
259 node_epoch,
260 info: Some(NodeInfo {
261 version: build_info.version.to_string(),
262 git_commit: build_info.commit_short.to_string(),
263 start_time_ms: node_epoch,
264 total_cpu_millicores,
265 total_memory_bytes,
266 cpu_usage_millicores: 0,
267 memory_usage_bytes: 0,
268 cpus: total_cpu_millicores as u32,
270 memory_bytes: total_memory_bytes as u64,
271 hostname: hostname::get()
272 .unwrap_or_default()
273 .to_string_lossy()
274 .to_string(),
275 }),
276 node_workloads: Some(NodeWorkloads::Datanode(DatanodeWorkloads {
277 types: workload_types.iter().map(|w| w.to_i32()).collect(),
278 })),
279 ..Default::default()
280 };
281
282 loop {
283 if !running.load(Ordering::Relaxed) {
284 info!("shutdown heartbeat task");
285 break;
286 }
287 let req = tokio::select! {
288 message = outgoing_rx.recv() => {
289 if let Some(message) = message {
290 match outgoing_message_to_mailbox_message(message) {
291 Ok(message) => {
292 let mut extensions = heartbeat_request.extensions.clone();
293 let gc_stat = gc_limiter.gc_stat();
294 gc_stat.into_extensions(&mut extensions);
295
296 let req = HeartbeatRequest {
297 mailbox_message: Some(message),
298 extensions,
299 ..heartbeat_request.clone()
300 };
301 HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
302 Some(req)
303 }
304 Err(e) => {
305 error!(e; "Failed to encode mailbox messages!");
306 HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
307 None
308 }
309 }
310 } else {
311 None
312 }
313 }
314 _ = &mut sleep => {
315 let region_stats = Self::load_region_stats(®ion_server_clone);
316 let topic_stats = region_server_clone.topic_stats();
317 let now = Instant::now();
318 let duration_since_epoch = (now - epoch).as_millis() as u64;
319
320 let mut extensions = heartbeat_request.extensions.clone();
321 let gc_stat = gc_limiter.gc_stat();
322 gc_stat.into_extensions(&mut extensions);
323
324 let mut req = HeartbeatRequest {
325 region_stats,
326 topic_stats,
327 duration_since_epoch,
328 extensions,
329 ..heartbeat_request.clone()
330 };
331
332 if let Some(info) = req.info.as_mut() {
333 info.cpu_usage_millicores = resource_stat.get_cpu_usage_millicores();
334 info.memory_usage_bytes = resource_stat.get_memory_usage_bytes();
335 }
336
337 sleep.as_mut().reset(now + Duration::from_millis(interval));
338 Some(req)
339 }
340 _ = quit_signal.notified() => {
342 let req = HeartbeatRequest::default();
343 Some(req)
344 }
345 };
346 if let Some(req) = req {
347 metrics::LAST_SENT_HEARTBEAT_ELAPSED
348 .set(last_sent.elapsed().as_millis() as i64);
349 last_sent = Instant::now();
351 debug!("Sending heartbeat request: {:?}", req);
352 if let Err(e) = tx.send(req).await {
353 error!(e; "Failed to send heartbeat to metasrv");
354 match Self::create_streams(
355 &meta_client,
356 running.clone(),
357 handler_executor.clone(),
358 mailbox.clone(),
359 None,
360 quit_signal.clone(),
361 )
362 .await
363 {
364 Ok(new_tx) => {
365 info!("Reconnected to metasrv");
366 tx = new_tx;
367 sleep.as_mut().reset(Instant::now());
369 }
370 Err(e) => {
371 sleep.as_mut().reset(
375 Instant::now()
376 + Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS),
377 );
378 error!(e; "Failed to reconnect to metasrv!");
379 }
380 }
381 } else {
382 HEARTBEAT_SENT_COUNT.inc();
383 }
384 }
385 }
386 });
387
388 Ok(())
389 }
390
391 fn load_region_stats(region_server: &RegionServer) -> Vec<RegionStat> {
392 region_server
393 .reportable_regions()
394 .into_iter()
395 .map(|stat| {
396 let region_stat = region_server
397 .region_statistic(stat.region_id)
398 .unwrap_or_default();
399 let mut extensions = HashMap::new();
400 if let Some(serialized) = region_stat.serialize_to_vec() {
401 extensions.insert(REGION_STATISTIC_KEY.to_string(), serialized);
402 }
403
404 RegionStat {
405 region_id: stat.region_id.as_u64(),
406 engine: stat.engine,
407 role: RegionRole::from(stat.role).into(),
408 rcus: 0,
410 wcus: 0,
411 approximate_bytes: region_stat.estimated_disk_size() as i64,
412 extensions,
413 }
414 })
415 .collect()
416 }
417
418 pub fn close(&self) -> Result<()> {
419 let running = self.running.clone();
420 if running
421 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
422 .is_err()
423 {
424 warn!("Call close heartbeat task multiple times");
425 }
426
427 Ok(())
428 }
429}