meta_srv/discovery/
node_info.rs1use common_meta::cluster::{NodeInfo, NodeInfoKey, Role};
16use common_meta::kv_backend::KvBackend;
17use common_meta::rpc::store::RangeRequest;
18use snafu::ResultExt;
19
20use crate::cluster::MetaPeerClient;
21use crate::error::{InvalidNodeInfoFormatSnafu, Result};
22
/// Selects which kind of node's information to look up.
///
/// Each variant converts into the [`Role`] variant of the same name through
/// the `From<NodeInfoType> for Role` implementation in this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeInfoType {
    /// Selects frontend nodes.
    Frontend,
    /// Selects datanodes.
    Datanode,
    /// Selects flownodes.
    Flownode,
}
29
30impl From<NodeInfoType> for Role {
31 fn from(node_info_type: NodeInfoType) -> Self {
32 match node_info_type {
33 NodeInfoType::Frontend => Role::Frontend,
34 NodeInfoType::Datanode => Role::Datanode,
35 NodeInfoType::Flownode => Role::Flownode,
36 }
37 }
38}
39
40#[async_trait::async_trait]
42pub trait NodeInfoAccessor: Send + Sync {
43 async fn node_infos(&self, node_info_type: NodeInfoType) -> Result<Vec<(u64, NodeInfo)>>;
45}
46
47#[async_trait::async_trait]
48impl NodeInfoAccessor for MetaPeerClient {
49 async fn node_infos(&self, node_info_type: NodeInfoType) -> Result<Vec<(u64, NodeInfo)>> {
50 let range_request = RangeRequest::new()
51 .with_prefix(NodeInfoKey::key_prefix_with_role(node_info_type.into()));
52 let response = self.range(range_request).await?;
53
54 response
55 .kvs
56 .into_iter()
57 .map(|kv| {
58 let node_info = NodeInfo::try_from(kv.value).context(InvalidNodeInfoFormatSnafu)?;
59 Ok((node_info.peer.id, node_info))
60 })
61 .collect::<Result<Vec<_>>>()
62 }
63}