meta_srv/discovery/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Duration;
16
17use api::v1::meta::heartbeat_request::NodeWorkloads;
18use common_meta::DatanodeId;
19use common_meta::cluster::NodeInfo;
20use common_meta::kv_backend::KvBackendRef;
21use common_meta::peer::Peer;
22use common_time::util::SystemTimer;
23use common_workload::DatanodeWorkloadType;
24use snafu::ResultExt;
25
26use crate::discovery::lease::{LeaseValueAccessor, LeaseValueType};
27use crate::discovery::node_info::{NodeInfoAccessor, NodeInfoType};
28use crate::error::{KvBackendSnafu, Result};
29use crate::key::{DatanodeLeaseKey, LeaseValue};
30
31impl LastActiveTs for LeaseValue {
32    fn last_active_ts(&self) -> i64 {
33        self.timestamp_millis
34    }
35}
36
37impl LastActiveTs for NodeInfo {
38    fn last_active_ts(&self) -> i64 {
39        self.last_activity_ts
40    }
41}
42
/// Implemented by values that record when they were last seen active.
///
/// The timestamp is compared against a caller-supplied "now" (also in
/// milliseconds) to decide whether the value is still considered alive.
pub trait LastActiveTs {
    /// The last active timestamp, in milliseconds.
    fn last_active_ts(&self) -> i64;
}
48
49/// Builds a filter closure that checks whether a [`LastActiveTs`] item
50/// is still within the specified active duration, relative to the
51/// current time provided by the given [`SystemTimer`].
52pub fn build_active_filter<T: LastActiveTs>(active_duration: Duration) -> impl Fn(i64, &T) -> bool {
53    move |now: i64, item: &T| {
54        let active_duration = active_duration.as_millis() as u64;
55        let elapsed = now.saturating_sub(item.last_active_ts()) as u64;
56        elapsed < active_duration
57    }
58}
59
60/// Returns the alive datanodes.
61pub async fn alive_datanodes(
62    timer: &impl SystemTimer,
63    accessor: &impl LeaseValueAccessor,
64    active_duration: Duration,
65    condition: Option<fn(&NodeWorkloads) -> bool>,
66) -> Result<Vec<Peer>> {
67    let active_filter = build_active_filter(active_duration);
68    let condition = condition.unwrap_or(|_| true);
69    let lease_values = accessor.lease_values(LeaseValueType::Datanode).await?;
70    let now = timer.current_time_millis();
71    Ok(lease_values
72        .into_iter()
73        .filter_map(|(peer_id, lease_value)| {
74            if active_filter(now, &lease_value) && condition(&lease_value.workloads) {
75                Some(Peer::new(peer_id, lease_value.node_addr))
76            } else {
77                None
78            }
79        })
80        .collect::<Vec<_>>())
81}
82
83/// Returns the alive flownodes.
84pub async fn alive_flownodes(
85    timer: &impl SystemTimer,
86    accessor: &impl LeaseValueAccessor,
87    active_duration: Duration,
88    condition: Option<fn(&NodeWorkloads) -> bool>,
89) -> Result<Vec<Peer>> {
90    let active_filter = build_active_filter(active_duration);
91    let condition = condition.unwrap_or(|_| true);
92    let lease_values = accessor.lease_values(LeaseValueType::Flownode).await?;
93    let now = timer.current_time_millis();
94    Ok(lease_values
95        .into_iter()
96        .filter_map(|(peer_id, lease_value)| {
97            if active_filter(now, &lease_value) && condition(&lease_value.workloads) {
98                Some(Peer::new(peer_id, lease_value.node_addr))
99            } else {
100                None
101            }
102        })
103        .collect::<Vec<_>>())
104}
105
106/// Returns the alive frontends.
107pub async fn alive_frontends(
108    timer: &impl SystemTimer,
109    lister: &impl NodeInfoAccessor,
110    active_duration: Duration,
111) -> Result<Vec<Peer>> {
112    let active_filter = build_active_filter(active_duration);
113    let node_infos = lister.node_infos(NodeInfoType::Frontend).await?;
114    let now = timer.current_time_millis();
115    Ok(node_infos
116        .into_iter()
117        .filter_map(|(_, node_info)| {
118            if active_filter(now, &node_info) {
119                Some(node_info.peer)
120            } else {
121                None
122            }
123        })
124        .collect::<Vec<_>>())
125}
126
127/// Returns the alive datanode peer.
128pub async fn alive_datanode(
129    timer: &impl SystemTimer,
130    lister: &impl LeaseValueAccessor,
131    peer_id: u64,
132    active_duration: Duration,
133) -> Result<Option<Peer>> {
134    let active_filter = build_active_filter(active_duration);
135    let lease_value = lister
136        .lease_value(LeaseValueType::Datanode, peer_id)
137        .await?;
138    let now = timer.current_time_millis();
139    let v = lease_value
140        .filter(|(_, lease)| active_filter(now, lease))
141        .map(|(peer_id, lease)| Peer::new(peer_id, lease.node_addr));
142
143    Ok(v)
144}
145
146/// Determines if a datanode is capable of accepting ingest workloads.
147/// Returns `true` if the datanode's workload types include ingest capability,
148/// or if the node is not of type [NodeWorkloads::Datanode].
149///
150/// A datanode is considered to accept ingest workload if it supports either:
151/// - Hybrid workload (both ingest and query workloads)
152/// - Ingest workload (only ingest workload)
153pub fn accept_ingest_workload(datanode_workloads: &NodeWorkloads) -> bool {
154    match &datanode_workloads {
155        NodeWorkloads::Datanode(workloads) => workloads
156            .types
157            .iter()
158            .filter_map(|w| DatanodeWorkloadType::from_i32(*w))
159            .any(|w| w.accept_ingest()),
160        // If the [NodeWorkloads] type is not [NodeWorkloads::Datanode], returns true.
161        _ => true,
162    }
163}
164
165/// Returns the lease value of the given datanode id, if the datanode is not found, returns None.
166pub async fn find_datanode_lease_value(
167    in_memory: &KvBackendRef,
168    datanode_id: DatanodeId,
169) -> Result<Option<LeaseValue>> {
170    let lease_key = DatanodeLeaseKey {
171        node_id: datanode_id,
172    };
173    let lease_key_bytes: Vec<u8> = lease_key.try_into()?;
174    let Some(kv) = in_memory
175        .get(&lease_key_bytes)
176        .await
177        .context(KvBackendSnafu)?
178    else {
179        return Ok(None);
180    };
181
182    let lease_value: LeaseValue = kv.value.try_into()?;
183    Ok(Some(lease_value))
184}