1use std::collections::HashMap;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::time::Duration;
19
20use api::v1::meta::heartbeat_request::NodeWorkloads;
21use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
22use common_base::Plugins;
23use common_config::utils::ResourceSpec;
24use common_meta::cache_invalidator::CacheInvalidatorRef;
25use common_meta::datanode::REGION_STATISTIC_KEY;
26use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
27use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
28use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
29use common_meta::heartbeat::handler::{
30 HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
31};
32use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef};
33use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
34use common_telemetry::{debug, error, info, trace, warn};
35use common_workload::DatanodeWorkloadType;
36use meta_client::MetaClientRef;
37use meta_client::client::{HeartbeatSender, MetaClient};
38use servers::addrs;
39use snafu::ResultExt;
40use tokio::sync::{Notify, mpsc};
41use tokio::time::Instant;
42
43use self::handler::RegionHeartbeatResponseHandler;
44use crate::alive_keeper::{CountdownTaskHandlerExtRef, RegionAliveKeeper};
45use crate::config::DatanodeOptions;
46use crate::error::{self, MetaClientInitSnafu, Result};
47use crate::event_listener::RegionServerEventReceiver;
48use crate::metrics::{self, HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
49use crate::region_server::RegionServer;
50
51pub(crate) mod handler;
52pub(crate) mod task_tracker;
53
/// Background task that reports this datanode to Metasrv through heartbeat
/// requests and dispatches the heartbeat responses it receives.
pub struct HeartbeatTask {
    /// Id placed in the `Peer` of the heartbeat requests.
    node_id: u64,
    /// Workload types advertised in the request's `DatanodeWorkloads`.
    workload_types: Vec<DatanodeWorkloadType>,
    /// Millisecond timestamp captured when the task was constructed; sent as
    /// both `node_epoch` and `NodeInfo::start_time_ms`.
    node_epoch: u64,
    /// Address placed in the `Peer` of the heartbeat requests.
    peer_addr: String,
    /// Set to `true` by `start`, and back to `false` by `close` or on drop.
    running: Arc<AtomicBool>,
    /// Client used to open the heartbeat stream to Metasrv.
    meta_client: MetaClientRef,
    /// Source of the region and topic statistics attached to heartbeats.
    region_server: RegionServer,
    /// Delay between two periodic heartbeats, in milliseconds.
    interval: u64,
    /// Handler chain run on every heartbeat response.
    resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
    /// Keeper started together with the task; it is also the first handler
    /// in `resp_handler_executor`.
    region_alive_keeper: Arc<RegionAliveKeeper>,
    /// CPU and memory figures reported in `NodeInfo`.
    resource_spec: ResourceSpec,
}
68
impl Drop for HeartbeatTask {
    /// Clears the shared `running` flag so that the sending loop spawned by
    /// `start`, which polls this flag on every iteration, stops after the
    /// task is dropped.
    fn drop(&mut self) {
        self.running.store(false, Ordering::Release);
    }
}
74
75impl HeartbeatTask {
76 pub async fn try_new(
78 opts: &DatanodeOptions,
79 region_server: RegionServer,
80 meta_client: MetaClientRef,
81 cache_invalidator: CacheInvalidatorRef,
82 plugins: Plugins,
83 ) -> Result<Self> {
84 let countdown_task_handler_ext = plugins.get::<CountdownTaskHandlerExtRef>();
85 let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
86 region_server.clone(),
87 countdown_task_handler_ext,
88 opts.heartbeat.interval.as_millis() as u64,
89 ));
90 let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![
91 region_alive_keeper.clone(),
92 Arc::new(ParseMailboxMessageHandler),
93 Arc::new(RegionHeartbeatResponseHandler::new(region_server.clone())),
94 Arc::new(InvalidateCacheHandler::new(cache_invalidator)),
95 ]));
96
97 Ok(Self {
98 node_id: opts.node_id.unwrap_or(0),
99 workload_types: opts.workload_types.clone(),
100 node_epoch: common_time::util::current_time_millis() as u64,
102 peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
103 running: Arc::new(AtomicBool::new(false)),
104 meta_client,
105 region_server,
106 interval: opts.heartbeat.interval.as_millis() as u64,
107 resp_handler_executor,
108 region_alive_keeper,
109 resource_spec: Default::default(),
110 })
111 }
112
113 pub async fn create_streams(
114 meta_client: &MetaClient,
115 running: Arc<AtomicBool>,
116 handler_executor: HeartbeatResponseHandlerExecutorRef,
117 mailbox: MailboxRef,
118 mut notify: Option<Arc<Notify>>,
119 quit_signal: Arc<Notify>,
120 ) -> Result<HeartbeatSender> {
121 let client_id = meta_client.id();
122 let (tx, mut rx) = meta_client.heartbeat().await.context(MetaClientInitSnafu)?;
123
124 let mut last_received_lease = Instant::now();
125
126 let _handle = common_runtime::spawn_hb(async move {
127 while let Some(res) = rx.message().await.unwrap_or_else(|e| {
128 error!(e; "Error while reading heartbeat response");
129 None
130 }) {
131 if let Some(msg) = res.mailbox_message.as_ref() {
132 info!("Received mailbox message: {msg:?}, meta_client id: {client_id:?}");
133 }
134 if let Some(lease) = res.region_lease.as_ref() {
135 metrics::LAST_RECEIVED_HEARTBEAT_ELAPSED
136 .set(last_received_lease.elapsed().as_millis() as i64);
137 last_received_lease = Instant::now();
139
140 let mut leader_region_lease_count = 0;
141 let mut follower_region_lease_count = 0;
142 for lease in &lease.regions {
143 match lease.role() {
144 RegionRole::Leader | RegionRole::DowngradingLeader => {
145 leader_region_lease_count += 1
146 }
147 RegionRole::Follower => follower_region_lease_count += 1,
148 }
149 }
150
151 metrics::HEARTBEAT_REGION_LEASES
152 .with_label_values(&["leader"])
153 .set(leader_region_lease_count);
154 metrics::HEARTBEAT_REGION_LEASES
155 .with_label_values(&["follower"])
156 .set(follower_region_lease_count);
157 }
158 let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), res);
159 if let Err(e) = Self::handle_response(ctx, handler_executor.clone()).await {
160 error!(e; "Error while handling heartbeat response");
161 }
162 if let Some(notify) = notify.take() {
163 notify.notify_one();
164 }
165 if !running.load(Ordering::Acquire) {
166 info!("Heartbeat task shutdown");
167 }
168 }
169 quit_signal.notify_one();
170 info!("Heartbeat handling loop exit.");
171 });
172 Ok(tx)
173 }
174
175 async fn handle_response(
176 ctx: HeartbeatResponseHandlerContext,
177 handler_executor: HeartbeatResponseHandlerExecutorRef,
178 ) -> Result<()> {
179 trace!("Heartbeat response: {:?}", ctx.response);
180 handler_executor
181 .handle(ctx)
182 .await
183 .context(error::HandleHeartbeatResponseSnafu)
184 }
185
186 pub async fn start(
188 &self,
189 event_receiver: RegionServerEventReceiver,
190 notify: Option<Arc<Notify>>,
191 ) -> Result<()> {
192 let running = self.running.clone();
193 if running
194 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
195 .is_err()
196 {
197 warn!("Heartbeat task started multiple times");
198 return Ok(());
199 }
200 let interval = self.interval;
201 let node_id = self.node_id;
202 let node_epoch = self.node_epoch;
203 let addr = &self.peer_addr;
204 info!(
205 "Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}."
206 );
207
208 let meta_client = self.meta_client.clone();
209 let region_server_clone = self.region_server.clone();
210
211 let handler_executor = self.resp_handler_executor.clone();
212
213 let (outgoing_tx, mut outgoing_rx) = mpsc::channel(16);
214 let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
215
216 let quit_signal = Arc::new(Notify::new());
217
218 let mut tx = Self::create_streams(
219 &meta_client,
220 running.clone(),
221 handler_executor.clone(),
222 mailbox.clone(),
223 notify,
224 quit_signal.clone(),
225 )
226 .await?;
227
228 let self_peer = Some(Peer {
229 id: node_id,
230 addr: addr.clone(),
231 });
232 let epoch = self.region_alive_keeper.epoch();
233 let workload_types = self.workload_types.clone();
234
235 self.region_alive_keeper.start(Some(event_receiver)).await?;
236 let mut last_sent = Instant::now();
237 let cpus = self.resource_spec.cpus as u32;
238 let memory_bytes = self.resource_spec.memory.unwrap_or_default().as_bytes();
239
240 common_runtime::spawn_hb(async move {
241 let sleep = tokio::time::sleep(Duration::from_millis(0));
242 tokio::pin!(sleep);
243
244 let build_info = common_version::build_info();
245 let heartbeat_request = HeartbeatRequest {
246 peer: self_peer,
247 node_epoch,
248 info: Some(NodeInfo {
249 version: build_info.version.to_string(),
250 git_commit: build_info.commit_short.to_string(),
251 start_time_ms: node_epoch,
252 cpus,
253 memory_bytes,
254 }),
255 node_workloads: Some(NodeWorkloads::Datanode(DatanodeWorkloads {
256 types: workload_types.iter().map(|w| w.to_i32()).collect(),
257 })),
258 ..Default::default()
259 };
260
261 loop {
262 if !running.load(Ordering::Relaxed) {
263 info!("shutdown heartbeat task");
264 break;
265 }
266 let req = tokio::select! {
267 message = outgoing_rx.recv() => {
268 if let Some(message) = message {
269 match outgoing_message_to_mailbox_message(message) {
270 Ok(message) => {
271 let req = HeartbeatRequest {
272 mailbox_message: Some(message),
273 ..heartbeat_request.clone()
274 };
275 HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
276 Some(req)
277 }
278 Err(e) => {
279 error!(e; "Failed to encode mailbox messages!");
280 HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
281 None
282 }
283 }
284 } else {
285 None
286 }
287 }
288 _ = &mut sleep => {
289 let region_stats = Self::load_region_stats(®ion_server_clone);
290 let topic_stats = region_server_clone.topic_stats();
291 let now = Instant::now();
292 let duration_since_epoch = (now - epoch).as_millis() as u64;
293 let req = HeartbeatRequest {
294 region_stats,
295 topic_stats,
296 duration_since_epoch,
297 ..heartbeat_request.clone()
298 };
299 sleep.as_mut().reset(now + Duration::from_millis(interval));
300 Some(req)
301 }
302 _ = quit_signal.notified() => {
304 let req = HeartbeatRequest::default();
305 Some(req)
306 }
307 };
308 if let Some(req) = req {
309 metrics::LAST_SENT_HEARTBEAT_ELAPSED
310 .set(last_sent.elapsed().as_millis() as i64);
311 last_sent = Instant::now();
313 debug!("Sending heartbeat request: {:?}", req);
314 if let Err(e) = tx.send(req).await {
315 error!(e; "Failed to send heartbeat to metasrv");
316 match Self::create_streams(
317 &meta_client,
318 running.clone(),
319 handler_executor.clone(),
320 mailbox.clone(),
321 None,
322 quit_signal.clone(),
323 )
324 .await
325 {
326 Ok(new_tx) => {
327 info!("Reconnected to metasrv");
328 tx = new_tx;
329 sleep.as_mut().reset(Instant::now());
331 }
332 Err(e) => {
333 sleep.as_mut().reset(
337 Instant::now()
338 + Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS),
339 );
340 error!(e; "Failed to reconnect to metasrv!");
341 }
342 }
343 } else {
344 HEARTBEAT_SENT_COUNT.inc();
345 }
346 }
347 }
348 });
349
350 Ok(())
351 }
352
353 fn load_region_stats(region_server: &RegionServer) -> Vec<RegionStat> {
354 region_server
355 .reportable_regions()
356 .into_iter()
357 .map(|stat| {
358 let region_stat = region_server
359 .region_statistic(stat.region_id)
360 .unwrap_or_default();
361 let mut extensions = HashMap::new();
362 if let Some(serialized) = region_stat.serialize_to_vec() {
363 extensions.insert(REGION_STATISTIC_KEY.to_string(), serialized);
364 }
365
366 RegionStat {
367 region_id: stat.region_id.as_u64(),
368 engine: stat.engine,
369 role: RegionRole::from(stat.role).into(),
370 rcus: 0,
372 wcus: 0,
373 approximate_bytes: region_stat.estimated_disk_size() as i64,
374 extensions,
375 }
376 })
377 .collect()
378 }
379
380 pub fn close(&self) -> Result<()> {
381 let running = self.running.clone();
382 if running
383 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
384 .is_err()
385 {
386 warn!("Call close heartbeat task multiple times");
387 }
388
389 Ok(())
390 }
391}