datanode/
heartbeat.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::sync::Arc;
18use std::time::Duration;
19
20use api::v1::meta::heartbeat_request::NodeWorkloads;
21use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
22use common_base::Plugins;
23use common_meta::cache_invalidator::CacheInvalidatorRef;
24use common_meta::datanode::REGION_STATISTIC_KEY;
25use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
26use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
27use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
28use common_meta::heartbeat::handler::{
29    HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
30};
31use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef};
32use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
33use common_telemetry::{debug, error, info, trace, warn};
34use common_workload::DatanodeWorkloadType;
35use meta_client::client::{HeartbeatSender, MetaClient};
36use meta_client::MetaClientRef;
37use servers::addrs;
38use snafu::ResultExt;
39use tokio::sync::{mpsc, Notify};
40use tokio::time::Instant;
41
42use self::handler::RegionHeartbeatResponseHandler;
43use crate::alive_keeper::{CountdownTaskHandlerExtRef, RegionAliveKeeper};
44use crate::config::DatanodeOptions;
45use crate::error::{self, MetaClientInitSnafu, Result};
46use crate::event_listener::RegionServerEventReceiver;
47use crate::metrics::{self, HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
48use crate::region_server::RegionServer;
49
50pub(crate) mod handler;
51pub(crate) mod task_tracker;
52
53/// The datanode heartbeat task which sending `[HeartbeatRequest]` to Metasrv periodically in background.
54pub struct HeartbeatTask {
55    node_id: u64,
56    workload_types: Vec<DatanodeWorkloadType>,
57    node_epoch: u64,
58    peer_addr: String,
59    running: Arc<AtomicBool>,
60    meta_client: MetaClientRef,
61    region_server: RegionServer,
62    interval: u64,
63    resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
64    region_alive_keeper: Arc<RegionAliveKeeper>,
65}
66
67impl Drop for HeartbeatTask {
68    fn drop(&mut self) {
69        self.running.store(false, Ordering::Release);
70    }
71}
72
73impl HeartbeatTask {
74    /// Create a new heartbeat task instance.
75    pub async fn try_new(
76        opts: &DatanodeOptions,
77        region_server: RegionServer,
78        meta_client: MetaClientRef,
79        cache_invalidator: CacheInvalidatorRef,
80        plugins: Plugins,
81    ) -> Result<Self> {
82        let countdown_task_handler_ext = plugins.get::<CountdownTaskHandlerExtRef>();
83        let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
84            region_server.clone(),
85            countdown_task_handler_ext,
86            opts.heartbeat.interval.as_millis() as u64,
87        ));
88        let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![
89            region_alive_keeper.clone(),
90            Arc::new(ParseMailboxMessageHandler),
91            Arc::new(RegionHeartbeatResponseHandler::new(region_server.clone())),
92            Arc::new(InvalidateCacheHandler::new(cache_invalidator)),
93        ]));
94
95        Ok(Self {
96            node_id: opts.node_id.unwrap_or(0),
97            workload_types: opts.workload_types.clone(),
98            // We use datanode's start time millis as the node's epoch.
99            node_epoch: common_time::util::current_time_millis() as u64,
100            peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
101            running: Arc::new(AtomicBool::new(false)),
102            meta_client,
103            region_server,
104            interval: opts.heartbeat.interval.as_millis() as u64,
105            resp_handler_executor,
106            region_alive_keeper,
107        })
108    }
109
110    pub async fn create_streams(
111        meta_client: &MetaClient,
112        running: Arc<AtomicBool>,
113        handler_executor: HeartbeatResponseHandlerExecutorRef,
114        mailbox: MailboxRef,
115        mut notify: Option<Arc<Notify>>,
116        quit_signal: Arc<Notify>,
117    ) -> Result<HeartbeatSender> {
118        let client_id = meta_client.id();
119        let (tx, mut rx) = meta_client.heartbeat().await.context(MetaClientInitSnafu)?;
120
121        let mut last_received_lease = Instant::now();
122
123        let _handle = common_runtime::spawn_hb(async move {
124            while let Some(res) = rx.message().await.unwrap_or_else(|e| {
125                error!(e; "Error while reading heartbeat response");
126                None
127            }) {
128                if let Some(msg) = res.mailbox_message.as_ref() {
129                    info!("Received mailbox message: {msg:?}, meta_client id: {client_id:?}");
130                }
131                if let Some(lease) = res.region_lease.as_ref() {
132                    metrics::LAST_RECEIVED_HEARTBEAT_ELAPSED
133                        .set(last_received_lease.elapsed().as_millis() as i64);
134                    // Resets the timer.
135                    last_received_lease = Instant::now();
136
137                    let mut leader_region_lease_count = 0;
138                    let mut follower_region_lease_count = 0;
139                    for lease in &lease.regions {
140                        match lease.role() {
141                            RegionRole::Leader | RegionRole::DowngradingLeader => {
142                                leader_region_lease_count += 1
143                            }
144                            RegionRole::Follower => follower_region_lease_count += 1,
145                        }
146                    }
147
148                    metrics::HEARTBEAT_REGION_LEASES
149                        .with_label_values(&["leader"])
150                        .set(leader_region_lease_count);
151                    metrics::HEARTBEAT_REGION_LEASES
152                        .with_label_values(&["follower"])
153                        .set(follower_region_lease_count);
154                }
155                let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), res);
156                if let Err(e) = Self::handle_response(ctx, handler_executor.clone()).await {
157                    error!(e; "Error while handling heartbeat response");
158                }
159                if let Some(notify) = notify.take() {
160                    notify.notify_one();
161                }
162                if !running.load(Ordering::Acquire) {
163                    info!("Heartbeat task shutdown");
164                }
165            }
166            quit_signal.notify_one();
167            info!("Heartbeat handling loop exit.");
168        });
169        Ok(tx)
170    }
171
172    async fn handle_response(
173        ctx: HeartbeatResponseHandlerContext,
174        handler_executor: HeartbeatResponseHandlerExecutorRef,
175    ) -> Result<()> {
176        trace!("Heartbeat response: {:?}", ctx.response);
177        handler_executor
178            .handle(ctx)
179            .await
180            .context(error::HandleHeartbeatResponseSnafu)
181    }
182
183    /// Start heartbeat task, spawn background task.
184    pub async fn start(
185        &self,
186        event_receiver: RegionServerEventReceiver,
187        notify: Option<Arc<Notify>>,
188    ) -> Result<()> {
189        let running = self.running.clone();
190        if running
191            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
192            .is_err()
193        {
194            warn!("Heartbeat task started multiple times");
195            return Ok(());
196        }
197        let interval = self.interval;
198        let node_id = self.node_id;
199        let node_epoch = self.node_epoch;
200        let addr = &self.peer_addr;
201        info!("Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}.");
202
203        let meta_client = self.meta_client.clone();
204        let region_server_clone = self.region_server.clone();
205
206        let handler_executor = self.resp_handler_executor.clone();
207
208        let (outgoing_tx, mut outgoing_rx) = mpsc::channel(16);
209        let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
210
211        let quit_signal = Arc::new(Notify::new());
212
213        let mut tx = Self::create_streams(
214            &meta_client,
215            running.clone(),
216            handler_executor.clone(),
217            mailbox.clone(),
218            notify,
219            quit_signal.clone(),
220        )
221        .await?;
222
223        let self_peer = Some(Peer {
224            id: node_id,
225            addr: addr.clone(),
226        });
227        let epoch = self.region_alive_keeper.epoch();
228        let workload_types = self.workload_types.clone();
229
230        self.region_alive_keeper.start(Some(event_receiver)).await?;
231        let mut last_sent = Instant::now();
232
233        common_runtime::spawn_hb(async move {
234            let sleep = tokio::time::sleep(Duration::from_millis(0));
235            tokio::pin!(sleep);
236
237            let build_info = common_version::build_info();
238            let heartbeat_request = HeartbeatRequest {
239                peer: self_peer,
240                node_epoch,
241                info: Some(NodeInfo {
242                    version: build_info.version.to_string(),
243                    git_commit: build_info.commit_short.to_string(),
244                    start_time_ms: node_epoch,
245                    cpus: num_cpus::get() as u32,
246                }),
247                node_workloads: Some(NodeWorkloads::Datanode(DatanodeWorkloads {
248                    types: workload_types.iter().map(|w| w.to_i32()).collect(),
249                })),
250                ..Default::default()
251            };
252
253            loop {
254                if !running.load(Ordering::Relaxed) {
255                    info!("shutdown heartbeat task");
256                    break;
257                }
258                let req = tokio::select! {
259                    message = outgoing_rx.recv() => {
260                        if let Some(message) = message {
261                            match outgoing_message_to_mailbox_message(message) {
262                                Ok(message) => {
263                                    let req = HeartbeatRequest {
264                                        mailbox_message: Some(message),
265                                        ..heartbeat_request.clone()
266                                    };
267                                    HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
268                                    Some(req)
269                                }
270                                Err(e) => {
271                                    error!(e; "Failed to encode mailbox messages!");
272                                    HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
273                                    None
274                                }
275                            }
276                        } else {
277                            None
278                        }
279                    }
280                    _ = &mut sleep => {
281                        let region_stats = Self::load_region_stats(&region_server_clone);
282                        let topic_stats = region_server_clone.topic_stats();
283                        let now = Instant::now();
284                        let duration_since_epoch = (now - epoch).as_millis() as u64;
285                        let req = HeartbeatRequest {
286                            region_stats,
287                            topic_stats,
288                            duration_since_epoch,
289                            ..heartbeat_request.clone()
290                        };
291                        sleep.as_mut().reset(now + Duration::from_millis(interval));
292                        Some(req)
293                    }
294                    // If the heartbeat stream is broken, send a dummy heartbeat request to re-create the heartbeat stream.
295                    _ = quit_signal.notified() => {
296                        let req = HeartbeatRequest::default();
297                        Some(req)
298                    }
299                };
300                if let Some(req) = req {
301                    metrics::LAST_SENT_HEARTBEAT_ELAPSED
302                        .set(last_sent.elapsed().as_millis() as i64);
303                    // Resets the timer.
304                    last_sent = Instant::now();
305                    debug!("Sending heartbeat request: {:?}", req);
306                    if let Err(e) = tx.send(req).await {
307                        error!(e; "Failed to send heartbeat to metasrv");
308                        match Self::create_streams(
309                            &meta_client,
310                            running.clone(),
311                            handler_executor.clone(),
312                            mailbox.clone(),
313                            None,
314                            quit_signal.clone(),
315                        )
316                        .await
317                        {
318                            Ok(new_tx) => {
319                                info!("Reconnected to metasrv");
320                                tx = new_tx;
321                                // Triggers to send heartbeat immediately.
322                                sleep.as_mut().reset(Instant::now());
323                            }
324                            Err(e) => {
325                                // Before the META_LEASE_SECS expires,
326                                // any retries are meaningless, it always reads the old meta leader address.
327                                // Triggers to retry after META_KEEP_ALIVE_INTERVAL_SECS.
328                                sleep.as_mut().reset(
329                                    Instant::now()
330                                        + Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS),
331                                );
332                                error!(e; "Failed to reconnect to metasrv!");
333                            }
334                        }
335                    } else {
336                        HEARTBEAT_SENT_COUNT.inc();
337                    }
338                }
339            }
340        });
341
342        Ok(())
343    }
344
345    fn load_region_stats(region_server: &RegionServer) -> Vec<RegionStat> {
346        region_server
347            .reportable_regions()
348            .into_iter()
349            .map(|stat| {
350                let region_stat = region_server
351                    .region_statistic(stat.region_id)
352                    .unwrap_or_default();
353                let mut extensions = HashMap::new();
354                if let Some(serialized) = region_stat.serialize_to_vec() {
355                    extensions.insert(REGION_STATISTIC_KEY.to_string(), serialized);
356                }
357
358                RegionStat {
359                    region_id: stat.region_id.as_u64(),
360                    engine: stat.engine,
361                    role: RegionRole::from(stat.role).into(),
362                    // TODO(weny): w/rcus
363                    rcus: 0,
364                    wcus: 0,
365                    approximate_bytes: region_stat.estimated_disk_size() as i64,
366                    extensions,
367                }
368            })
369            .collect()
370    }
371
372    pub fn close(&self) -> Result<()> {
373        let running = self.running.clone();
374        if running
375            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
376            .is_err()
377        {
378            warn!("Call close heartbeat task multiple times");
379        }
380
381        Ok(())
382    }
383}