datanode/
heartbeat.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::time::Duration;
19
20use api::v1::meta::heartbeat_request::NodeWorkloads;
21use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
22use common_base::Plugins;
23use common_config::utils::ResourceSpec;
24use common_meta::cache_invalidator::CacheInvalidatorRef;
25use common_meta::datanode::REGION_STATISTIC_KEY;
26use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
27use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
28use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
29use common_meta::heartbeat::handler::{
30    HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
31};
32use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef};
33use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
34use common_telemetry::{debug, error, info, trace, warn};
35use common_workload::DatanodeWorkloadType;
36use meta_client::MetaClientRef;
37use meta_client::client::{HeartbeatSender, MetaClient};
38use servers::addrs;
39use snafu::ResultExt;
40use tokio::sync::{Notify, mpsc};
41use tokio::time::Instant;
42
43use self::handler::RegionHeartbeatResponseHandler;
44use crate::alive_keeper::{CountdownTaskHandlerExtRef, RegionAliveKeeper};
45use crate::config::DatanodeOptions;
46use crate::error::{self, MetaClientInitSnafu, Result};
47use crate::event_listener::RegionServerEventReceiver;
48use crate::metrics::{self, HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
49use crate::region_server::RegionServer;
50
51pub(crate) mod handler;
52pub(crate) mod task_tracker;
53
54/// The datanode heartbeat task which sending `[HeartbeatRequest]` to Metasrv periodically in background.
55pub struct HeartbeatTask {
56    node_id: u64,
57    workload_types: Vec<DatanodeWorkloadType>,
58    node_epoch: u64,
59    peer_addr: String,
60    running: Arc<AtomicBool>,
61    meta_client: MetaClientRef,
62    region_server: RegionServer,
63    interval: u64,
64    resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
65    region_alive_keeper: Arc<RegionAliveKeeper>,
66    resource_spec: ResourceSpec,
67}
68
69impl Drop for HeartbeatTask {
70    fn drop(&mut self) {
71        self.running.store(false, Ordering::Release);
72    }
73}
74
75impl HeartbeatTask {
76    /// Create a new heartbeat task instance.
77    pub async fn try_new(
78        opts: &DatanodeOptions,
79        region_server: RegionServer,
80        meta_client: MetaClientRef,
81        cache_invalidator: CacheInvalidatorRef,
82        plugins: Plugins,
83    ) -> Result<Self> {
84        let countdown_task_handler_ext = plugins.get::<CountdownTaskHandlerExtRef>();
85        let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
86            region_server.clone(),
87            countdown_task_handler_ext,
88            opts.heartbeat.interval.as_millis() as u64,
89        ));
90        let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![
91            region_alive_keeper.clone(),
92            Arc::new(ParseMailboxMessageHandler),
93            Arc::new(RegionHeartbeatResponseHandler::new(region_server.clone())),
94            Arc::new(InvalidateCacheHandler::new(cache_invalidator)),
95        ]));
96
97        Ok(Self {
98            node_id: opts.node_id.unwrap_or(0),
99            workload_types: opts.workload_types.clone(),
100            // We use datanode's start time millis as the node's epoch.
101            node_epoch: common_time::util::current_time_millis() as u64,
102            peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
103            running: Arc::new(AtomicBool::new(false)),
104            meta_client,
105            region_server,
106            interval: opts.heartbeat.interval.as_millis() as u64,
107            resp_handler_executor,
108            region_alive_keeper,
109            resource_spec: Default::default(),
110        })
111    }
112
113    pub async fn create_streams(
114        meta_client: &MetaClient,
115        running: Arc<AtomicBool>,
116        handler_executor: HeartbeatResponseHandlerExecutorRef,
117        mailbox: MailboxRef,
118        mut notify: Option<Arc<Notify>>,
119        quit_signal: Arc<Notify>,
120    ) -> Result<HeartbeatSender> {
121        let client_id = meta_client.id();
122        let (tx, mut rx) = meta_client.heartbeat().await.context(MetaClientInitSnafu)?;
123
124        let mut last_received_lease = Instant::now();
125
126        let _handle = common_runtime::spawn_hb(async move {
127            while let Some(res) = rx.message().await.unwrap_or_else(|e| {
128                error!(e; "Error while reading heartbeat response");
129                None
130            }) {
131                if let Some(msg) = res.mailbox_message.as_ref() {
132                    info!("Received mailbox message: {msg:?}, meta_client id: {client_id:?}");
133                }
134                if let Some(lease) = res.region_lease.as_ref() {
135                    metrics::LAST_RECEIVED_HEARTBEAT_ELAPSED
136                        .set(last_received_lease.elapsed().as_millis() as i64);
137                    // Resets the timer.
138                    last_received_lease = Instant::now();
139
140                    let mut leader_region_lease_count = 0;
141                    let mut follower_region_lease_count = 0;
142                    for lease in &lease.regions {
143                        match lease.role() {
144                            RegionRole::Leader | RegionRole::DowngradingLeader => {
145                                leader_region_lease_count += 1
146                            }
147                            RegionRole::Follower => follower_region_lease_count += 1,
148                        }
149                    }
150
151                    metrics::HEARTBEAT_REGION_LEASES
152                        .with_label_values(&["leader"])
153                        .set(leader_region_lease_count);
154                    metrics::HEARTBEAT_REGION_LEASES
155                        .with_label_values(&["follower"])
156                        .set(follower_region_lease_count);
157                }
158                let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), res);
159                if let Err(e) = Self::handle_response(ctx, handler_executor.clone()).await {
160                    error!(e; "Error while handling heartbeat response");
161                }
162                if let Some(notify) = notify.take() {
163                    notify.notify_one();
164                }
165                if !running.load(Ordering::Acquire) {
166                    info!("Heartbeat task shutdown");
167                }
168            }
169            quit_signal.notify_one();
170            info!("Heartbeat handling loop exit.");
171        });
172        Ok(tx)
173    }
174
175    async fn handle_response(
176        ctx: HeartbeatResponseHandlerContext,
177        handler_executor: HeartbeatResponseHandlerExecutorRef,
178    ) -> Result<()> {
179        trace!("Heartbeat response: {:?}", ctx.response);
180        handler_executor
181            .handle(ctx)
182            .await
183            .context(error::HandleHeartbeatResponseSnafu)
184    }
185
186    /// Start heartbeat task, spawn background task.
187    pub async fn start(
188        &self,
189        event_receiver: RegionServerEventReceiver,
190        notify: Option<Arc<Notify>>,
191    ) -> Result<()> {
192        let running = self.running.clone();
193        if running
194            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
195            .is_err()
196        {
197            warn!("Heartbeat task started multiple times");
198            return Ok(());
199        }
200        let interval = self.interval;
201        let node_id = self.node_id;
202        let node_epoch = self.node_epoch;
203        let addr = &self.peer_addr;
204        info!(
205            "Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}."
206        );
207
208        let meta_client = self.meta_client.clone();
209        let region_server_clone = self.region_server.clone();
210
211        let handler_executor = self.resp_handler_executor.clone();
212
213        let (outgoing_tx, mut outgoing_rx) = mpsc::channel(16);
214        let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
215
216        let quit_signal = Arc::new(Notify::new());
217
218        let mut tx = Self::create_streams(
219            &meta_client,
220            running.clone(),
221            handler_executor.clone(),
222            mailbox.clone(),
223            notify,
224            quit_signal.clone(),
225        )
226        .await?;
227
228        let self_peer = Some(Peer {
229            id: node_id,
230            addr: addr.clone(),
231        });
232        let epoch = self.region_alive_keeper.epoch();
233        let workload_types = self.workload_types.clone();
234
235        self.region_alive_keeper.start(Some(event_receiver)).await?;
236        let mut last_sent = Instant::now();
237        let cpus = self.resource_spec.cpus as u32;
238        let memory_bytes = self.resource_spec.memory.unwrap_or_default().as_bytes();
239
240        common_runtime::spawn_hb(async move {
241            let sleep = tokio::time::sleep(Duration::from_millis(0));
242            tokio::pin!(sleep);
243
244            let build_info = common_version::build_info();
245            let heartbeat_request = HeartbeatRequest {
246                peer: self_peer,
247                node_epoch,
248                info: Some(NodeInfo {
249                    version: build_info.version.to_string(),
250                    git_commit: build_info.commit_short.to_string(),
251                    start_time_ms: node_epoch,
252                    cpus,
253                    memory_bytes,
254                }),
255                node_workloads: Some(NodeWorkloads::Datanode(DatanodeWorkloads {
256                    types: workload_types.iter().map(|w| w.to_i32()).collect(),
257                })),
258                ..Default::default()
259            };
260
261            loop {
262                if !running.load(Ordering::Relaxed) {
263                    info!("shutdown heartbeat task");
264                    break;
265                }
266                let req = tokio::select! {
267                    message = outgoing_rx.recv() => {
268                        if let Some(message) = message {
269                            match outgoing_message_to_mailbox_message(message) {
270                                Ok(message) => {
271                                    let req = HeartbeatRequest {
272                                        mailbox_message: Some(message),
273                                        ..heartbeat_request.clone()
274                                    };
275                                    HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
276                                    Some(req)
277                                }
278                                Err(e) => {
279                                    error!(e; "Failed to encode mailbox messages!");
280                                    HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
281                                    None
282                                }
283                            }
284                        } else {
285                            None
286                        }
287                    }
288                    _ = &mut sleep => {
289                        let region_stats = Self::load_region_stats(&region_server_clone);
290                        let topic_stats = region_server_clone.topic_stats();
291                        let now = Instant::now();
292                        let duration_since_epoch = (now - epoch).as_millis() as u64;
293                        let req = HeartbeatRequest {
294                            region_stats,
295                            topic_stats,
296                            duration_since_epoch,
297                            ..heartbeat_request.clone()
298                        };
299                        sleep.as_mut().reset(now + Duration::from_millis(interval));
300                        Some(req)
301                    }
302                    // If the heartbeat stream is broken, send a dummy heartbeat request to re-create the heartbeat stream.
303                    _ = quit_signal.notified() => {
304                        let req = HeartbeatRequest::default();
305                        Some(req)
306                    }
307                };
308                if let Some(req) = req {
309                    metrics::LAST_SENT_HEARTBEAT_ELAPSED
310                        .set(last_sent.elapsed().as_millis() as i64);
311                    // Resets the timer.
312                    last_sent = Instant::now();
313                    debug!("Sending heartbeat request: {:?}", req);
314                    if let Err(e) = tx.send(req).await {
315                        error!(e; "Failed to send heartbeat to metasrv");
316                        match Self::create_streams(
317                            &meta_client,
318                            running.clone(),
319                            handler_executor.clone(),
320                            mailbox.clone(),
321                            None,
322                            quit_signal.clone(),
323                        )
324                        .await
325                        {
326                            Ok(new_tx) => {
327                                info!("Reconnected to metasrv");
328                                tx = new_tx;
329                                // Triggers to send heartbeat immediately.
330                                sleep.as_mut().reset(Instant::now());
331                            }
332                            Err(e) => {
333                                // Before the META_LEASE_SECS expires,
334                                // any retries are meaningless, it always reads the old meta leader address.
335                                // Triggers to retry after META_KEEP_ALIVE_INTERVAL_SECS.
336                                sleep.as_mut().reset(
337                                    Instant::now()
338                                        + Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS),
339                                );
340                                error!(e; "Failed to reconnect to metasrv!");
341                            }
342                        }
343                    } else {
344                        HEARTBEAT_SENT_COUNT.inc();
345                    }
346                }
347            }
348        });
349
350        Ok(())
351    }
352
353    fn load_region_stats(region_server: &RegionServer) -> Vec<RegionStat> {
354        region_server
355            .reportable_regions()
356            .into_iter()
357            .map(|stat| {
358                let region_stat = region_server
359                    .region_statistic(stat.region_id)
360                    .unwrap_or_default();
361                let mut extensions = HashMap::new();
362                if let Some(serialized) = region_stat.serialize_to_vec() {
363                    extensions.insert(REGION_STATISTIC_KEY.to_string(), serialized);
364                }
365
366                RegionStat {
367                    region_id: stat.region_id.as_u64(),
368                    engine: stat.engine,
369                    role: RegionRole::from(stat.role).into(),
370                    // TODO(weny): w/rcus
371                    rcus: 0,
372                    wcus: 0,
373                    approximate_bytes: region_stat.estimated_disk_size() as i64,
374                    extensions,
375                }
376            })
377            .collect()
378    }
379
380    pub fn close(&self) -> Result<()> {
381        let running = self.running.clone();
382        if running
383            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
384            .is_err()
385        {
386            warn!("Call close heartbeat task multiple times");
387        }
388
389        Ok(())
390    }
391}