datanode/
heartbeat.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::time::Duration;
19
20use api::v1::meta::heartbeat_request::NodeWorkloads;
21use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
22use common_base::Plugins;
23use common_meta::cache_invalidator::CacheInvalidatorRef;
24use common_meta::datanode::REGION_STATISTIC_KEY;
25use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
26use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
27use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
28use common_meta::heartbeat::handler::{
29    HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
30};
31use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef};
32use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
33use common_stat::ResourceStatRef;
34use common_telemetry::{debug, error, info, trace, warn};
35use common_workload::DatanodeWorkloadType;
36use meta_client::MetaClientRef;
37use meta_client::client::{HeartbeatSender, MetaClient};
38use servers::addrs;
39use snafu::{OptionExt as _, ResultExt};
40use tokio::sync::{Notify, mpsc};
41use tokio::time::Instant;
42
43use self::handler::RegionHeartbeatResponseHandler;
44use crate::alive_keeper::{CountdownTaskHandlerExtRef, RegionAliveKeeper};
45use crate::config::DatanodeOptions;
46use crate::error::{self, MetaClientInitSnafu, RegionEngineNotFoundSnafu, Result};
47use crate::event_listener::RegionServerEventReceiver;
48use crate::metrics::{self, HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
49use crate::region_server::RegionServer;
50
51pub(crate) mod handler;
52pub(crate) mod task_tracker;
53
/// The datanode heartbeat task, which periodically sends [`HeartbeatRequest`]s to Metasrv in the
/// background.
pub struct HeartbeatTask {
    /// Id of this datanode, sent as the `id` of the heartbeat's `peer`.
    /// Falls back to `0` when the options carry no node id.
    node_id: u64,
    /// Workload types of this datanode, reported in the heartbeat's `node_workloads`.
    workload_types: Vec<DatanodeWorkloadType>,
    /// Start time of this datanode in milliseconds, used as the node's epoch.
    node_epoch: u64,
    /// Address resolved from the gRPC bind/server address options, sent as the `addr` of the
    /// heartbeat's `peer`.
    peer_addr: String,
    /// Set to `true` by `start` and back to `false` by `close` (and on drop).
    running: Arc<AtomicBool>,
    /// Client used to open the heartbeat stream to Metasrv.
    meta_client: MetaClientRef,
    /// Source of the region and topic statistics attached to periodic heartbeats.
    region_server: RegionServer,
    /// Heartbeat interval in milliseconds.
    interval: u64,
    /// Executor that runs the registered handlers on every heartbeat response.
    resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
    /// Region alive keeper; started together with the heartbeat task and also registered as
    /// one of the response handlers.
    region_alive_keeper: Arc<RegionAliveKeeper>,
    /// Provider of the total and current CPU/memory figures reported in `NodeInfo`.
    resource_stat: ResourceStatRef,
}
68
impl Drop for HeartbeatTask {
    /// Clears the `running` flag when the task is dropped.
    ///
    /// The sending loop spawned by `start` checks this flag at the top of each iteration and
    /// exits once it observes `false`.
    fn drop(&mut self) {
        self.running.store(false, Ordering::Release);
    }
}
74
75impl HeartbeatTask {
76    /// Create a new heartbeat task instance.
77    pub async fn try_new(
78        opts: &DatanodeOptions,
79        region_server: RegionServer,
80        meta_client: MetaClientRef,
81        cache_invalidator: CacheInvalidatorRef,
82        plugins: Plugins,
83        resource_stat: ResourceStatRef,
84    ) -> Result<Self> {
85        let countdown_task_handler_ext = plugins.get::<CountdownTaskHandlerExtRef>();
86        let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
87            region_server.clone(),
88            countdown_task_handler_ext,
89            opts.heartbeat.interval.as_millis() as u64,
90        ));
91        let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![
92            region_alive_keeper.clone(),
93            Arc::new(ParseMailboxMessageHandler),
94            Arc::new(
95                RegionHeartbeatResponseHandler::new(region_server.clone())
96                    .with_open_region_parallelism(opts.init_regions_parallelism),
97            ),
98            Arc::new(InvalidateCacheHandler::new(cache_invalidator)),
99        ]));
100
101        Ok(Self {
102            node_id: opts.node_id.unwrap_or(0),
103            workload_types: opts.workload_types.clone(),
104            // We use datanode's start time millis as the node's epoch.
105            node_epoch: common_time::util::current_time_millis() as u64,
106            peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
107            running: Arc::new(AtomicBool::new(false)),
108            meta_client,
109            region_server,
110            interval: opts.heartbeat.interval.as_millis() as u64,
111            resp_handler_executor,
112            region_alive_keeper,
113            resource_stat,
114        })
115    }
116
117    pub async fn create_streams(
118        meta_client: &MetaClient,
119        running: Arc<AtomicBool>,
120        handler_executor: HeartbeatResponseHandlerExecutorRef,
121        mailbox: MailboxRef,
122        mut notify: Option<Arc<Notify>>,
123        quit_signal: Arc<Notify>,
124    ) -> Result<HeartbeatSender> {
125        let client_id = meta_client.id();
126        let (tx, mut rx) = meta_client.heartbeat().await.context(MetaClientInitSnafu)?;
127
128        let mut last_received_lease = Instant::now();
129
130        let _handle = common_runtime::spawn_hb(async move {
131            while let Some(res) = rx.message().await.unwrap_or_else(|e| {
132                error!(e; "Error while reading heartbeat response");
133                None
134            }) {
135                if let Some(msg) = res.mailbox_message.as_ref() {
136                    info!("Received mailbox message: {msg:?}, meta_client id: {client_id:?}");
137                }
138                if let Some(lease) = res.region_lease.as_ref() {
139                    metrics::LAST_RECEIVED_HEARTBEAT_ELAPSED
140                        .set(last_received_lease.elapsed().as_millis() as i64);
141                    // Resets the timer.
142                    last_received_lease = Instant::now();
143
144                    let mut leader_region_lease_count = 0;
145                    let mut follower_region_lease_count = 0;
146                    for lease in &lease.regions {
147                        match lease.role() {
148                            RegionRole::Leader | RegionRole::DowngradingLeader => {
149                                leader_region_lease_count += 1
150                            }
151                            RegionRole::Follower => follower_region_lease_count += 1,
152                        }
153                    }
154
155                    metrics::HEARTBEAT_REGION_LEASES
156                        .with_label_values(&["leader"])
157                        .set(leader_region_lease_count);
158                    metrics::HEARTBEAT_REGION_LEASES
159                        .with_label_values(&["follower"])
160                        .set(follower_region_lease_count);
161                }
162                let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), res);
163                if let Err(e) = Self::handle_response(ctx, handler_executor.clone()).await {
164                    error!(e; "Error while handling heartbeat response");
165                }
166                if let Some(notify) = notify.take() {
167                    notify.notify_one();
168                }
169                if !running.load(Ordering::Acquire) {
170                    info!("Heartbeat task shutdown");
171                }
172            }
173            quit_signal.notify_one();
174            info!("Heartbeat handling loop exit.");
175        });
176        Ok(tx)
177    }
178
179    async fn handle_response(
180        ctx: HeartbeatResponseHandlerContext,
181        handler_executor: HeartbeatResponseHandlerExecutorRef,
182    ) -> Result<()> {
183        trace!("Heartbeat response: {:?}", ctx.response);
184        handler_executor
185            .handle(ctx)
186            .await
187            .context(error::HandleHeartbeatResponseSnafu)
188    }
189
190    #[allow(deprecated)]
191    /// Start heartbeat task, spawn background task.
192    pub async fn start(
193        &self,
194        event_receiver: RegionServerEventReceiver,
195        notify: Option<Arc<Notify>>,
196    ) -> Result<()> {
197        let running = self.running.clone();
198        if running
199            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
200            .is_err()
201        {
202            warn!("Heartbeat task started multiple times");
203            return Ok(());
204        }
205        let interval = self.interval;
206        let node_id = self.node_id;
207        let node_epoch = self.node_epoch;
208        let addr = &self.peer_addr;
209        info!(
210            "Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}."
211        );
212
213        let meta_client = self.meta_client.clone();
214        let region_server_clone = self.region_server.clone();
215
216        let handler_executor = self.resp_handler_executor.clone();
217
218        let (outgoing_tx, mut outgoing_rx) = mpsc::channel(16);
219        let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
220
221        let quit_signal = Arc::new(Notify::new());
222
223        let mut tx = Self::create_streams(
224            &meta_client,
225            running.clone(),
226            handler_executor.clone(),
227            mailbox.clone(),
228            notify,
229            quit_signal.clone(),
230        )
231        .await?;
232
233        let self_peer = Some(Peer {
234            id: node_id,
235            addr: addr.clone(),
236        });
237        let epoch = self.region_alive_keeper.epoch();
238        let workload_types = self.workload_types.clone();
239
240        self.region_alive_keeper.start(Some(event_receiver)).await?;
241        let mut last_sent = Instant::now();
242        let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores();
243        let total_memory_bytes = self.resource_stat.get_total_memory_bytes();
244        let resource_stat = self.resource_stat.clone();
245        let gc_limiter = self
246            .region_server
247            .mito_engine()
248            .context(RegionEngineNotFoundSnafu { name: "mito" })?
249            .gc_limiter();
250
251        common_runtime::spawn_hb(async move {
252            let sleep = tokio::time::sleep(Duration::from_millis(0));
253            tokio::pin!(sleep);
254
255            let build_info = common_version::build_info();
256
257            let heartbeat_request = HeartbeatRequest {
258                peer: self_peer,
259                node_epoch,
260                info: Some(NodeInfo {
261                    version: build_info.version.to_string(),
262                    git_commit: build_info.commit_short.to_string(),
263                    start_time_ms: node_epoch,
264                    total_cpu_millicores,
265                    total_memory_bytes,
266                    cpu_usage_millicores: 0,
267                    memory_usage_bytes: 0,
268                    // TODO(zyy17): Remove these deprecated fields when the deprecated fields are removed from the proto.
269                    cpus: total_cpu_millicores as u32,
270                    memory_bytes: total_memory_bytes as u64,
271                    hostname: hostname::get()
272                        .unwrap_or_default()
273                        .to_string_lossy()
274                        .to_string(),
275                }),
276                node_workloads: Some(NodeWorkloads::Datanode(DatanodeWorkloads {
277                    types: workload_types.iter().map(|w| w.to_i32()).collect(),
278                })),
279                ..Default::default()
280            };
281
282            loop {
283                if !running.load(Ordering::Relaxed) {
284                    info!("shutdown heartbeat task");
285                    break;
286                }
287                let req = tokio::select! {
288                    message = outgoing_rx.recv() => {
289                        if let Some(message) = message {
290                            match outgoing_message_to_mailbox_message(message) {
291                                Ok(message) => {
292                                    let mut extensions = heartbeat_request.extensions.clone();
293                                    let gc_stat = gc_limiter.gc_stat();
294                                    gc_stat.into_extensions(&mut extensions);
295
296                                    let req = HeartbeatRequest {
297                                        mailbox_message: Some(message),
298                                        extensions,
299                                        ..heartbeat_request.clone()
300                                    };
301                                    HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
302                                    Some(req)
303                                }
304                                Err(e) => {
305                                    error!(e; "Failed to encode mailbox messages!");
306                                    HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
307                                    None
308                                }
309                            }
310                        } else {
311                            None
312                        }
313                    }
314                    _ = &mut sleep => {
315                        let region_stats = Self::load_region_stats(&region_server_clone);
316                        let topic_stats = region_server_clone.topic_stats();
317                        let now = Instant::now();
318                        let duration_since_epoch = (now - epoch).as_millis() as u64;
319
320                        let mut extensions = heartbeat_request.extensions.clone();
321                        let gc_stat = gc_limiter.gc_stat();
322                        gc_stat.into_extensions(&mut extensions);
323
324                        let mut req = HeartbeatRequest {
325                            region_stats,
326                            topic_stats,
327                            duration_since_epoch,
328                            extensions,
329                            ..heartbeat_request.clone()
330                        };
331
332                        if let Some(info) = req.info.as_mut() {
333                            info.cpu_usage_millicores = resource_stat.get_cpu_usage_millicores();
334                            info.memory_usage_bytes = resource_stat.get_memory_usage_bytes();
335                        }
336
337                        sleep.as_mut().reset(now + Duration::from_millis(interval));
338                        Some(req)
339                    }
340                    // If the heartbeat stream is broken, send a dummy heartbeat request to re-create the heartbeat stream.
341                    _ = quit_signal.notified() => {
342                        let req = HeartbeatRequest::default();
343                        Some(req)
344                    }
345                };
346                if let Some(req) = req {
347                    metrics::LAST_SENT_HEARTBEAT_ELAPSED
348                        .set(last_sent.elapsed().as_millis() as i64);
349                    // Resets the timer.
350                    last_sent = Instant::now();
351                    debug!("Sending heartbeat request: {:?}", req);
352                    if let Err(e) = tx.send(req).await {
353                        error!(e; "Failed to send heartbeat to metasrv");
354                        match Self::create_streams(
355                            &meta_client,
356                            running.clone(),
357                            handler_executor.clone(),
358                            mailbox.clone(),
359                            None,
360                            quit_signal.clone(),
361                        )
362                        .await
363                        {
364                            Ok(new_tx) => {
365                                info!("Reconnected to metasrv");
366                                tx = new_tx;
367                                // Triggers to send heartbeat immediately.
368                                sleep.as_mut().reset(Instant::now());
369                            }
370                            Err(e) => {
371                                // Before the META_LEASE_SECS expires,
372                                // any retries are meaningless, it always reads the old meta leader address.
373                                // Triggers to retry after META_KEEP_ALIVE_INTERVAL_SECS.
374                                sleep.as_mut().reset(
375                                    Instant::now()
376                                        + Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS),
377                                );
378                                error!(e; "Failed to reconnect to metasrv!");
379                            }
380                        }
381                    } else {
382                        HEARTBEAT_SENT_COUNT.inc();
383                    }
384                }
385            }
386        });
387
388        Ok(())
389    }
390
391    fn load_region_stats(region_server: &RegionServer) -> Vec<RegionStat> {
392        region_server
393            .reportable_regions()
394            .into_iter()
395            .map(|stat| {
396                let region_stat = region_server
397                    .region_statistic(stat.region_id)
398                    .unwrap_or_default();
399                let mut extensions = HashMap::new();
400                if let Some(serialized) = region_stat.serialize_to_vec() {
401                    extensions.insert(REGION_STATISTIC_KEY.to_string(), serialized);
402                }
403
404                RegionStat {
405                    region_id: stat.region_id.as_u64(),
406                    engine: stat.engine,
407                    role: RegionRole::from(stat.role).into(),
408                    // TODO(weny): w/rcus
409                    rcus: 0,
410                    wcus: 0,
411                    approximate_bytes: region_stat.estimated_disk_size() as i64,
412                    extensions,
413                }
414            })
415            .collect()
416    }
417
418    pub fn close(&self) -> Result<()> {
419        let running = self.running.clone();
420        if running
421            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
422            .is_err()
423        {
424            warn!("Call close heartbeat task multiple times");
425        }
426
427        Ok(())
428    }
429}