datanode/
heartbeat.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::time::Duration;
19
20use api::v1::meta::heartbeat_request::NodeWorkloads;
21use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
22use common_base::Plugins;
23use common_meta::cache_invalidator::CacheInvalidatorRef;
24use common_meta::datanode::REGION_STATISTIC_KEY;
25use common_meta::distributed_time_constants::BASE_HEARTBEAT_INTERVAL;
26use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
27use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
28use common_meta::heartbeat::handler::suspend::SuspendHandler;
29use common_meta::heartbeat::handler::{
30    HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
31};
32use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef};
33use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
34use common_stat::ResourceStatRef;
35use common_telemetry::{debug, error, info, trace, warn};
36use common_workload::DatanodeWorkloadType;
37use meta_client::MetaClientRef;
38use meta_client::client::heartbeat::HeartbeatConfig;
39use meta_client::client::{HeartbeatSender, MetaClient};
40use servers::addrs;
41use snafu::{OptionExt as _, ResultExt};
42use tokio::sync::{Notify, mpsc};
43use tokio::time::Instant;
44
45use self::handler::RegionHeartbeatResponseHandler;
46use crate::alive_keeper::{CountdownTaskHandlerExtRef, RegionAliveKeeper};
47use crate::config::DatanodeOptions;
48use crate::error::{self, MetaClientInitSnafu, RegionEngineNotFoundSnafu, Result};
49use crate::event_listener::RegionServerEventReceiver;
50use crate::metrics::{self, HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
51use crate::region_server::RegionServer;
52
53pub(crate) mod handler;
54pub(crate) mod task_tracker;
55
/// The datanode heartbeat task, which sends [`HeartbeatRequest`]s to Metasrv periodically in
/// the background.
pub struct HeartbeatTask {
    /// Id of this datanode, reported as the `id` of the heartbeat `Peer`.
    /// Falls back to `0` when the options carry no node id.
    node_id: u64,
    /// Workload types reported to Metasrv in the `node_workloads` field of each heartbeat.
    workload_types: Vec<DatanodeWorkloadType>,
    /// The datanode's start time in milliseconds, used as the node's epoch.
    node_epoch: u64,
    /// Address of this datanode as reported to Metasrv, resolved from the gRPC options.
    peer_addr: String,
    /// Set to `true` by `start` and back to `false` by `close` or when the task is dropped;
    /// the background sending loop exits once it observes `false`.
    running: Arc<AtomicBool>,
    /// Client used to open (and re-open) the heartbeat stream to Metasrv.
    meta_client: MetaClientRef,
    /// Region server that provides the region and topic statistics carried by heartbeats.
    region_server: RegionServer,
    /// Executor that runs the chain of handlers against every heartbeat response.
    resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
    /// Region alive keeper; it is also the first handler in the response handler chain and is
    /// told the heartbeat interval received from Metasrv.
    region_alive_keeper: Arc<RegionAliveKeeper>,
    /// Source of the CPU and memory figures reported in `NodeInfo`.
    resource_stat: ResourceStatRef,
}
69
70impl Drop for HeartbeatTask {
71    fn drop(&mut self) {
72        self.running.store(false, Ordering::Release);
73    }
74}
75
76impl HeartbeatTask {
77    /// Create a new heartbeat task instance.
78    pub async fn try_new(
79        opts: &DatanodeOptions,
80        region_server: RegionServer,
81        meta_client: MetaClientRef,
82        cache_invalidator: CacheInvalidatorRef,
83        plugins: Plugins,
84        resource_stat: ResourceStatRef,
85    ) -> Result<Self> {
86        let countdown_task_handler_ext = plugins.get::<CountdownTaskHandlerExtRef>();
87        let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
88            region_server.clone(),
89            countdown_task_handler_ext,
90            BASE_HEARTBEAT_INTERVAL,
91        ));
92        let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![
93            region_alive_keeper.clone(),
94            Arc::new(ParseMailboxMessageHandler),
95            Arc::new(SuspendHandler::new(region_server.suspend_state())),
96            Arc::new(
97                RegionHeartbeatResponseHandler::new(region_server.clone())
98                    .with_open_region_parallelism(opts.init_regions_parallelism),
99            ),
100            Arc::new(InvalidateCacheHandler::new(cache_invalidator)),
101        ]));
102
103        Ok(Self {
104            node_id: opts.node_id.unwrap_or(0),
105            workload_types: opts.workload_types.clone(),
106            // We use datanode's start time millis as the node's epoch.
107            node_epoch: common_time::util::current_time_millis() as u64,
108            peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
109            running: Arc::new(AtomicBool::new(false)),
110            meta_client,
111            region_server,
112            resp_handler_executor,
113            region_alive_keeper,
114            resource_stat,
115        })
116    }
117
118    pub async fn create_streams(
119        meta_client: &MetaClient,
120        running: Arc<AtomicBool>,
121        handler_executor: HeartbeatResponseHandlerExecutorRef,
122        mailbox: MailboxRef,
123        mut notify: Option<Arc<Notify>>,
124        quit_signal: Arc<Notify>,
125    ) -> Result<(HeartbeatSender, HeartbeatConfig)> {
126        let client_id = meta_client.id();
127        let (tx, mut rx, config) = meta_client.heartbeat().await.context(MetaClientInitSnafu)?;
128
129        let mut last_received_lease = Instant::now();
130
131        let _handle = common_runtime::spawn_hb(async move {
132            while let Some(res) = rx.message().await.unwrap_or_else(|e| {
133                error!(e; "Error while reading heartbeat response");
134                None
135            }) {
136                if let Some(msg) = res.mailbox_message.as_ref() {
137                    info!("Received mailbox message: {msg:?}, meta_client id: {client_id:?}");
138                }
139                if let Some(lease) = res.region_lease.as_ref() {
140                    metrics::LAST_RECEIVED_HEARTBEAT_ELAPSED
141                        .set(last_received_lease.elapsed().as_millis() as i64);
142                    // Resets the timer.
143                    last_received_lease = Instant::now();
144
145                    let mut leader_region_lease_count = 0;
146                    let mut follower_region_lease_count = 0;
147                    for lease in &lease.regions {
148                        match lease.role() {
149                            RegionRole::Leader | RegionRole::DowngradingLeader => {
150                                leader_region_lease_count += 1
151                            }
152                            RegionRole::Follower => follower_region_lease_count += 1,
153                        }
154                    }
155
156                    metrics::HEARTBEAT_REGION_LEASES
157                        .with_label_values(&["leader"])
158                        .set(leader_region_lease_count);
159                    metrics::HEARTBEAT_REGION_LEASES
160                        .with_label_values(&["follower"])
161                        .set(follower_region_lease_count);
162                }
163                let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), res);
164                if let Err(e) = Self::handle_response(ctx, handler_executor.clone()).await {
165                    error!(e; "Error while handling heartbeat response");
166                }
167                if let Some(notify) = notify.take() {
168                    notify.notify_one();
169                }
170                if !running.load(Ordering::Acquire) {
171                    info!("Heartbeat task shutdown");
172                }
173            }
174            quit_signal.notify_one();
175            info!("Heartbeat handling loop exit.");
176        });
177        Ok((tx, config))
178    }
179
180    async fn handle_response(
181        ctx: HeartbeatResponseHandlerContext,
182        handler_executor: HeartbeatResponseHandlerExecutorRef,
183    ) -> Result<()> {
184        trace!("Heartbeat response: {:?}", ctx.response);
185        handler_executor
186            .handle(ctx)
187            .await
188            .context(error::HandleHeartbeatResponseSnafu)
189    }
190
191    #[allow(deprecated)]
192    /// Start heartbeat task, spawn background task.
193    pub async fn start(
194        &self,
195        event_receiver: RegionServerEventReceiver,
196        notify: Option<Arc<Notify>>,
197    ) -> Result<()> {
198        let running = self.running.clone();
199        if running
200            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
201            .is_err()
202        {
203            warn!("Heartbeat task started multiple times");
204            return Ok(());
205        }
206        let node_id = self.node_id;
207        let node_epoch = self.node_epoch;
208        let addr = &self.peer_addr;
209
210        let meta_client = self.meta_client.clone();
211        let region_server_clone = self.region_server.clone();
212
213        let handler_executor = self.resp_handler_executor.clone();
214
215        let (outgoing_tx, mut outgoing_rx) = mpsc::channel(16);
216        let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
217
218        let quit_signal = Arc::new(Notify::new());
219
220        let (mut tx, config) = Self::create_streams(
221            &meta_client,
222            running.clone(),
223            handler_executor.clone(),
224            mailbox.clone(),
225            notify,
226            quit_signal.clone(),
227        )
228        .await?;
229
230        let interval = config.interval.as_millis() as u64;
231        let mut retry_interval = config.retry_interval;
232
233        // Update RegionAliveKeeper with the interval from Metasrv
234        self.region_alive_keeper.update_heartbeat_interval(interval);
235
236        info!(
237            "Starting heartbeat to Metasrv with config: {}. My node id is {}, address is {}.",
238            config, node_id, addr
239        );
240
241        let self_peer = Some(Peer {
242            id: node_id,
243            addr: addr.clone(),
244        });
245        let epoch = self.region_alive_keeper.epoch();
246        let workload_types = self.workload_types.clone();
247
248        self.region_alive_keeper.start(Some(event_receiver)).await?;
249        let mut last_sent = Instant::now();
250        let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores();
251        let total_memory_bytes = self.resource_stat.get_total_memory_bytes();
252        let resource_stat = self.resource_stat.clone();
253        let region_alive_keeper = self.region_alive_keeper.clone();
254        let gc_limiter = self
255            .region_server
256            .mito_engine()
257            .context(RegionEngineNotFoundSnafu { name: "mito" })?
258            .gc_limiter();
259
260        common_runtime::spawn_hb(async move {
261            let sleep = tokio::time::sleep(Duration::from_millis(0));
262            tokio::pin!(sleep);
263
264            let build_info = common_version::build_info();
265
266            let heartbeat_request = HeartbeatRequest {
267                peer: self_peer,
268                node_epoch,
269                info: Some(NodeInfo {
270                    version: build_info.version.to_string(),
271                    git_commit: build_info.commit_short.to_string(),
272                    start_time_ms: node_epoch,
273                    total_cpu_millicores,
274                    total_memory_bytes,
275                    cpu_usage_millicores: 0,
276                    memory_usage_bytes: 0,
277                    // TODO(zyy17): Remove these deprecated fields when the deprecated fields are removed from the proto.
278                    cpus: total_cpu_millicores as u32,
279                    memory_bytes: total_memory_bytes as u64,
280                    hostname: hostname::get()
281                        .unwrap_or_default()
282                        .to_string_lossy()
283                        .to_string(),
284                }),
285                node_workloads: Some(NodeWorkloads::Datanode(DatanodeWorkloads {
286                    types: workload_types.iter().map(|w| w.to_i32()).collect(),
287                })),
288                ..Default::default()
289            };
290
291            loop {
292                if !running.load(Ordering::Relaxed) {
293                    info!("shutdown heartbeat task");
294                    break;
295                }
296                let req = tokio::select! {
297                    message = outgoing_rx.recv() => {
298                        if let Some(message) = message {
299                            match outgoing_message_to_mailbox_message(message) {
300                                Ok(message) => {
301                                    let mut extensions = heartbeat_request.extensions.clone();
302                                    let gc_stat = gc_limiter.gc_stat();
303                                    gc_stat.into_extensions(&mut extensions);
304
305                                    let req = HeartbeatRequest {
306                                        mailbox_message: Some(message),
307                                        extensions,
308                                        ..heartbeat_request.clone()
309                                    };
310                                    HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
311                                    Some(req)
312                                }
313                                Err(e) => {
314                                    error!(e; "Failed to encode mailbox messages!");
315                                    HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
316                                    None
317                                }
318                            }
319                        } else {
320                            None
321                        }
322                    }
323                    _ = &mut sleep => {
324                        let region_stats = Self::load_region_stats(&region_server_clone);
325                        let topic_stats = region_server_clone.topic_stats();
326                        let now = Instant::now();
327                        let duration_since_epoch = (now - epoch).as_millis() as u64;
328
329                        let mut extensions = heartbeat_request.extensions.clone();
330                        let gc_stat = gc_limiter.gc_stat();
331                        gc_stat.into_extensions(&mut extensions);
332
333                        let mut req = HeartbeatRequest {
334                            region_stats,
335                            topic_stats,
336                            duration_since_epoch,
337                            extensions,
338                            ..heartbeat_request.clone()
339                        };
340
341                        if let Some(info) = req.info.as_mut() {
342                            info.cpu_usage_millicores = resource_stat.get_cpu_usage_millicores();
343                            info.memory_usage_bytes = resource_stat.get_memory_usage_bytes();
344                        }
345
346                        sleep.as_mut().reset(now + Duration::from_millis(interval));
347                        Some(req)
348                    }
349                    // If the heartbeat stream is broken, send a dummy heartbeat request to re-create the heartbeat stream.
350                    _ = quit_signal.notified() => {
351                        let req = HeartbeatRequest::default();
352                        Some(req)
353                    }
354                };
355                if let Some(req) = req {
356                    metrics::LAST_SENT_HEARTBEAT_ELAPSED
357                        .set(last_sent.elapsed().as_millis() as i64);
358                    // Resets the timer.
359                    last_sent = Instant::now();
360                    debug!("Sending heartbeat request: {:?}", req);
361                    if let Err(e) = tx.send(req).await {
362                        error!(e; "Failed to send heartbeat to metasrv");
363                        match Self::create_streams(
364                            &meta_client,
365                            running.clone(),
366                            handler_executor.clone(),
367                            mailbox.clone(),
368                            None,
369                            quit_signal.clone(),
370                        )
371                        .await
372                        {
373                            Ok((new_tx, new_config)) => {
374                                info!("Reconnected to metasrv, heartbeat config: {}", new_config);
375                                tx = new_tx;
376                                // Update retry_interval from new config
377                                retry_interval = new_config.retry_interval;
378                                // Update region_alive_keeper's heartbeat interval
379                                region_alive_keeper.update_heartbeat_interval(
380                                    new_config.interval.as_millis() as u64,
381                                );
382                                // Triggers to send heartbeat immediately.
383                                sleep.as_mut().reset(Instant::now());
384                            }
385                            Err(e) => {
386                                // Before the META_LEASE_SECS expires,
387                                // any retries are meaningless, it always reads the old meta leader address.
388                                // Triggers to retry after retry_interval from Metasrv config.
389                                sleep.as_mut().reset(Instant::now() + retry_interval);
390                                error!(e; "Failed to reconnect to metasrv!");
391                            }
392                        }
393                    } else {
394                        HEARTBEAT_SENT_COUNT.inc();
395                    }
396                }
397            }
398        });
399
400        Ok(())
401    }
402
403    fn load_region_stats(region_server: &RegionServer) -> Vec<RegionStat> {
404        region_server
405            .reportable_regions()
406            .into_iter()
407            .map(|stat| {
408                let region_stat = region_server
409                    .region_statistic(stat.region_id)
410                    .unwrap_or_default();
411                let mut extensions = HashMap::new();
412                if let Some(serialized) = region_stat.serialize_to_vec() {
413                    extensions.insert(REGION_STATISTIC_KEY.to_string(), serialized);
414                }
415
416                RegionStat {
417                    region_id: stat.region_id.as_u64(),
418                    engine: stat.engine,
419                    role: RegionRole::from(stat.role).into(),
420                    // TODO(weny): w/rcus
421                    rcus: 0,
422                    wcus: 0,
423                    approximate_bytes: region_stat.estimated_disk_size() as i64,
424                    extensions,
425                }
426            })
427            .collect()
428    }
429
430    pub fn close(&self) -> Result<()> {
431        let running = self.running.clone();
432        if running
433            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
434            .is_err()
435        {
436            warn!("Call close heartbeat task multiple times");
437        }
438
439        Ok(())
440    }
441}