1use std::collections::HashMap;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::time::Duration;
19
20use api::v1::meta::heartbeat_request::NodeWorkloads;
21use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
22use common_base::Plugins;
23use common_meta::cache_invalidator::CacheInvalidatorRef;
24use common_meta::datanode::REGION_STATISTIC_KEY;
25use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
26use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
27use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
28use common_meta::heartbeat::handler::suspend::SuspendHandler;
29use common_meta::heartbeat::handler::{
30 HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
31};
32use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef};
33use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
34use common_stat::ResourceStatRef;
35use common_telemetry::{debug, error, info, trace, warn};
36use common_workload::DatanodeWorkloadType;
37use meta_client::MetaClientRef;
38use meta_client::client::{HeartbeatSender, MetaClient};
39use servers::addrs;
40use snafu::{OptionExt as _, ResultExt};
41use tokio::sync::{Notify, mpsc};
42use tokio::time::Instant;
43
44use self::handler::RegionHeartbeatResponseHandler;
45use crate::alive_keeper::{CountdownTaskHandlerExtRef, RegionAliveKeeper};
46use crate::config::DatanodeOptions;
47use crate::error::{self, MetaClientInitSnafu, RegionEngineNotFoundSnafu, Result};
48use crate::event_listener::RegionServerEventReceiver;
49use crate::metrics::{self, HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
50use crate::region_server::RegionServer;
51
52pub(crate) mod handler;
53pub(crate) mod task_tracker;
54
/// Periodically reports this datanode's state (region stats, resource usage,
/// workloads) to Metasrv and dispatches heartbeat responses to a chain of
/// handlers.
pub struct HeartbeatTask {
    /// Id of this datanode; falls back to 0 when not configured in options.
    node_id: u64,
    /// Workload types advertised to Metasrv in each heartbeat.
    workload_types: Vec<DatanodeWorkloadType>,
    /// Creation time of this task in epoch milliseconds; also reported as the
    /// node's start time.
    node_epoch: u64,
    /// Resolved address this datanode advertises as its peer address.
    peer_addr: String,
    /// Shared running flag; cleared by `close()` and `Drop` to stop the
    /// background heartbeat loops.
    running: Arc<AtomicBool>,
    meta_client: MetaClientRef,
    region_server: RegionServer,
    /// Heartbeat send interval in milliseconds.
    interval: u64,
    /// Executor that runs the heartbeat-response handler chain.
    resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
    /// Keeps region leases alive based on granted leases from Metasrv.
    region_alive_keeper: Arc<RegionAliveKeeper>,
    /// Provider of CPU/memory statistics reported in heartbeats.
    resource_stat: ResourceStatRef,
}
69
impl Drop for HeartbeatTask {
    fn drop(&mut self) {
        // Signal the background heartbeat loops to stop; they poll this flag
        // on each iteration.
        self.running.store(false, Ordering::Release);
    }
}
75
76impl HeartbeatTask {
77 pub async fn try_new(
79 opts: &DatanodeOptions,
80 region_server: RegionServer,
81 meta_client: MetaClientRef,
82 cache_invalidator: CacheInvalidatorRef,
83 plugins: Plugins,
84 resource_stat: ResourceStatRef,
85 ) -> Result<Self> {
86 let countdown_task_handler_ext = plugins.get::<CountdownTaskHandlerExtRef>();
87 let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
88 region_server.clone(),
89 countdown_task_handler_ext,
90 opts.heartbeat.interval.as_millis() as u64,
91 ));
92 let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![
93 region_alive_keeper.clone(),
94 Arc::new(ParseMailboxMessageHandler),
95 Arc::new(SuspendHandler::new(region_server.suspend_state())),
96 Arc::new(
97 RegionHeartbeatResponseHandler::new(region_server.clone())
98 .with_open_region_parallelism(opts.init_regions_parallelism),
99 ),
100 Arc::new(InvalidateCacheHandler::new(cache_invalidator)),
101 ]));
102
103 Ok(Self {
104 node_id: opts.node_id.unwrap_or(0),
105 workload_types: opts.workload_types.clone(),
106 node_epoch: common_time::util::current_time_millis() as u64,
108 peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
109 running: Arc::new(AtomicBool::new(false)),
110 meta_client,
111 region_server,
112 interval: opts.heartbeat.interval.as_millis() as u64,
113 resp_handler_executor,
114 region_alive_keeper,
115 resource_stat,
116 })
117 }
118
119 pub async fn create_streams(
120 meta_client: &MetaClient,
121 running: Arc<AtomicBool>,
122 handler_executor: HeartbeatResponseHandlerExecutorRef,
123 mailbox: MailboxRef,
124 mut notify: Option<Arc<Notify>>,
125 quit_signal: Arc<Notify>,
126 ) -> Result<HeartbeatSender> {
127 let client_id = meta_client.id();
128 let (tx, mut rx) = meta_client.heartbeat().await.context(MetaClientInitSnafu)?;
129
130 let mut last_received_lease = Instant::now();
131
132 let _handle = common_runtime::spawn_hb(async move {
133 while let Some(res) = rx.message().await.unwrap_or_else(|e| {
134 error!(e; "Error while reading heartbeat response");
135 None
136 }) {
137 if let Some(msg) = res.mailbox_message.as_ref() {
138 info!("Received mailbox message: {msg:?}, meta_client id: {client_id:?}");
139 }
140 if let Some(lease) = res.region_lease.as_ref() {
141 metrics::LAST_RECEIVED_HEARTBEAT_ELAPSED
142 .set(last_received_lease.elapsed().as_millis() as i64);
143 last_received_lease = Instant::now();
145
146 let mut leader_region_lease_count = 0;
147 let mut follower_region_lease_count = 0;
148 for lease in &lease.regions {
149 match lease.role() {
150 RegionRole::Leader | RegionRole::DowngradingLeader => {
151 leader_region_lease_count += 1
152 }
153 RegionRole::Follower => follower_region_lease_count += 1,
154 }
155 }
156
157 metrics::HEARTBEAT_REGION_LEASES
158 .with_label_values(&["leader"])
159 .set(leader_region_lease_count);
160 metrics::HEARTBEAT_REGION_LEASES
161 .with_label_values(&["follower"])
162 .set(follower_region_lease_count);
163 }
164 let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), res);
165 if let Err(e) = Self::handle_response(ctx, handler_executor.clone()).await {
166 error!(e; "Error while handling heartbeat response");
167 }
168 if let Some(notify) = notify.take() {
169 notify.notify_one();
170 }
171 if !running.load(Ordering::Acquire) {
172 info!("Heartbeat task shutdown");
173 }
174 }
175 quit_signal.notify_one();
176 info!("Heartbeat handling loop exit.");
177 });
178 Ok(tx)
179 }
180
181 async fn handle_response(
182 ctx: HeartbeatResponseHandlerContext,
183 handler_executor: HeartbeatResponseHandlerExecutorRef,
184 ) -> Result<()> {
185 trace!("Heartbeat response: {:?}", ctx.response);
186 handler_executor
187 .handle(ctx)
188 .await
189 .context(error::HandleHeartbeatResponseSnafu)
190 }
191
192 #[allow(deprecated)]
193 pub async fn start(
195 &self,
196 event_receiver: RegionServerEventReceiver,
197 notify: Option<Arc<Notify>>,
198 ) -> Result<()> {
199 let running = self.running.clone();
200 if running
201 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
202 .is_err()
203 {
204 warn!("Heartbeat task started multiple times");
205 return Ok(());
206 }
207 let interval = self.interval;
208 let node_id = self.node_id;
209 let node_epoch = self.node_epoch;
210 let addr = &self.peer_addr;
211 info!(
212 "Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}."
213 );
214
215 let meta_client = self.meta_client.clone();
216 let region_server_clone = self.region_server.clone();
217
218 let handler_executor = self.resp_handler_executor.clone();
219
220 let (outgoing_tx, mut outgoing_rx) = mpsc::channel(16);
221 let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
222
223 let quit_signal = Arc::new(Notify::new());
224
225 let mut tx = Self::create_streams(
226 &meta_client,
227 running.clone(),
228 handler_executor.clone(),
229 mailbox.clone(),
230 notify,
231 quit_signal.clone(),
232 )
233 .await?;
234
235 let self_peer = Some(Peer {
236 id: node_id,
237 addr: addr.clone(),
238 });
239 let epoch = self.region_alive_keeper.epoch();
240 let workload_types = self.workload_types.clone();
241
242 self.region_alive_keeper.start(Some(event_receiver)).await?;
243 let mut last_sent = Instant::now();
244 let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores();
245 let total_memory_bytes = self.resource_stat.get_total_memory_bytes();
246 let resource_stat = self.resource_stat.clone();
247 let gc_limiter = self
248 .region_server
249 .mito_engine()
250 .context(RegionEngineNotFoundSnafu { name: "mito" })?
251 .gc_limiter();
252
253 common_runtime::spawn_hb(async move {
254 let sleep = tokio::time::sleep(Duration::from_millis(0));
255 tokio::pin!(sleep);
256
257 let build_info = common_version::build_info();
258
259 let heartbeat_request = HeartbeatRequest {
260 peer: self_peer,
261 node_epoch,
262 info: Some(NodeInfo {
263 version: build_info.version.to_string(),
264 git_commit: build_info.commit_short.to_string(),
265 start_time_ms: node_epoch,
266 total_cpu_millicores,
267 total_memory_bytes,
268 cpu_usage_millicores: 0,
269 memory_usage_bytes: 0,
270 cpus: total_cpu_millicores as u32,
272 memory_bytes: total_memory_bytes as u64,
273 hostname: hostname::get()
274 .unwrap_or_default()
275 .to_string_lossy()
276 .to_string(),
277 }),
278 node_workloads: Some(NodeWorkloads::Datanode(DatanodeWorkloads {
279 types: workload_types.iter().map(|w| w.to_i32()).collect(),
280 })),
281 ..Default::default()
282 };
283
284 loop {
285 if !running.load(Ordering::Relaxed) {
286 info!("shutdown heartbeat task");
287 break;
288 }
289 let req = tokio::select! {
290 message = outgoing_rx.recv() => {
291 if let Some(message) = message {
292 match outgoing_message_to_mailbox_message(message) {
293 Ok(message) => {
294 let mut extensions = heartbeat_request.extensions.clone();
295 let gc_stat = gc_limiter.gc_stat();
296 gc_stat.into_extensions(&mut extensions);
297
298 let req = HeartbeatRequest {
299 mailbox_message: Some(message),
300 extensions,
301 ..heartbeat_request.clone()
302 };
303 HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
304 Some(req)
305 }
306 Err(e) => {
307 error!(e; "Failed to encode mailbox messages!");
308 HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
309 None
310 }
311 }
312 } else {
313 None
314 }
315 }
316 _ = &mut sleep => {
317 let region_stats = Self::load_region_stats(®ion_server_clone);
318 let topic_stats = region_server_clone.topic_stats();
319 let now = Instant::now();
320 let duration_since_epoch = (now - epoch).as_millis() as u64;
321
322 let mut extensions = heartbeat_request.extensions.clone();
323 let gc_stat = gc_limiter.gc_stat();
324 gc_stat.into_extensions(&mut extensions);
325
326 let mut req = HeartbeatRequest {
327 region_stats,
328 topic_stats,
329 duration_since_epoch,
330 extensions,
331 ..heartbeat_request.clone()
332 };
333
334 if let Some(info) = req.info.as_mut() {
335 info.cpu_usage_millicores = resource_stat.get_cpu_usage_millicores();
336 info.memory_usage_bytes = resource_stat.get_memory_usage_bytes();
337 }
338
339 sleep.as_mut().reset(now + Duration::from_millis(interval));
340 Some(req)
341 }
342 _ = quit_signal.notified() => {
344 let req = HeartbeatRequest::default();
345 Some(req)
346 }
347 };
348 if let Some(req) = req {
349 metrics::LAST_SENT_HEARTBEAT_ELAPSED
350 .set(last_sent.elapsed().as_millis() as i64);
351 last_sent = Instant::now();
353 debug!("Sending heartbeat request: {:?}", req);
354 if let Err(e) = tx.send(req).await {
355 error!(e; "Failed to send heartbeat to metasrv");
356 match Self::create_streams(
357 &meta_client,
358 running.clone(),
359 handler_executor.clone(),
360 mailbox.clone(),
361 None,
362 quit_signal.clone(),
363 )
364 .await
365 {
366 Ok(new_tx) => {
367 info!("Reconnected to metasrv");
368 tx = new_tx;
369 sleep.as_mut().reset(Instant::now());
371 }
372 Err(e) => {
373 sleep.as_mut().reset(
377 Instant::now()
378 + Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS),
379 );
380 error!(e; "Failed to reconnect to metasrv!");
381 }
382 }
383 } else {
384 HEARTBEAT_SENT_COUNT.inc();
385 }
386 }
387 }
388 });
389
390 Ok(())
391 }
392
393 fn load_region_stats(region_server: &RegionServer) -> Vec<RegionStat> {
394 region_server
395 .reportable_regions()
396 .into_iter()
397 .map(|stat| {
398 let region_stat = region_server
399 .region_statistic(stat.region_id)
400 .unwrap_or_default();
401 let mut extensions = HashMap::new();
402 if let Some(serialized) = region_stat.serialize_to_vec() {
403 extensions.insert(REGION_STATISTIC_KEY.to_string(), serialized);
404 }
405
406 RegionStat {
407 region_id: stat.region_id.as_u64(),
408 engine: stat.engine,
409 role: RegionRole::from(stat.role).into(),
410 rcus: 0,
412 wcus: 0,
413 approximate_bytes: region_stat.estimated_disk_size() as i64,
414 extensions,
415 }
416 })
417 .collect()
418 }
419
420 pub fn close(&self) -> Result<()> {
421 let running = self.running.clone();
422 if running
423 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
424 .is_err()
425 {
426 warn!("Call close heartbeat task multiple times");
427 }
428
429 Ok(())
430 }
431}