1use std::collections::HashMap;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::time::Duration;
19
20use api::v1::meta::heartbeat_request::NodeWorkloads;
21use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
22use common_base::Plugins;
23use common_meta::cache_invalidator::CacheInvalidatorRef;
24use common_meta::datanode::REGION_STATISTIC_KEY;
25use common_meta::distributed_time_constants::BASE_HEARTBEAT_INTERVAL;
26use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
27use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
28use common_meta::heartbeat::handler::suspend::SuspendHandler;
29use common_meta::heartbeat::handler::{
30 HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
31};
32use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef};
33use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
34use common_meta::kv_backend::KvBackendRef;
35use common_stat::ResourceStatRef;
36use common_telemetry::{debug, error, info, trace, warn};
37use common_workload::DatanodeWorkloadType;
38use meta_client::MetaClientRef;
39use meta_client::client::heartbeat::HeartbeatConfig;
40use meta_client::client::{HeartbeatSender, MetaClient};
41use servers::addrs;
42use snafu::{OptionExt as _, ResultExt};
43use tokio::sync::{Notify, mpsc};
44use tokio::time::Instant;
45
46use self::handler::RegionHeartbeatResponseHandler;
47use crate::alive_keeper::{CountdownTaskHandlerExtRef, RegionAliveKeeper};
48use crate::config::DatanodeOptions;
49use crate::error::{self, MetaClientInitSnafu, RegionEngineNotFoundSnafu, Result};
50use crate::event_listener::RegionServerEventReceiver;
51use crate::metrics::{self, HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
52use crate::region_server::RegionServer;
53
54pub(crate) mod handler;
55pub(crate) mod task_tracker;
56
/// Heartbeat task of a datanode: sends heartbeats to metasrv and dispatches the
/// heartbeat responses to a chain of handlers.
pub struct HeartbeatTask {
    /// Id of this datanode as reported in the heartbeat `Peer`.
    node_id: u64,
    /// Workload types advertised in every heartbeat request.
    workload_types: Vec<DatanodeWorkloadType>,
    /// Wall-clock time (milliseconds) captured when the task was created; reported as both
    /// `node_epoch` and `NodeInfo::start_time_ms`.
    node_epoch: u64,
    /// Address advertised to metasrv, resolved from the gRPC bind/server addresses.
    peer_addr: String,
    /// Shared running flag: set by `start`, cleared by `close` and when the task is dropped.
    running: Arc<AtomicBool>,
    /// Client used to open heartbeat streams to metasrv.
    meta_client: MetaClientRef,
    /// Region server whose regions and topics are reported in heartbeats.
    region_server: RegionServer,
    /// Handler chain applied to every heartbeat response.
    resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
    /// Region lease tracker; also registered as the first response handler.
    region_alive_keeper: Arc<RegionAliveKeeper>,
    /// Source of the CPU/memory totals and usage figures reported in heartbeats.
    resource_stat: ResourceStatRef,
}
70
71impl Drop for HeartbeatTask {
72 fn drop(&mut self) {
73 self.running.store(false, Ordering::Release);
74 }
75}
76
77impl HeartbeatTask {
78 pub async fn try_new(
80 opts: &DatanodeOptions,
81 region_server: RegionServer,
82 meta_client: MetaClientRef,
83 kv_backend: KvBackendRef,
84 cache_invalidator: CacheInvalidatorRef,
85 plugins: Plugins,
86 resource_stat: ResourceStatRef,
87 ) -> Result<Self> {
88 let countdown_task_handler_ext = plugins.get::<CountdownTaskHandlerExtRef>();
89 let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
90 region_server.clone(),
91 countdown_task_handler_ext,
92 BASE_HEARTBEAT_INTERVAL,
93 ));
94 let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![
95 region_alive_keeper.clone(),
96 Arc::new(ParseMailboxMessageHandler),
97 Arc::new(SuspendHandler::new(region_server.suspend_state())),
98 Arc::new(
99 RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend)
100 .with_open_region_parallelism(opts.init_regions_parallelism),
101 ),
102 Arc::new(InvalidateCacheHandler::new(cache_invalidator)),
103 ]));
104
105 Ok(Self {
106 node_id: opts.node_id.unwrap_or(0),
107 workload_types: opts.workload_types.clone(),
108 node_epoch: common_time::util::current_time_millis() as u64,
110 peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
111 running: Arc::new(AtomicBool::new(false)),
112 meta_client,
113 region_server,
114 resp_handler_executor,
115 region_alive_keeper,
116 resource_stat,
117 })
118 }
119
120 pub async fn create_streams(
121 meta_client: &MetaClient,
122 running: Arc<AtomicBool>,
123 handler_executor: HeartbeatResponseHandlerExecutorRef,
124 mailbox: MailboxRef,
125 mut notify: Option<Arc<Notify>>,
126 quit_signal: Arc<Notify>,
127 ) -> Result<(HeartbeatSender, HeartbeatConfig)> {
128 let client_id = meta_client.id();
129 let (tx, mut rx, config) = meta_client.heartbeat().await.context(MetaClientInitSnafu)?;
130
131 let mut last_received_lease = Instant::now();
132
133 let _handle = common_runtime::spawn_hb(async move {
134 while let Some(res) = rx.message().await.unwrap_or_else(|e| {
135 error!(e; "Error while reading heartbeat response");
136 None
137 }) {
138 if let Some(msg) = res.mailbox_message.as_ref() {
139 info!("Received mailbox message: {msg:?}, meta_client id: {client_id:?}");
140 }
141 if let Some(lease) = res.region_lease.as_ref() {
142 metrics::LAST_RECEIVED_HEARTBEAT_ELAPSED
143 .set(last_received_lease.elapsed().as_millis() as i64);
144 last_received_lease = Instant::now();
146
147 let mut leader_region_lease_count = 0;
148 let mut follower_region_lease_count = 0;
149 for lease in &lease.regions {
150 match lease.role() {
151 RegionRole::Leader | RegionRole::DowngradingLeader => {
152 leader_region_lease_count += 1
153 }
154 RegionRole::Follower => follower_region_lease_count += 1,
155 }
156 }
157
158 metrics::HEARTBEAT_REGION_LEASES
159 .with_label_values(&["leader"])
160 .set(leader_region_lease_count);
161 metrics::HEARTBEAT_REGION_LEASES
162 .with_label_values(&["follower"])
163 .set(follower_region_lease_count);
164 }
165 let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), res);
166 if let Err(e) = Self::handle_response(ctx, handler_executor.clone()).await {
167 error!(e; "Error while handling heartbeat response");
168 }
169 if let Some(notify) = notify.take() {
170 notify.notify_one();
171 }
172 if !running.load(Ordering::Acquire) {
173 info!("Heartbeat task shutdown");
174 }
175 }
176 quit_signal.notify_one();
177 info!("Heartbeat handling loop exit.");
178 });
179 Ok((tx, config))
180 }
181
182 async fn handle_response(
183 ctx: HeartbeatResponseHandlerContext,
184 handler_executor: HeartbeatResponseHandlerExecutorRef,
185 ) -> Result<()> {
186 trace!("Heartbeat response: {:?}", ctx.response);
187 handler_executor
188 .handle(ctx)
189 .await
190 .context(error::HandleHeartbeatResponseSnafu)
191 }
192
193 #[allow(deprecated)]
194 pub async fn start(
196 &self,
197 event_receiver: RegionServerEventReceiver,
198 notify: Option<Arc<Notify>>,
199 ) -> Result<()> {
200 let running = self.running.clone();
201 if running
202 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
203 .is_err()
204 {
205 warn!("Heartbeat task started multiple times");
206 return Ok(());
207 }
208 let node_id = self.node_id;
209 let node_epoch = self.node_epoch;
210 let addr = &self.peer_addr;
211
212 let meta_client = self.meta_client.clone();
213 let region_server_clone = self.region_server.clone();
214
215 let handler_executor = self.resp_handler_executor.clone();
216
217 let (outgoing_tx, mut outgoing_rx) = mpsc::channel(16);
218 let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
219
220 let quit_signal = Arc::new(Notify::new());
221
222 let (mut tx, config) = Self::create_streams(
223 &meta_client,
224 running.clone(),
225 handler_executor.clone(),
226 mailbox.clone(),
227 notify,
228 quit_signal.clone(),
229 )
230 .await?;
231
232 let interval = config.interval.as_millis() as u64;
233 let mut retry_interval = config.retry_interval;
234
235 self.region_alive_keeper.update_heartbeat_interval(interval);
237
238 info!(
239 "Starting heartbeat to Metasrv with config: {}. My node id is {}, address is {}.",
240 config, node_id, addr
241 );
242
243 let self_peer = Some(Peer {
244 id: node_id,
245 addr: addr.clone(),
246 });
247 let epoch = self.region_alive_keeper.epoch();
248 let workload_types = self.workload_types.clone();
249
250 self.region_alive_keeper.start(Some(event_receiver)).await?;
251 let mut last_sent = Instant::now();
252 let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores();
253 let total_memory_bytes = self.resource_stat.get_total_memory_bytes();
254 let resource_stat = self.resource_stat.clone();
255 let region_alive_keeper = self.region_alive_keeper.clone();
256 let gc_limiter = self
257 .region_server
258 .mito_engine()
259 .context(RegionEngineNotFoundSnafu { name: "mito" })?
260 .gc_limiter();
261
262 common_runtime::spawn_hb(async move {
263 let sleep = tokio::time::sleep(Duration::from_millis(0));
264 tokio::pin!(sleep);
265
266 let build_info = common_version::build_info();
267
268 let heartbeat_request = HeartbeatRequest {
269 peer: self_peer,
270 node_epoch,
271 info: Some(NodeInfo {
272 version: build_info.version.to_string(),
273 git_commit: build_info.commit_short.to_string(),
274 start_time_ms: node_epoch,
275 total_cpu_millicores,
276 total_memory_bytes,
277 cpu_usage_millicores: 0,
278 memory_usage_bytes: 0,
279 cpus: total_cpu_millicores as u32,
281 memory_bytes: total_memory_bytes as u64,
282 hostname: hostname::get()
283 .unwrap_or_default()
284 .to_string_lossy()
285 .to_string(),
286 }),
287 node_workloads: Some(NodeWorkloads::Datanode(DatanodeWorkloads {
288 types: workload_types.iter().map(|w| w.to_i32()).collect(),
289 })),
290 ..Default::default()
291 };
292
293 loop {
294 if !running.load(Ordering::Relaxed) {
295 info!("shutdown heartbeat task");
296 break;
297 }
298 let req = tokio::select! {
299 message = outgoing_rx.recv() => {
300 if let Some(message) = message {
301 match outgoing_message_to_mailbox_message(message) {
302 Ok(message) => {
303 let mut extensions = heartbeat_request.extensions.clone();
304 let gc_stat = gc_limiter.gc_stat();
305 gc_stat.into_extensions(&mut extensions);
306
307 let req = HeartbeatRequest {
308 mailbox_message: Some(message),
309 extensions,
310 ..heartbeat_request.clone()
311 };
312 HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
313 Some(req)
314 }
315 Err(e) => {
316 error!(e; "Failed to encode mailbox messages!");
317 HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
318 None
319 }
320 }
321 } else {
322 None
323 }
324 }
325 _ = &mut sleep => {
326 let region_stats = Self::load_region_stats(®ion_server_clone);
327 let topic_stats = region_server_clone.topic_stats();
328 let now = Instant::now();
329 let duration_since_epoch = (now - epoch).as_millis() as u64;
330
331 let mut extensions = heartbeat_request.extensions.clone();
332 let gc_stat = gc_limiter.gc_stat();
333 gc_stat.into_extensions(&mut extensions);
334
335 let mut req = HeartbeatRequest {
336 region_stats,
337 topic_stats,
338 duration_since_epoch,
339 extensions,
340 ..heartbeat_request.clone()
341 };
342
343 if let Some(info) = req.info.as_mut() {
344 info.cpu_usage_millicores = resource_stat.get_cpu_usage_millicores();
345 info.memory_usage_bytes = resource_stat.get_memory_usage_bytes();
346 }
347
348 sleep.as_mut().reset(now + Duration::from_millis(interval));
349 Some(req)
350 }
351 _ = quit_signal.notified() => {
353 let req = HeartbeatRequest::default();
354 Some(req)
355 }
356 };
357 if let Some(req) = req {
358 metrics::LAST_SENT_HEARTBEAT_ELAPSED
359 .set(last_sent.elapsed().as_millis() as i64);
360 last_sent = Instant::now();
362 debug!("Sending heartbeat request: {:?}", req);
363 if let Err(e) = tx.send(req).await {
364 error!(e; "Failed to send heartbeat to metasrv");
365 match Self::create_streams(
366 &meta_client,
367 running.clone(),
368 handler_executor.clone(),
369 mailbox.clone(),
370 None,
371 quit_signal.clone(),
372 )
373 .await
374 {
375 Ok((new_tx, new_config)) => {
376 info!("Reconnected to metasrv, heartbeat config: {}", new_config);
377 tx = new_tx;
378 retry_interval = new_config.retry_interval;
380 region_alive_keeper.update_heartbeat_interval(
382 new_config.interval.as_millis() as u64,
383 );
384 sleep.as_mut().reset(Instant::now());
386 }
387 Err(e) => {
388 sleep.as_mut().reset(Instant::now() + retry_interval);
392 error!(e; "Failed to reconnect to metasrv!");
393 }
394 }
395 } else {
396 HEARTBEAT_SENT_COUNT.inc();
397 }
398 }
399 }
400 });
401
402 Ok(())
403 }
404
405 fn load_region_stats(region_server: &RegionServer) -> Vec<RegionStat> {
406 region_server
407 .reportable_regions()
408 .into_iter()
409 .map(|stat| {
410 let region_stat = region_server
411 .region_statistic(stat.region_id)
412 .unwrap_or_default();
413 let mut extensions = HashMap::new();
414 if let Some(serialized) = region_stat.serialize_to_vec() {
415 extensions.insert(REGION_STATISTIC_KEY.to_string(), serialized);
416 }
417
418 RegionStat {
419 region_id: stat.region_id.as_u64(),
420 engine: stat.engine,
421 role: RegionRole::from(stat.role).into(),
422 rcus: 0,
424 wcus: 0,
425 approximate_bytes: region_stat.estimated_disk_size() as i64,
426 extensions,
427 }
428 })
429 .collect()
430 }
431
432 pub fn close(&self) -> Result<()> {
433 let running = self.running.clone();
434 if running
435 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
436 .is_err()
437 {
438 warn!("Call close heartbeat task multiple times");
439 }
440
441 Ok(())
442 }
443}