datanode/
heartbeat.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::sync::atomic::{AtomicBool, Ordering};
18use std::time::Duration;
19
20use api::v1::meta::heartbeat_request::NodeWorkloads;
21use api::v1::meta::{DatanodeWorkloads, HeartbeatRequest, NodeInfo, Peer, RegionRole, RegionStat};
22use common_base::Plugins;
23use common_meta::cache_invalidator::CacheInvalidatorRef;
24use common_meta::datanode::REGION_STATISTIC_KEY;
25use common_meta::distributed_time_constants::BASE_HEARTBEAT_INTERVAL;
26use common_meta::heartbeat::handler::invalidate_table_cache::InvalidateCacheHandler;
27use common_meta::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler;
28use common_meta::heartbeat::handler::suspend::SuspendHandler;
29use common_meta::heartbeat::handler::{
30    HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutorRef,
31};
32use common_meta::heartbeat::mailbox::{HeartbeatMailbox, MailboxRef};
33use common_meta::heartbeat::utils::outgoing_message_to_mailbox_message;
34use common_meta::kv_backend::KvBackendRef;
35use common_stat::ResourceStatRef;
36use common_telemetry::{debug, error, info, trace, warn};
37use common_workload::DatanodeWorkloadType;
38use meta_client::MetaClientRef;
39use meta_client::client::heartbeat::HeartbeatConfig;
40use meta_client::client::{HeartbeatSender, MetaClient};
41use servers::addrs;
42use snafu::{OptionExt as _, ResultExt};
43use tokio::sync::{Notify, mpsc};
44use tokio::time::Instant;
45
46use self::handler::RegionHeartbeatResponseHandler;
47use crate::alive_keeper::{CountdownTaskHandlerExtRef, RegionAliveKeeper};
48use crate::config::DatanodeOptions;
49use crate::error::{self, MetaClientInitSnafu, RegionEngineNotFoundSnafu, Result};
50use crate::event_listener::RegionServerEventReceiver;
51use crate::metrics::{self, HEARTBEAT_RECV_COUNT, HEARTBEAT_SENT_COUNT};
52use crate::region_server::RegionServer;
53
54pub(crate) mod handler;
55pub(crate) mod task_tracker;
56
/// The datanode heartbeat task that periodically sends [`HeartbeatRequest`]s to Metasrv in the
/// background.
pub struct HeartbeatTask {
    /// Id of this datanode, reported as the peer id in every heartbeat. Falls back to `0` when
    /// the datanode options carry no node id.
    node_id: u64,
    /// Workload types copied from the datanode options and reported in the heartbeat's
    /// `node_workloads`.
    workload_types: Vec<DatanodeWorkloadType>,
    /// The node's epoch: the time (in milliseconds) at which this task was created.
    node_epoch: u64,
    /// Address reported to Metasrv as this peer's address, resolved from the gRPC bind and
    /// server addresses in the options.
    peer_addr: String,
    /// Shared flag: set to `true` by `start`, cleared by `close` and on drop. The spawned
    /// background loops read it to detect shutdown.
    running: Arc<AtomicBool>,
    /// Client used to open (and re-open) the heartbeat stream to Metasrv.
    meta_client: MetaClientRef,
    /// Source of the region and topic statistics attached to periodic heartbeats.
    region_server: RegionServer,
    /// Executor that runs the registered handlers on every heartbeat response.
    resp_handler_executor: HeartbeatResponseHandlerExecutorRef,
    /// Keeper that is started together with the task and whose heartbeat interval is updated
    /// from the config returned by Metasrv.
    region_alive_keeper: Arc<RegionAliveKeeper>,
    /// Provider of the total and current CPU/memory figures reported in the node info.
    resource_stat: ResourceStatRef,
}
70
71impl Drop for HeartbeatTask {
72    fn drop(&mut self) {
73        self.running.store(false, Ordering::Release);
74    }
75}
76
77impl HeartbeatTask {
78    /// Create a new heartbeat task instance.
79    pub async fn try_new(
80        opts: &DatanodeOptions,
81        region_server: RegionServer,
82        meta_client: MetaClientRef,
83        kv_backend: KvBackendRef,
84        cache_invalidator: CacheInvalidatorRef,
85        plugins: Plugins,
86        resource_stat: ResourceStatRef,
87    ) -> Result<Self> {
88        let countdown_task_handler_ext = plugins.get::<CountdownTaskHandlerExtRef>();
89        let region_alive_keeper = Arc::new(RegionAliveKeeper::new(
90            region_server.clone(),
91            countdown_task_handler_ext,
92            BASE_HEARTBEAT_INTERVAL,
93        ));
94        let resp_handler_executor = Arc::new(HandlerGroupExecutor::new(vec![
95            region_alive_keeper.clone(),
96            Arc::new(ParseMailboxMessageHandler),
97            Arc::new(SuspendHandler::new(region_server.suspend_state())),
98            Arc::new(
99                RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend)
100                    .with_open_region_parallelism(opts.init_regions_parallelism),
101            ),
102            Arc::new(InvalidateCacheHandler::new(cache_invalidator)),
103        ]));
104
105        Ok(Self {
106            node_id: opts.node_id.unwrap_or(0),
107            workload_types: opts.workload_types.clone(),
108            // We use datanode's start time millis as the node's epoch.
109            node_epoch: common_time::util::current_time_millis() as u64,
110            peer_addr: addrs::resolve_addr(&opts.grpc.bind_addr, Some(&opts.grpc.server_addr)),
111            running: Arc::new(AtomicBool::new(false)),
112            meta_client,
113            region_server,
114            resp_handler_executor,
115            region_alive_keeper,
116            resource_stat,
117        })
118    }
119
120    pub async fn create_streams(
121        meta_client: &MetaClient,
122        running: Arc<AtomicBool>,
123        handler_executor: HeartbeatResponseHandlerExecutorRef,
124        mailbox: MailboxRef,
125        mut notify: Option<Arc<Notify>>,
126        quit_signal: Arc<Notify>,
127    ) -> Result<(HeartbeatSender, HeartbeatConfig)> {
128        let client_id = meta_client.id();
129        let (tx, mut rx, config) = meta_client.heartbeat().await.context(MetaClientInitSnafu)?;
130
131        let mut last_received_lease = Instant::now();
132
133        let _handle = common_runtime::spawn_hb(async move {
134            while let Some(res) = rx.message().await.unwrap_or_else(|e| {
135                error!(e; "Error while reading heartbeat response");
136                None
137            }) {
138                if let Some(msg) = res.mailbox_message.as_ref() {
139                    info!("Received mailbox message: {msg:?}, meta_client id: {client_id:?}");
140                }
141                if let Some(lease) = res.region_lease.as_ref() {
142                    metrics::LAST_RECEIVED_HEARTBEAT_ELAPSED
143                        .set(last_received_lease.elapsed().as_millis() as i64);
144                    // Resets the timer.
145                    last_received_lease = Instant::now();
146
147                    let mut leader_region_lease_count = 0;
148                    let mut follower_region_lease_count = 0;
149                    for lease in &lease.regions {
150                        match lease.role() {
151                            RegionRole::Leader | RegionRole::DowngradingLeader => {
152                                leader_region_lease_count += 1
153                            }
154                            RegionRole::Follower => follower_region_lease_count += 1,
155                        }
156                    }
157
158                    metrics::HEARTBEAT_REGION_LEASES
159                        .with_label_values(&["leader"])
160                        .set(leader_region_lease_count);
161                    metrics::HEARTBEAT_REGION_LEASES
162                        .with_label_values(&["follower"])
163                        .set(follower_region_lease_count);
164                }
165                let ctx = HeartbeatResponseHandlerContext::new(mailbox.clone(), res);
166                if let Err(e) = Self::handle_response(ctx, handler_executor.clone()).await {
167                    error!(e; "Error while handling heartbeat response");
168                }
169                if let Some(notify) = notify.take() {
170                    notify.notify_one();
171                }
172                if !running.load(Ordering::Acquire) {
173                    info!("Heartbeat task shutdown");
174                }
175            }
176            quit_signal.notify_one();
177            info!("Heartbeat handling loop exit.");
178        });
179        Ok((tx, config))
180    }
181
182    async fn handle_response(
183        ctx: HeartbeatResponseHandlerContext,
184        handler_executor: HeartbeatResponseHandlerExecutorRef,
185    ) -> Result<()> {
186        trace!("Heartbeat response: {:?}", ctx.response);
187        handler_executor
188            .handle(ctx)
189            .await
190            .context(error::HandleHeartbeatResponseSnafu)
191    }
192
193    #[allow(deprecated)]
194    /// Start heartbeat task, spawn background task.
195    pub async fn start(
196        &self,
197        event_receiver: RegionServerEventReceiver,
198        notify: Option<Arc<Notify>>,
199    ) -> Result<()> {
200        let running = self.running.clone();
201        if running
202            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
203            .is_err()
204        {
205            warn!("Heartbeat task started multiple times");
206            return Ok(());
207        }
208        let node_id = self.node_id;
209        let node_epoch = self.node_epoch;
210        let addr = &self.peer_addr;
211
212        let meta_client = self.meta_client.clone();
213        let region_server_clone = self.region_server.clone();
214
215        let handler_executor = self.resp_handler_executor.clone();
216
217        let (outgoing_tx, mut outgoing_rx) = mpsc::channel(16);
218        let mailbox = Arc::new(HeartbeatMailbox::new(outgoing_tx));
219
220        let quit_signal = Arc::new(Notify::new());
221
222        let (mut tx, config) = Self::create_streams(
223            &meta_client,
224            running.clone(),
225            handler_executor.clone(),
226            mailbox.clone(),
227            notify,
228            quit_signal.clone(),
229        )
230        .await?;
231
232        let interval = config.interval.as_millis() as u64;
233        let mut retry_interval = config.retry_interval;
234
235        // Update RegionAliveKeeper with the interval from Metasrv
236        self.region_alive_keeper.update_heartbeat_interval(interval);
237
238        info!(
239            "Starting heartbeat to Metasrv with config: {}. My node id is {}, address is {}.",
240            config, node_id, addr
241        );
242
243        let self_peer = Some(Peer {
244            id: node_id,
245            addr: addr.clone(),
246        });
247        let epoch = self.region_alive_keeper.epoch();
248        let workload_types = self.workload_types.clone();
249
250        self.region_alive_keeper.start(Some(event_receiver)).await?;
251        let mut last_sent = Instant::now();
252        let total_cpu_millicores = self.resource_stat.get_total_cpu_millicores();
253        let total_memory_bytes = self.resource_stat.get_total_memory_bytes();
254        let resource_stat = self.resource_stat.clone();
255        let region_alive_keeper = self.region_alive_keeper.clone();
256        let gc_limiter = self
257            .region_server
258            .mito_engine()
259            .context(RegionEngineNotFoundSnafu { name: "mito" })?
260            .gc_limiter();
261
262        common_runtime::spawn_hb(async move {
263            let sleep = tokio::time::sleep(Duration::from_millis(0));
264            tokio::pin!(sleep);
265
266            let build_info = common_version::build_info();
267
268            let heartbeat_request = HeartbeatRequest {
269                peer: self_peer,
270                node_epoch,
271                info: Some(NodeInfo {
272                    version: build_info.version.to_string(),
273                    git_commit: build_info.commit_short.to_string(),
274                    start_time_ms: node_epoch,
275                    total_cpu_millicores,
276                    total_memory_bytes,
277                    cpu_usage_millicores: 0,
278                    memory_usage_bytes: 0,
279                    // TODO(zyy17): Remove these deprecated fields when the deprecated fields are removed from the proto.
280                    cpus: total_cpu_millicores as u32,
281                    memory_bytes: total_memory_bytes as u64,
282                    hostname: hostname::get()
283                        .unwrap_or_default()
284                        .to_string_lossy()
285                        .to_string(),
286                }),
287                node_workloads: Some(NodeWorkloads::Datanode(DatanodeWorkloads {
288                    types: workload_types.iter().map(|w| w.to_i32()).collect(),
289                })),
290                ..Default::default()
291            };
292
293            loop {
294                if !running.load(Ordering::Relaxed) {
295                    info!("shutdown heartbeat task");
296                    break;
297                }
298                let req = tokio::select! {
299                    message = outgoing_rx.recv() => {
300                        if let Some(message) = message {
301                            match outgoing_message_to_mailbox_message(message) {
302                                Ok(message) => {
303                                    let mut extensions = heartbeat_request.extensions.clone();
304                                    let gc_stat = gc_limiter.gc_stat();
305                                    gc_stat.into_extensions(&mut extensions);
306
307                                    let req = HeartbeatRequest {
308                                        mailbox_message: Some(message),
309                                        extensions,
310                                        ..heartbeat_request.clone()
311                                    };
312                                    HEARTBEAT_RECV_COUNT.with_label_values(&["success"]).inc();
313                                    Some(req)
314                                }
315                                Err(e) => {
316                                    error!(e; "Failed to encode mailbox messages!");
317                                    HEARTBEAT_RECV_COUNT.with_label_values(&["error"]).inc();
318                                    None
319                                }
320                            }
321                        } else {
322                            None
323                        }
324                    }
325                    _ = &mut sleep => {
326                        let region_stats = Self::load_region_stats(&region_server_clone);
327                        let topic_stats = region_server_clone.topic_stats();
328                        let now = Instant::now();
329                        let duration_since_epoch = (now - epoch).as_millis() as u64;
330
331                        let mut extensions = heartbeat_request.extensions.clone();
332                        let gc_stat = gc_limiter.gc_stat();
333                        gc_stat.into_extensions(&mut extensions);
334
335                        let mut req = HeartbeatRequest {
336                            region_stats,
337                            topic_stats,
338                            duration_since_epoch,
339                            extensions,
340                            ..heartbeat_request.clone()
341                        };
342
343                        if let Some(info) = req.info.as_mut() {
344                            info.cpu_usage_millicores = resource_stat.get_cpu_usage_millicores();
345                            info.memory_usage_bytes = resource_stat.get_memory_usage_bytes();
346                        }
347
348                        sleep.as_mut().reset(now + Duration::from_millis(interval));
349                        Some(req)
350                    }
351                    // If the heartbeat stream is broken, send a dummy heartbeat request to re-create the heartbeat stream.
352                    _ = quit_signal.notified() => {
353                        let req = HeartbeatRequest::default();
354                        Some(req)
355                    }
356                };
357                if let Some(req) = req {
358                    metrics::LAST_SENT_HEARTBEAT_ELAPSED
359                        .set(last_sent.elapsed().as_millis() as i64);
360                    // Resets the timer.
361                    last_sent = Instant::now();
362                    debug!("Sending heartbeat request: {:?}", req);
363                    if let Err(e) = tx.send(req).await {
364                        error!(e; "Failed to send heartbeat to metasrv");
365                        match Self::create_streams(
366                            &meta_client,
367                            running.clone(),
368                            handler_executor.clone(),
369                            mailbox.clone(),
370                            None,
371                            quit_signal.clone(),
372                        )
373                        .await
374                        {
375                            Ok((new_tx, new_config)) => {
376                                info!("Reconnected to metasrv, heartbeat config: {}", new_config);
377                                tx = new_tx;
378                                // Update retry_interval from new config
379                                retry_interval = new_config.retry_interval;
380                                // Update region_alive_keeper's heartbeat interval
381                                region_alive_keeper.update_heartbeat_interval(
382                                    new_config.interval.as_millis() as u64,
383                                );
384                                // Triggers to send heartbeat immediately.
385                                sleep.as_mut().reset(Instant::now());
386                            }
387                            Err(e) => {
388                                // Before the META_LEASE_SECS expires,
389                                // any retries are meaningless, it always reads the old meta leader address.
390                                // Triggers to retry after retry_interval from Metasrv config.
391                                sleep.as_mut().reset(Instant::now() + retry_interval);
392                                error!(e; "Failed to reconnect to metasrv!");
393                            }
394                        }
395                    } else {
396                        HEARTBEAT_SENT_COUNT.inc();
397                    }
398                }
399            }
400        });
401
402        Ok(())
403    }
404
405    fn load_region_stats(region_server: &RegionServer) -> Vec<RegionStat> {
406        region_server
407            .reportable_regions()
408            .into_iter()
409            .map(|stat| {
410                let region_stat = region_server
411                    .region_statistic(stat.region_id)
412                    .unwrap_or_default();
413                let mut extensions = HashMap::new();
414                if let Some(serialized) = region_stat.serialize_to_vec() {
415                    extensions.insert(REGION_STATISTIC_KEY.to_string(), serialized);
416                }
417
418                RegionStat {
419                    region_id: stat.region_id.as_u64(),
420                    engine: stat.engine,
421                    role: RegionRole::from(stat.role).into(),
422                    // TODO(weny): w/rcus
423                    rcus: 0,
424                    wcus: 0,
425                    approximate_bytes: region_stat.estimated_disk_size() as i64,
426                    extensions,
427                }
428            })
429            .collect()
430    }
431
432    pub fn close(&self) -> Result<()> {
433        let running = self.running.clone();
434        if running
435            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
436            .is_err()
437        {
438            warn!("Call close heartbeat task multiple times");
439        }
440
441        Ok(())
442    }
443}