Skip to main content

meta_srv/discovery/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Duration;
16
17use api::v1::meta::heartbeat_request::NodeWorkloads;
18use common_meta::DatanodeId;
19use common_meta::cluster::{NodeInfo, NodeStatus};
20use common_meta::kv_backend::KvBackendRef;
21use common_meta::peer::Peer;
22use common_time::util::SystemTimer;
23use common_workload::DatanodeWorkloadType;
24use snafu::ResultExt;
25
26use crate::discovery::lease::{LeaseValueAccessor, LeaseValueType};
27use crate::discovery::node_info::{NodeInfoAccessor, NodeInfoType};
28use crate::error::{KvBackendSnafu, Result};
29use crate::key::{DatanodeLeaseKey, LeaseValue};
30
31impl LastActiveTs for LeaseValue {
32    fn last_active_ts(&self) -> i64 {
33        self.timestamp_millis
34    }
35}
36
37impl LastActiveTs for NodeInfo {
38    fn last_active_ts(&self) -> i64 {
39        self.last_activity_ts
40    }
41}
42
43/// Trait for types that have a last active timestamp.
44pub trait LastActiveTs {
45    /// Returns the last active timestamp in milliseconds.
46    fn last_active_ts(&self) -> i64;
47}
48
49/// Builds a filter closure that checks whether a [`LastActiveTs`] item
50/// is still within the specified active duration, relative to the
51/// current time provided by the given [`SystemTimer`].
52pub fn build_active_filter<T: LastActiveTs>(active_duration: Duration) -> impl Fn(i64, &T) -> bool {
53    move |now: i64, item: &T| {
54        let active_duration = active_duration.as_millis() as u64;
55        let elapsed = now.saturating_sub(item.last_active_ts()) as u64;
56        elapsed < active_duration
57    }
58}
59
60/// Returns the alive frontend node infos.
61pub async fn alive_frontend_infos(
62    timer: &impl SystemTimer,
63    lister: &impl NodeInfoAccessor,
64    active_duration: Duration,
65) -> Result<Vec<NodeInfo>> {
66    alive_node_infos(timer, lister, NodeInfoType::Frontend, active_duration, None).await
67}
68
69/// Returns the alive datanode node infos.
70pub async fn alive_datanode_infos(
71    timer: &impl SystemTimer,
72    lister: &impl NodeInfoAccessor,
73    active_duration: Duration,
74    condition: Option<fn(&NodeWorkloads) -> bool>,
75) -> Result<Vec<NodeInfo>> {
76    alive_node_infos(
77        timer,
78        lister,
79        NodeInfoType::Datanode,
80        active_duration,
81        condition,
82    )
83    .await
84}
85
86/// Returns the alive flownode node infos.
87pub async fn alive_flownode_infos(
88    timer: &impl SystemTimer,
89    lister: &impl NodeInfoAccessor,
90    active_duration: Duration,
91    condition: Option<fn(&NodeWorkloads) -> bool>,
92) -> Result<Vec<NodeInfo>> {
93    alive_node_infos(
94        timer,
95        lister,
96        NodeInfoType::Flownode,
97        active_duration,
98        condition,
99    )
100    .await
101}
102
103async fn alive_node_infos(
104    timer: &impl SystemTimer,
105    lister: &impl NodeInfoAccessor,
106    node_info_type: NodeInfoType,
107    active_duration: Duration,
108    condition: Option<fn(&NodeWorkloads) -> bool>,
109) -> Result<Vec<NodeInfo>> {
110    let active_filter = build_active_filter(active_duration);
111    let node_infos = lister.node_infos(node_info_type).await?;
112    let now = timer.current_time_millis();
113    Ok(node_infos
114        .into_iter()
115        .filter_map(|(_, node_info)| {
116            if !active_filter(now, &node_info) {
117                return None;
118            }
119
120            match (&node_info.status, condition) {
121                (NodeStatus::Frontend(_), None) => Some(node_info),
122                (NodeStatus::Frontend(status), Some(condition)) => {
123                    let workloads = NodeWorkloads::Frontend(status.workloads.clone());
124                    condition(&workloads).then_some(node_info)
125                }
126                (NodeStatus::Datanode(status), Some(condition)) => {
127                    let workloads = NodeWorkloads::Datanode(status.workloads.clone());
128                    condition(&workloads).then_some(node_info)
129                }
130                (NodeStatus::Flownode(status), Some(condition)) => {
131                    let workloads = NodeWorkloads::Flownode(status.workloads.clone());
132                    condition(&workloads).then_some(node_info)
133                }
134                (NodeStatus::Datanode(_), None) | (NodeStatus::Flownode(_), None) => {
135                    Some(node_info)
136                }
137                _ => None,
138            }
139        })
140        .collect::<Vec<_>>())
141}
142
143/// Returns the alive datanode peer.
144pub async fn alive_datanode(
145    timer: &impl SystemTimer,
146    lister: &impl LeaseValueAccessor,
147    peer_id: u64,
148    active_duration: Duration,
149) -> Result<Option<Peer>> {
150    let active_filter = build_active_filter(active_duration);
151    let lease_value = lister
152        .lease_value(LeaseValueType::Datanode, peer_id)
153        .await?;
154    let now = timer.current_time_millis();
155    let v = lease_value
156        .filter(|(_, lease)| active_filter(now, lease))
157        .map(|(peer_id, lease)| Peer::new(peer_id, lease.node_addr));
158
159    Ok(v)
160}
161
162/// Determines if a datanode is capable of accepting ingest workloads.
163/// Returns `true` if the datanode's workload types include ingest capability,
164/// or if the node is not of type [NodeWorkloads::Datanode].
165///
166/// A datanode is considered to accept ingest workload if it supports either:
167/// - Hybrid workload (both ingest and query workloads)
168/// - Ingest workload (only ingest workload)
169pub fn accept_ingest_workload(datanode_workloads: &NodeWorkloads) -> bool {
170    match &datanode_workloads {
171        NodeWorkloads::Datanode(workloads) => workloads
172            .types
173            .iter()
174            .filter_map(|w| DatanodeWorkloadType::from_i32(*w))
175            .any(|w| w.accept_ingest()),
176        // If the [NodeWorkloads] type is not [NodeWorkloads::Datanode], returns true.
177        _ => true,
178    }
179}
180
181/// Returns the lease value of the given datanode id, if the datanode is not found, returns None.
182pub async fn find_datanode_lease_value(
183    in_memory: &KvBackendRef,
184    datanode_id: DatanodeId,
185) -> Result<Option<LeaseValue>> {
186    let lease_key = DatanodeLeaseKey {
187        node_id: datanode_id,
188    };
189    let lease_key_bytes: Vec<u8> = lease_key.try_into()?;
190    let Some(kv) = in_memory
191        .get(&lease_key_bytes)
192        .await
193        .context(KvBackendSnafu)?
194    else {
195        return Ok(None);
196    };
197
198    let lease_value: LeaseValue = kv.value.try_into()?;
199    Ok(Some(lease_value))
200}