common_meta/peer.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18pub use api::v1::meta::Peer;
19use api::v1::meta::heartbeat_request::NodeWorkloads;
20
21use crate::cluster::NodeInfo;
22use crate::error::Error;
23use crate::{DatanodeId, FlownodeId};
24
25/// [`PeerResolver`] provides methods to look up peer information by node ID.
26///
27/// This trait allows resolving both datanode and flownode peers, regardless of their current activity status.
28/// Returned peers may be inactive (i.e., not currently alive in the cluster).
29#[async_trait::async_trait]
30pub trait PeerResolver: Send + Sync {
31 /// Looks up a datanode peer by its ID.
32 ///
33 /// Returns `Some(Peer)` if the datanode exists, or `None` if not found.
34 /// The returned peer may be inactive.
35 async fn datanode(&self, id: DatanodeId) -> Result<Option<Peer>, Error>;
36
37 /// Looks up a flownode peer by its ID.
38 ///
39 /// Returns `Some(Peer)` if the flownode exists, or `None` if not found.
40 /// The returned peer may be inactive.
41 async fn flownode(&self, id: FlownodeId) -> Result<Option<Peer>, Error>;
42}
43
44pub type PeerResolverRef = Arc<dyn PeerResolver>;
45
46/// [`PeerDiscovery`] is a service for discovering active peers in the cluster.
47#[async_trait::async_trait]
48pub trait PeerDiscovery: Send + Sync {
49 /// Returns all currently active frontend nodes.
50 ///
51 /// A frontend is considered active if it has reported a heartbeat within the most recent heartbeat interval,
52 /// as determined by the in-memory backend.
53 async fn active_frontends(&self) -> Result<Vec<NodeInfo>, Error>;
54
55 /// Returns all currently active datanodes, optionally filtered by a predicate on their workloads.
56 ///
57 /// A datanode is considered active if it has reported a heartbeat within the most recent heartbeat interval,
58 /// as determined by the in-memory backend.
59 /// The optional `filter` allows further selection based on the node's workloads.
60 async fn active_datanodes(
61 &self,
62 filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
63 ) -> Result<Vec<NodeInfo>, Error>;
64
65 /// Returns all currently active flownodes, optionally filtered by a predicate on their workloads.
66 ///
67 /// A flownode is considered active if it has reported a heartbeat within the most recent heartbeat interval,
68 /// as determined by the in-memory backend.
69 /// The optional `filter` allows further selection based on the node's workloads.
70 async fn active_flownodes(
71 &self,
72 filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
73 ) -> Result<Vec<NodeInfo>, Error>;
74}
75
76pub type PeerDiscoveryRef = Arc<dyn PeerDiscovery>;
77
/// Context handed to [`PeerAllocator::alloc`] alongside the requested count.
///
/// The default value carries no extensions.
#[derive(Debug, Clone, Default)]
pub struct PeerAllocContext {
    /// Free-form string key/value pairs attached to the allocation request.
    ///
    /// NOTE(review): which keys are recognised is up to the allocator
    /// implementation and is not visible from this file.
    pub extensions: HashMap<String, String>,
}
82
83/// [`PeerAllocator`] allocates [`Peer`]s for creating region or flow.
84#[async_trait::async_trait]
85pub trait PeerAllocator: Send + Sync {
86 async fn alloc(&self, num: usize, ctx: &PeerAllocContext) -> Result<Vec<Peer>, Error>;
87}
88
89pub type PeerAllocatorRef = Arc<dyn PeerAllocator>;
90
91#[async_trait::async_trait]
92impl<T: PeerAllocator + ?Sized> PeerAllocator for Arc<T> {
93 async fn alloc(&self, num: usize, ctx: &PeerAllocContext) -> Result<Vec<Peer>, Error> {
94 T::alloc(self, num, ctx).await
95 }
96}
97
/// A [`PeerAllocator`] that performs no real allocation.
///
/// It is a stateless unit type, so it is cheap to copy and can be created with
/// either `NoopPeerAllocator` or `NoopPeerAllocator::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopPeerAllocator;
99
100#[async_trait::async_trait]
101impl PeerAllocator for NoopPeerAllocator {
102 async fn alloc(&self, num: usize, _ctx: &PeerAllocContext) -> Result<Vec<Peer>, Error> {
103 Ok(vec![Peer::default(); num])
104 }
105}