1use std::collections::HashMap;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::time::Duration;
19
20use api::v1::meta::heartbeat_request::NodeWorkloads;
21use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
22use common_base::Plugins;
23use common_meta::cache_invalidator::CacheInvalidatorRef;
24use common_meta::datanode::REGION_STATISTIC_KEY;
25use common_meta::distributed_time_constants::BASE_HEARTBEAT_INTERVAL;
26use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
27use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
28use common_meta::heartbeat::handler::suspend::SuspendHandler;
29use common_meta::heartbeat::handler::{
30 HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
31};
32use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef};
33use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
34use common_stat::ResourceStatRef;
35use common_telemetry::{debug, error, info, trace, warn};
36use common_workload::DatanodeWorkloadType;
37use meta_client::MetaClientRef;
38use meta_client::client::heartbeat::HeartbeatConfig;
39use meta_client::client::{HeartbeatSender, MetaClient};
40use servers::addrs;
41use snafu::{OptionExt as _, ResultExt};
42use tokio::sync::{Notify, mpsc};
43use tokio::time::Instant;
44
45use self::handler::RegionHeartbeatResponseHandler;
46use crate::alive_keeper::{CountdownTaskHandlerExtRef, RegionAliveKeeper};
47use crate::config::DatanodeOptions;
48use crate::error::{self, MetaClientInitSnafu, RegionEngineNotFoundSnafu, Result};
49use crate::event_listener::RegionServerEventReceiver;
50use crate::metrics::{self, HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
51use crate::region_server::RegionServer;
52
53pub(crate) mod handler;
54pub(crate) mod task_tracker;
55
/// Background heartbeat task of a datanode.
///
/// Bundles the state that `start` needs to report this node to Metasrv and to
/// run the heartbeat responses through `resp_handler_executor`.
pub struct HeartbeatTask {
    /// Id sent as `Peer::id`; `try_new` falls back to `0` when the options carry none.
    node_id: u64,
    /// Workload types sent in the `DatanodeWorkloads` part of every request.
    workload_types: Vec<DatanodeWorkloadType>,
    /// Milliseconds timestamp taken in `try_new`; sent as `node_epoch` and `start_time_ms`.
    node_epoch: u64,
    /// Address sent as `Peer::addr`, resolved from the gRPC bind and server addresses.
    peer_addr: String,
    /// Set by `start`, cleared by `close` and on drop; polled by the spawned loops.
    running: Arc<AtomicBool>,
    /// Client used to open the heartbeat stream.
    meta_client: MetaClientRef,
    /// Supplies region statistics, topic stats and the `mito` engine's GC limiter.
    region_server: RegionServer,
    /// Handler chain applied to every heartbeat response.
    resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
    /// First handler of the chain; its heartbeat interval follows the config returned by Metasrv.
    region_alive_keeper: Arc<RegionAliveKeeper>,
    /// Provides the total and current CPU/memory figures reported in `NodeInfo`.
    resource_stat: ResourceStatRef,
}
69
70impl Drop for HeartbeatTask {
71 fn drop(&mut self) {
72 self.running.store(false, Ordering::Release);
73 }
74}
75
76impl HeartbeatTask {
77 pub async fn try_new(
79 opts: &DatanodeOptions,
80 region_server: RegionServer,
81 meta_client: MetaClientRef,
82 cache_invalidator: CacheInvalidatorRef,
83 plugins: Plugins,
84 resource_stat: ResourceStatRef,
85 ) -> Result<Self> {
86 let countdown_task_handler_ext = plugins.get::<CountdownTaskHandlerExtRef>();
87 let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
88 region_server.clone(),
89 countdown_task_handler_ext,
90 BASE_HEARTBEAT_INTERVAL,
91 ));
92 let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![
93 region_alive_keeper.clone(),
94 Arc::new(ParseMailboxMessageHandler),
95 Arc::new(SuspendHandler::new(region_server.suspend_state())),
96 Arc::new(
97 RegionHeartbeatResponseHandler::new(region_server.clone())
98 .with_open_region_parallelism(opts.init_regions_parallelism),
99 ),
100 Arc::new(InvalidateCacheHandler::new(cache_invalidator)),
101 ]));
102
103 Ok(Self {
104 node_id: opts.node_id.unwrap_or(0),
105 workload_types: opts.workload_types.clone(),
106 node_epoch: common_time::util::current_time_millis() as u64,
108 peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
109 running: Arc::new(AtomicBool::new(false)),
110 meta_client,
111 region_server,
112 resp_handler_executor,
113 region_alive_keeper,
114 resource_stat,
115 })
116 }
117
118 pub async fn create_streams(
119 meta_client: &MetaClient,
120 running: Arc<AtomicBool>,
121 handler_executor: HeartbeatResponseHandlerExecutorRef,
122 mailbox: MailboxRef,
123 mut notify: Option<Arc<Notify>>,
124 quit_signal: Arc<Notify>,
125 ) -> Result<(HeartbeatSender, HeartbeatConfig)> {
126 let client_id = meta_client.id();
127 let (tx, mut rx, config) = meta_client.heartbeat().await.context(MetaClientInitSnafu)?;
128
129 let mut last_received_lease = Instant::now();
130
131 let _handle = common_runtime::spawn_hb(async move {
132 while let Some(res) = rx.message().await.unwrap_or_else(|e| {
133 error!(e; "Error while reading heartbeat response");
134 None
135 }) {
136 if let Some(msg) = res.mailbox_message.as_ref() {
137 info!("Received mailbox message: {msg:?}, meta_client id: {client_id:?}");
138 }
139 if let Some(lease) = res.region_lease.as_ref() {
140 metrics::LAST_RECEIVED_HEARTBEAT_ELAPSED
141 .set(last_received_lease.elapsed().as_millis() as i64);
142 last_received_lease = Instant::now();
144
145 let mut leader_region_lease_count = 0;
146 let mut follower_region_lease_count = 0;
147 for lease in &lease.regions {
148 match lease.role() {
149 RegionRole::Leader | RegionRole::DowngradingLeader => {
150 leader_region_lease_count += 1
151 }
152 RegionRole::Follower => follower_region_lease_count += 1,
153 }
154 }
155
156 metrics::HEARTBEAT_REGION_LEASES
157 .with_label_values(&["leader"])
158 .set(leader_region_lease_count);
159 metrics::HEARTBEAT_REGION_LEASES
160 .with_label_values(&["follower"])
161 .set(follower_region_lease_count);
162 }
163 let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), res);
164 if let Err(e) = Self::handle_response(ctx, handler_executor.clone()).await {
165 error!(e; "Error while handling heartbeat response");
166 }
167 if let Some(notify) = notify.take() {
168 notify.notify_one();
169 }
170 if !running.load(Ordering::Acquire) {
171 info!("Heartbeat task shutdown");
172 }
173 }
174 quit_signal.notify_one();
175 info!("Heartbeat handling loop exit.");
176 });
177 Ok((tx, config))
178 }
179
180 async fn handle_response(
181 ctx: HeartbeatResponseHandlerContext,
182 handler_executor: HeartbeatResponseHandlerExecutorRef,
183 ) -> Result<()> {
184 trace!("Heartbeat response: {:?}", ctx.response);
185 handler_executor
186 .handle(ctx)
187 .await
188 .context(error::HandleHeartbeatResponseSnafu)
189 }
190
191 #[allow(deprecated)]
192 pub async fn start(
194 &self,
195 event_receiver: RegionServerEventReceiver,
196 notify: Option<Arc<Notify>>,
197 ) -> Result<()> {
198 let running = self.running.clone();
199 if running
200 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
201 .is_err()
202 {
203 warn!("Heartbeat task started multiple times");
204 return Ok(());
205 }
206 let node_id = self.node_id;
207 let node_epoch = self.node_epoch;
208 let addr = &self.peer_addr;
209
210 let meta_client = self.meta_client.clone();
211 let region_server_clone = self.region_server.clone();
212
213 let handler_executor = self.resp_handler_executor.clone();
214
215 let (outgoing_tx, mut outgoing_rx) = mpsc::channel(16);
216 let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
217
218 let quit_signal = Arc::new(Notify::new());
219
220 let (mut tx, config) = Self::create_streams(
221 &meta_client,
222 running.clone(),
223 handler_executor.clone(),
224 mailbox.clone(),
225 notify,
226 quit_signal.clone(),
227 )
228 .await?;
229
230 let interval = config.interval.as_millis() as u64;
231 let mut retry_interval = config.retry_interval;
232
233 self.region_alive_keeper.update_heartbeat_interval(interval);
235
236 info!(
237 "Starting heartbeat to Metasrv with config: {}. My node id is {}, address is {}.",
238 config, node_id, addr
239 );
240
241 let self_peer = Some(Peer {
242 id: node_id,
243 addr: addr.clone(),
244 });
245 let epoch = self.region_alive_keeper.epoch();
246 let workload_types = self.workload_types.clone();
247
248 self.region_alive_keeper.start(Some(event_receiver)).await?;
249 let mut last_sent = Instant::now();
250 let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores();
251 let total_memory_bytes = self.resource_stat.get_total_memory_bytes();
252 let resource_stat = self.resource_stat.clone();
253 let region_alive_keeper = self.region_alive_keeper.clone();
254 let gc_limiter = self
255 .region_server
256 .mito_engine()
257 .context(RegionEngineNotFoundSnafu { name: "mito" })?
258 .gc_limiter();
259
260 common_runtime::spawn_hb(async move {
261 let sleep = tokio::time::sleep(Duration::from_millis(0));
262 tokio::pin!(sleep);
263
264 let build_info = common_version::build_info();
265
266 let heartbeat_request = HeartbeatRequest {
267 peer: self_peer,
268 node_epoch,
269 info: Some(NodeInfo {
270 version: build_info.version.to_string(),
271 git_commit: build_info.commit_short.to_string(),
272 start_time_ms: node_epoch,
273 total_cpu_millicores,
274 total_memory_bytes,
275 cpu_usage_millicores: 0,
276 memory_usage_bytes: 0,
277 cpus: total_cpu_millicores as u32,
279 memory_bytes: total_memory_bytes as u64,
280 hostname: hostname::get()
281 .unwrap_or_default()
282 .to_string_lossy()
283 .to_string(),
284 }),
285 node_workloads: Some(NodeWorkloads::Datanode(DatanodeWorkloads {
286 types: workload_types.iter().map(|w| w.to_i32()).collect(),
287 })),
288 ..Default::default()
289 };
290
291 loop {
292 if !running.load(Ordering::Relaxed) {
293 info!("shutdown heartbeat task");
294 break;
295 }
296 let req = tokio::select! {
297 message = outgoing_rx.recv() => {
298 if let Some(message) = message {
299 match outgoing_message_to_mailbox_message(message) {
300 Ok(message) => {
301 let mut extensions = heartbeat_request.extensions.clone();
302 let gc_stat = gc_limiter.gc_stat();
303 gc_stat.into_extensions(&mut extensions);
304
305 let req = HeartbeatRequest {
306 mailbox_message: Some(message),
307 extensions,
308 ..heartbeat_request.clone()
309 };
310 HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
311 Some(req)
312 }
313 Err(e) => {
314 error!(e; "Failed to encode mailbox messages!");
315 HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
316 None
317 }
318 }
319 } else {
320 None
321 }
322 }
323 _ = &mut sleep => {
324 let region_stats = Self::load_region_stats(®ion_server_clone);
325 let topic_stats = region_server_clone.topic_stats();
326 let now = Instant::now();
327 let duration_since_epoch = (now - epoch).as_millis() as u64;
328
329 let mut extensions = heartbeat_request.extensions.clone();
330 let gc_stat = gc_limiter.gc_stat();
331 gc_stat.into_extensions(&mut extensions);
332
333 let mut req = HeartbeatRequest {
334 region_stats,
335 topic_stats,
336 duration_since_epoch,
337 extensions,
338 ..heartbeat_request.clone()
339 };
340
341 if let Some(info) = req.info.as_mut() {
342 info.cpu_usage_millicores = resource_stat.get_cpu_usage_millicores();
343 info.memory_usage_bytes = resource_stat.get_memory_usage_bytes();
344 }
345
346 sleep.as_mut().reset(now + Duration::from_millis(interval));
347 Some(req)
348 }
349 _ = quit_signal.notified() => {
351 let req = HeartbeatRequest::default();
352 Some(req)
353 }
354 };
355 if let Some(req) = req {
356 metrics::LAST_SENT_HEARTBEAT_ELAPSED
357 .set(last_sent.elapsed().as_millis() as i64);
358 last_sent = Instant::now();
360 debug!("Sending heartbeat request: {:?}", req);
361 if let Err(e) = tx.send(req).await {
362 error!(e; "Failed to send heartbeat to metasrv");
363 match Self::create_streams(
364 &meta_client,
365 running.clone(),
366 handler_executor.clone(),
367 mailbox.clone(),
368 None,
369 quit_signal.clone(),
370 )
371 .await
372 {
373 Ok((new_tx, new_config)) => {
374 info!("Reconnected to metasrv, heartbeat config: {}", new_config);
375 tx = new_tx;
376 retry_interval = new_config.retry_interval;
378 region_alive_keeper.update_heartbeat_interval(
380 new_config.interval.as_millis() as u64,
381 );
382 sleep.as_mut().reset(Instant::now());
384 }
385 Err(e) => {
386 sleep.as_mut().reset(Instant::now() + retry_interval);
390 error!(e; "Failed to reconnect to metasrv!");
391 }
392 }
393 } else {
394 HEARTBEAT_SENT_COUNT.inc();
395 }
396 }
397 }
398 });
399
400 Ok(())
401 }
402
403 fn load_region_stats(region_server: &RegionServer) -> Vec<RegionStat> {
404 region_server
405 .reportable_regions()
406 .into_iter()
407 .map(|stat| {
408 let region_stat = region_server
409 .region_statistic(stat.region_id)
410 .unwrap_or_default();
411 let mut extensions = HashMap::new();
412 if let Some(serialized) = region_stat.serialize_to_vec() {
413 extensions.insert(REGION_STATISTIC_KEY.to_string(), serialized);
414 }
415
416 RegionStat {
417 region_id: stat.region_id.as_u64(),
418 engine: stat.engine,
419 role: RegionRole::from(stat.role).into(),
420 rcus: 0,
422 wcus: 0,
423 approximate_bytes: region_stat.estimated_disk_size() as i64,
424 extensions,
425 }
426 })
427 .collect()
428 }
429
430 pub fn close(&self) -> Result<()> {
431 let running = self.running.clone();
432 if running
433 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
434 .is_err()
435 {
436 warn!("Call close heartbeat task multiple times");
437 }
438
439 Ok(())
440 }
441}