Skip to main content

common_meta/
peer.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18pub use api::v1::meta::Peer;
19use api::v1::meta::heartbeat_request::NodeWorkloads;
20
21use crate::cluster::NodeInfo;
22use crate::error::Error;
23use crate::{DatanodeId, FlownodeId};
24
25/// [`PeerResolver`] provides methods to look up peer information by node ID.
26///
27/// This trait allows resolving both datanode and flownode peers, regardless of their current activity status.
28/// Returned peers may be inactive (i.e., not currently alive in the cluster).
29#[async_trait::async_trait]
30pub trait PeerResolver: Send + Sync {
31    /// Looks up a datanode peer by its ID.
32    ///
33    /// Returns `Some(Peer)` if the datanode exists, or `None` if not found.
34    /// The returned peer may be inactive.
35    async fn datanode(&self, id: DatanodeId) -> Result<Option<Peer>, Error>;
36
37    /// Looks up a flownode peer by its ID.
38    ///
39    /// Returns `Some(Peer)` if the flownode exists, or `None` if not found.
40    /// The returned peer may be inactive.
41    async fn flownode(&self, id: FlownodeId) -> Result<Option<Peer>, Error>;
42}
43
44pub type PeerResolverRef = Arc<dyn PeerResolver>;
45
46/// [`PeerDiscovery`] is a service for discovering active peers in the cluster.
47#[async_trait::async_trait]
48pub trait PeerDiscovery: Send + Sync {
49    /// Returns all currently active frontend nodes.
50    ///
51    /// A frontend is considered active if it has reported a heartbeat within the most recent heartbeat interval,
52    /// as determined by the in-memory backend.
53    async fn active_frontends(&self) -> Result<Vec<NodeInfo>, Error>;
54
55    /// Returns all currently active datanodes, optionally filtered by a predicate on their workloads.
56    ///
57    /// A datanode is considered active if it has reported a heartbeat within the most recent heartbeat interval,
58    /// as determined by the in-memory backend.
59    /// The optional `filter` allows further selection based on the node's workloads.
60    async fn active_datanodes(
61        &self,
62        filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
63    ) -> Result<Vec<NodeInfo>, Error>;
64
65    /// Returns all currently active flownodes, optionally filtered by a predicate on their workloads.
66    ///
67    /// A flownode is considered active if it has reported a heartbeat within the most recent heartbeat interval,
68    /// as determined by the in-memory backend.
69    /// The optional `filter` allows further selection based on the node's workloads.
70    async fn active_flownodes(
71        &self,
72        filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
73    ) -> Result<Vec<NodeInfo>, Error>;
74}
75
76pub type PeerDiscoveryRef = Arc<dyn PeerDiscovery>;
77
/// Context handed to [`PeerAllocator::alloc`] alongside the requested peer count.
///
/// The default value carries an empty `extensions` map.
#[derive(Debug, Clone, Default)]
pub struct PeerAllocContext {
    /// String key/value pairs attached to an allocation request.
    // NOTE(review): the set of recognised keys is not defined in this file — presumably
    // interpreted by specific allocator implementations; confirm against them.
    pub extensions: HashMap<String, String>,
}
82
83/// [`PeerAllocator`] allocates [`Peer`]s for creating region or flow.
84#[async_trait::async_trait]
85pub trait PeerAllocator: Send + Sync {
86    async fn alloc(&self, num: usize, ctx: &PeerAllocContext) -> Result<Vec<Peer>, Error>;
87}
88
89pub type PeerAllocatorRef = Arc<dyn PeerAllocator>;
90
91#[async_trait::async_trait]
92impl<T: PeerAllocator + ?Sized> PeerAllocator for Arc<T> {
93    async fn alloc(&self, num: usize, ctx: &PeerAllocContext) -> Result<Vec<Peer>, Error> {
94        T::alloc(self, num, ctx).await
95    }
96}
97
/// A [`PeerAllocator`] that performs no real allocation: it answers every request with
/// default-constructed [`Peer`]s.
pub struct NoopPeerAllocator;
99
100#[async_trait::async_trait]
101impl PeerAllocator for NoopPeerAllocator {
102    async fn alloc(&self, num: usize, _ctx: &PeerAllocContext) -> Result<Vec<Peer>, Error> {
103        Ok(vec![Peer::default(); num])
104    }
105}