meta_srv/discovery/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Duration;
16
17use api::v1::meta::heartbeat_request::NodeWorkloads;
18use common_meta::DatanodeId;
19use common_meta::cluster::NodeInfo;
20use common_meta::kv_backend::KvBackendRef;
21use common_meta::peer::Peer;
22use common_time::util::{DefaultSystemTimer, SystemTimer};
23use common_workload::DatanodeWorkloadType;
24use snafu::ResultExt;
25
26use crate::discovery::lease::{LeaseValueAccessor, LeaseValueType};
27use crate::discovery::node_info::{NodeInfoAccessor, NodeInfoType};
28use crate::error::{KvBackendSnafu, Result};
29use crate::key::{DatanodeLeaseKey, LeaseValue};
30
31impl LastActiveTs for LeaseValue {
32    fn last_active_ts(&self) -> i64 {
33        self.timestamp_millis
34    }
35}
36
37impl LastActiveTs for NodeInfo {
38    fn last_active_ts(&self) -> i64 {
39        self.last_activity_ts
40    }
41}
42
/// Implemented by records that remember when their owner was last seen active.
///
/// [`build_active_filter`] relies on this trait to decide whether a record is
/// still fresh enough to count as alive.
pub trait LastActiveTs {
    /// The moment of last activity, expressed in milliseconds.
    fn last_active_ts(&self) -> i64;
}
48
49/// Builds a filter closure that checks whether a [`LastActiveTs`] item
50/// is still within the specified active duration, relative to the
51/// current time provided by the given [`SystemTimer`].
52///
53/// The returned closure uses the timestamp at the time of building,
54/// so the "now" reference point is fixed when this function is called.
55pub fn build_active_filter<T: LastActiveTs>(
56    timer: impl SystemTimer,
57    active_duration: Duration,
58) -> impl Fn(&T) -> bool {
59    let now = timer.current_time_millis();
60    let active_duration = active_duration.as_millis() as u64;
61    move |item: &T| {
62        let elapsed = now.saturating_sub(item.last_active_ts()) as u64;
63        elapsed < active_duration
64    }
65}
66
67/// Returns the alive datanodes.
68pub async fn alive_datanodes(
69    accessor: &impl LeaseValueAccessor,
70    active_duration: Duration,
71    condition: Option<fn(&NodeWorkloads) -> bool>,
72) -> Result<Vec<Peer>> {
73    let active_filter = build_active_filter(DefaultSystemTimer, active_duration);
74    let condition = condition.unwrap_or(|_| true);
75    Ok(accessor
76        .lease_values(LeaseValueType::Datanode)
77        .await?
78        .into_iter()
79        .filter_map(|(peer_id, lease_value)| {
80            if active_filter(&lease_value) && condition(&lease_value.workloads) {
81                Some(Peer::new(peer_id, lease_value.node_addr))
82            } else {
83                None
84            }
85        })
86        .collect::<Vec<_>>())
87}
88
89/// Returns the alive flownodes.
90pub async fn alive_flownodes(
91    accessor: &impl LeaseValueAccessor,
92    active_duration: Duration,
93    condition: Option<fn(&NodeWorkloads) -> bool>,
94) -> Result<Vec<Peer>> {
95    let active_filter = build_active_filter(DefaultSystemTimer, active_duration);
96    let condition = condition.unwrap_or(|_| true);
97    Ok(accessor
98        .lease_values(LeaseValueType::Flownode)
99        .await?
100        .into_iter()
101        .filter_map(|(peer_id, lease_value)| {
102            if active_filter(&lease_value) && condition(&lease_value.workloads) {
103                Some(Peer::new(peer_id, lease_value.node_addr))
104            } else {
105                None
106            }
107        })
108        .collect::<Vec<_>>())
109}
110
111/// Returns the alive frontends.
112pub async fn alive_frontends(
113    lister: &impl NodeInfoAccessor,
114    active_duration: Duration,
115) -> Result<Vec<Peer>> {
116    let active_filter = build_active_filter(DefaultSystemTimer, active_duration);
117    Ok(lister
118        .node_infos(NodeInfoType::Frontend)
119        .await?
120        .into_iter()
121        .filter_map(|(_, node_info)| {
122            if active_filter(&node_info) {
123                Some(node_info.peer)
124            } else {
125                None
126            }
127        })
128        .collect::<Vec<_>>())
129}
130
131/// Returns the alive datanode peer.
132pub async fn alive_datanode(
133    lister: &impl LeaseValueAccessor,
134    peer_id: u64,
135    active_duration: Duration,
136) -> Result<Option<Peer>> {
137    let active_filter = build_active_filter(DefaultSystemTimer, active_duration);
138    let v = lister
139        .lease_value(LeaseValueType::Datanode, peer_id)
140        .await?
141        .filter(|(_, lease)| active_filter(lease))
142        .map(|(peer_id, lease)| Peer::new(peer_id, lease.node_addr));
143
144    Ok(v)
145}
146
147/// Determines if a datanode is capable of accepting ingest workloads.
148/// Returns `true` if the datanode's workload types include ingest capability,
149/// or if the node is not of type [NodeWorkloads::Datanode].
150///
151/// A datanode is considered to accept ingest workload if it supports either:
152/// - Hybrid workload (both ingest and query workloads)
153/// - Ingest workload (only ingest workload)
154pub fn accept_ingest_workload(datanode_workloads: &NodeWorkloads) -> bool {
155    match &datanode_workloads {
156        NodeWorkloads::Datanode(workloads) => workloads
157            .types
158            .iter()
159            .filter_map(|w| DatanodeWorkloadType::from_i32(*w))
160            .any(|w| w.accept_ingest()),
161        // If the [NodeWorkloads] type is not [NodeWorkloads::Datanode], returns true.
162        _ => true,
163    }
164}
165
166/// Returns the lease value of the given datanode id, if the datanode is not found, returns None.
167pub async fn find_datanode_lease_value(
168    in_memory: &KvBackendRef,
169    datanode_id: DatanodeId,
170) -> Result<Option<LeaseValue>> {
171    let lease_key = DatanodeLeaseKey {
172        node_id: datanode_id,
173    };
174    let lease_key_bytes: Vec<u8> = lease_key.try_into()?;
175    let Some(kv) = in_memory
176        .get(&lease_key_bytes)
177        .await
178        .context(KvBackendSnafu)?
179    else {
180        return Ok(None);
181    };
182
183    let lease_value: LeaseValue = kv.value.try_into()?;
184    Ok(Some(lease_value))
185}