datanode/
heartbeat.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::sync::Arc;
18use std::time::Duration;
19
20use api::v1::meta::{HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
21use common_base::Plugins;
22use common_meta::cache_invalidator::CacheInvalidatorRef;
23use common_meta::datanode::REGION_STATISTIC_KEY;
24use common_meta::distributed_time_constants::META_KEEP_ALIVE_INTERVAL_SECS;
25use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
26use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
27use common_meta::heartbeat::handler::{
28    HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
29};
30use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef};
31use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
32use common_telemetry::{debug, error, info, trace, warn};
33use meta_client::client::{HeartbeatSender, MetaClient};
34use meta_client::MetaClientRef;
35use servers::addrs;
36use snafu::ResultExt;
37use tokio::sync::{mpsc, Notify};
38use tokio::time::Instant;
39
40use self::handler::RegionHeartbeatResponseHandler;
41use crate::alive_keeper::{CountdownTaskHandlerExtRef, RegionAliveKeeper};
42use crate::config::DatanodeOptions;
43use crate::error::{self, MetaClientInitSnafu, Result};
44use crate::event_listener::RegionServerEventReceiver;
45use crate::metrics::{self, HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
46use crate::region_server::RegionServer;
47
48pub(crate) mod handler;
49pub(crate) mod task_tracker;
50
/// The datanode heartbeat task, which sends [`HeartbeatRequest`]s to Metasrv periodically in the background.
pub struct HeartbeatTask {
    /// Id of this datanode; `try_new` falls back to 0 when the options carry no node id.
    node_id: u64,
    /// Epoch of this node: the current time in milliseconds captured when the task was created.
    node_epoch: u64,
    /// Address reported to Metasrv for this peer, resolved from the gRPC options.
    peer_addr: String,
    /// Whether the task is started; set by `start`, cleared by `close` and on drop.
    running: Arc<AtomicBool>,
    /// Client used to open the heartbeat stream to Metasrv.
    meta_client: MetaClientRef,
    /// Region server whose reportable regions are attached to each periodic heartbeat.
    region_server: RegionServer,
    /// Interval between periodic heartbeats, in milliseconds.
    interval: u64,
    /// Executor that runs the registered handlers on every heartbeat response.
    resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
    /// Region alive keeper; it is also registered as a response handler and is started by `start`.
    region_alive_keeper: Arc<RegionAliveKeeper>,
}
63
64impl Drop for HeartbeatTask {
65    fn drop(&mut self) {
66        self.running.store(false, Ordering::Release);
67    }
68}
69
70impl HeartbeatTask {
71    /// Create a new heartbeat task instance.
72    pub async fn try_new(
73        opts: &DatanodeOptions,
74        region_server: RegionServer,
75        meta_client: MetaClientRef,
76        cache_invalidator: CacheInvalidatorRef,
77        plugins: Plugins,
78    ) -> Result<Self> {
79        let countdown_task_handler_ext = plugins.get::<CountdownTaskHandlerExtRef>();
80        let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
81            region_server.clone(),
82            countdown_task_handler_ext,
83            opts.heartbeat.interval.as_millis() as u64,
84        ));
85        let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![
86            region_alive_keeper.clone(),
87            Arc::new(ParseMailboxMessageHandler),
88            Arc::new(RegionHeartbeatResponseHandler::new(region_server.clone())),
89            Arc::new(InvalidateCacheHandler::new(cache_invalidator)),
90        ]));
91
92        Ok(Self {
93            node_id: opts.node_id.unwrap_or(0),
94            // We use datanode's start time millis as the node's epoch.
95            node_epoch: common_time::util::current_time_millis() as u64,
96            peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
97            running: Arc::new(AtomicBool::new(false)),
98            meta_client,
99            region_server,
100            interval: opts.heartbeat.interval.as_millis() as u64,
101            resp_handler_executor,
102            region_alive_keeper,
103        })
104    }
105
106    pub async fn create_streams(
107        meta_client: &MetaClient,
108        running: Arc<AtomicBool>,
109        handler_executor: HeartbeatResponseHandlerExecutorRef,
110        mailbox: MailboxRef,
111        mut notify: Option<Arc<Notify>>,
112        quit_signal: Arc<Notify>,
113    ) -> Result<HeartbeatSender> {
114        let client_id = meta_client.id();
115        let (tx, mut rx) = meta_client.heartbeat().await.context(MetaClientInitSnafu)?;
116
117        let mut last_received_lease = Instant::now();
118
119        let _handle = common_runtime::spawn_hb(async move {
120            while let Some(res) = rx.message().await.unwrap_or_else(|e| {
121                error!(e; "Error while reading heartbeat response");
122                None
123            }) {
124                if let Some(msg) = res.mailbox_message.as_ref() {
125                    info!("Received mailbox message: {msg:?}, meta_client id: {client_id:?}");
126                }
127                if let Some(lease) = res.region_lease.as_ref() {
128                    metrics::LAST_RECEIVED_HEARTBEAT_ELAPSED
129                        .set(last_received_lease.elapsed().as_millis() as i64);
130                    // Resets the timer.
131                    last_received_lease = Instant::now();
132
133                    let mut leader_region_lease_count = 0;
134                    let mut follower_region_lease_count = 0;
135                    for lease in &lease.regions {
136                        match lease.role() {
137                            RegionRole::Leader | RegionRole::DowngradingLeader => {
138                                leader_region_lease_count += 1
139                            }
140                            RegionRole::Follower => follower_region_lease_count += 1,
141                        }
142                    }
143
144                    metrics::HEARTBEAT_REGION_LEASES
145                        .with_label_values(&["leader"])
146                        .set(leader_region_lease_count);
147                    metrics::HEARTBEAT_REGION_LEASES
148                        .with_label_values(&["follower"])
149                        .set(follower_region_lease_count);
150                }
151                let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), res);
152                if let Err(e) = Self::handle_response(ctx, handler_executor.clone()).await {
153                    error!(e; "Error while handling heartbeat response");
154                }
155                if let Some(notify) = notify.take() {
156                    notify.notify_one();
157                }
158                if !running.load(Ordering::Acquire) {
159                    info!("Heartbeat task shutdown");
160                }
161            }
162            quit_signal.notify_one();
163            info!("Heartbeat handling loop exit.");
164        });
165        Ok(tx)
166    }
167
168    async fn handle_response(
169        ctx: HeartbeatResponseHandlerContext,
170        handler_executor: HeartbeatResponseHandlerExecutorRef,
171    ) -> Result<()> {
172        trace!("Heartbeat response: {:?}", ctx.response);
173        handler_executor
174            .handle(ctx)
175            .await
176            .context(error::HandleHeartbeatResponseSnafu)
177    }
178
179    /// Start heartbeat task, spawn background task.
180    pub async fn start(
181        &self,
182        event_receiver: RegionServerEventReceiver,
183        notify: Option<Arc<Notify>>,
184    ) -> Result<()> {
185        let running = self.running.clone();
186        if running
187            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
188            .is_err()
189        {
190            warn!("Heartbeat task started multiple times");
191            return Ok(());
192        }
193        let interval = self.interval;
194        let node_id = self.node_id;
195        let node_epoch = self.node_epoch;
196        let addr = &self.peer_addr;
197        info!("Starting heartbeat to Metasrv with interval {interval}. My node id is {node_id}, address is {addr}.");
198
199        let meta_client = self.meta_client.clone();
200        let region_server_clone = self.region_server.clone();
201
202        let handler_executor = self.resp_handler_executor.clone();
203
204        let (outgoing_tx, mut outgoing_rx) = mpsc::channel(16);
205        let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
206
207        let quit_signal = Arc::new(Notify::new());
208
209        let mut tx = Self::create_streams(
210            &meta_client,
211            running.clone(),
212            handler_executor.clone(),
213            mailbox.clone(),
214            notify,
215            quit_signal.clone(),
216        )
217        .await?;
218
219        let self_peer = Some(Peer {
220            id: node_id,
221            addr: addr.clone(),
222        });
223        let epoch = self.region_alive_keeper.epoch();
224
225        self.region_alive_keeper.start(Some(event_receiver)).await?;
226        let mut last_sent = Instant::now();
227
228        common_runtime::spawn_hb(async move {
229            let sleep = tokio::time::sleep(Duration::from_millis(0));
230            tokio::pin!(sleep);
231
232            let build_info = common_version::build_info();
233            let heartbeat_request = HeartbeatRequest {
234                peer: self_peer,
235                node_epoch,
236                info: Some(NodeInfo {
237                    version: build_info.version.to_string(),
238                    git_commit: build_info.commit_short.to_string(),
239                    start_time_ms: node_epoch,
240                    cpus: num_cpus::get() as u32,
241                }),
242                ..Default::default()
243            };
244
245            loop {
246                if !running.load(Ordering::Relaxed) {
247                    info!("shutdown heartbeat task");
248                    break;
249                }
250                let req = tokio::select! {
251                    message = outgoing_rx.recv() => {
252                        if let Some(message) = message {
253                            match outgoing_message_to_mailbox_message(message) {
254                                Ok(message) => {
255                                    let req = HeartbeatRequest {
256                                        mailbox_message: Some(message),
257                                        ..heartbeat_request.clone()
258                                    };
259                                    HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
260                                    Some(req)
261                                }
262                                Err(e) => {
263                                    error!(e; "Failed to encode mailbox messages!");
264                                    HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
265                                    None
266                                }
267                            }
268                        } else {
269                            None
270                        }
271                    }
272                    _ = &mut sleep => {
273                        let region_stats = Self::load_region_stats(&region_server_clone);
274                        let now = Instant::now();
275                        let duration_since_epoch = (now - epoch).as_millis() as u64;
276                        let req = HeartbeatRequest {
277                            region_stats,
278                            duration_since_epoch,
279                            ..heartbeat_request.clone()
280                        };
281                        sleep.as_mut().reset(now + Duration::from_millis(interval));
282                        Some(req)
283                    }
284                    // If the heartbeat stream is broken, send a dummy heartbeat request to re-create the heartbeat stream.
285                    _ = quit_signal.notified() => {
286                        let req = HeartbeatRequest::default();
287                        Some(req)
288                    }
289                };
290                if let Some(req) = req {
291                    metrics::LAST_SENT_HEARTBEAT_ELAPSED
292                        .set(last_sent.elapsed().as_millis() as i64);
293                    // Resets the timer.
294                    last_sent = Instant::now();
295                    debug!("Sending heartbeat request: {:?}", req);
296                    if let Err(e) = tx.send(req).await {
297                        error!(e; "Failed to send heartbeat to metasrv");
298                        match Self::create_streams(
299                            &meta_client,
300                            running.clone(),
301                            handler_executor.clone(),
302                            mailbox.clone(),
303                            None,
304                            quit_signal.clone(),
305                        )
306                        .await
307                        {
308                            Ok(new_tx) => {
309                                info!("Reconnected to metasrv");
310                                tx = new_tx;
311                                // Triggers to send heartbeat immediately.
312                                sleep.as_mut().reset(Instant::now());
313                            }
314                            Err(e) => {
315                                // Before the META_LEASE_SECS expires,
316                                // any retries are meaningless, it always reads the old meta leader address.
317                                // Triggers to retry after META_KEEP_ALIVE_INTERVAL_SECS.
318                                sleep.as_mut().reset(
319                                    Instant::now()
320                                        + Duration::from_secs(META_KEEP_ALIVE_INTERVAL_SECS),
321                                );
322                                error!(e; "Failed to reconnect to metasrv!");
323                            }
324                        }
325                    } else {
326                        HEARTBEAT_SENT_COUNT.inc();
327                    }
328                }
329            }
330        });
331
332        Ok(())
333    }
334
335    fn load_region_stats(region_server: &RegionServer) -> Vec<RegionStat> {
336        region_server
337            .reportable_regions()
338            .into_iter()
339            .map(|stat| {
340                let region_stat = region_server
341                    .region_statistic(stat.region_id)
342                    .unwrap_or_default();
343                let mut extensions = HashMap::new();
344                if let Some(serialized) = region_stat.serialize_to_vec() {
345                    extensions.insert(REGION_STATISTIC_KEY.to_string(), serialized);
346                }
347
348                RegionStat {
349                    region_id: stat.region_id.as_u64(),
350                    engine: stat.engine,
351                    role: RegionRole::from(stat.role).into(),
352                    // TODO(weny): w/rcus
353                    rcus: 0,
354                    wcus: 0,
355                    approximate_bytes: region_stat.estimated_disk_size() as i64,
356                    extensions,
357                }
358            })
359            .collect()
360    }
361
362    pub fn close(&self) -> Result<()> {
363        let running = self.running.clone();
364        if running
365            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
366            .is_err()
367        {
368            warn!("Call close heartbeat task multiple times");
369        }
370
371        Ok(())
372    }
373}