1pub mod lease;
16pub mod node_info;
17pub mod utils;
18
19use api::v1::meta::heartbeat_request::NodeWorkloads;
20use common_error::ext::BoxedError;
21use common_meta::cluster::NodeInfo;
22use common_meta::distributed_time_constants::default_distributed_time_constants;
23use common_meta::error::Result;
24use common_meta::peer::{Peer, PeerDiscovery, PeerResolver};
25use common_meta::{DatanodeId, FlownodeId};
26use common_time::util::DefaultSystemTimer;
27use snafu::ResultExt;
28
29use crate::cluster::MetaPeerClient;
30use crate::discovery::lease::{LeaseValueAccessor, LeaseValueType};
31
32#[async_trait::async_trait]
33impl PeerDiscovery for MetaPeerClient {
34 async fn active_frontends(&self) -> Result<Vec<NodeInfo>> {
35 utils::alive_frontend_infos(
36 &DefaultSystemTimer,
37 self,
38 default_distributed_time_constants().frontend_heartbeat_interval,
39 )
40 .await
41 .map_err(BoxedError::new)
42 .context(common_meta::error::ExternalSnafu)
43 }
44
45 async fn active_datanodes(
46 &self,
47 filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
48 ) -> Result<Vec<NodeInfo>> {
49 utils::alive_datanode_infos(
50 &DefaultSystemTimer,
51 self,
52 default_distributed_time_constants().datanode_lease,
53 filter,
54 )
55 .await
56 .map_err(BoxedError::new)
57 .context(common_meta::error::ExternalSnafu)
58 }
59
60 async fn active_flownodes(
61 &self,
62 filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
63 ) -> Result<Vec<NodeInfo>> {
64 utils::alive_flownode_infos(
65 &DefaultSystemTimer,
66 self,
67 default_distributed_time_constants().flownode_lease,
68 filter,
69 )
70 .await
71 .map_err(BoxedError::new)
72 .context(common_meta::error::ExternalSnafu)
73 }
74}
75
76#[async_trait::async_trait]
77impl PeerResolver for MetaPeerClient {
78 async fn datanode(&self, id: DatanodeId) -> Result<Option<Peer>> {
79 let peer = self
80 .lease_value(LeaseValueType::Datanode, id)
81 .await
82 .map_err(BoxedError::new)
83 .context(common_meta::error::ExternalSnafu)?
84 .map(|(peer_id, lease)| Peer::new(peer_id, lease.node_addr));
85 Ok(peer)
86 }
87
88 async fn flownode(&self, id: FlownodeId) -> Result<Option<Peer>> {
89 let peer = self
90 .lease_value(LeaseValueType::Flownode, id)
91 .await
92 .map_err(BoxedError::new)
93 .context(common_meta::error::ExternalSnafu)?
94 .map(|(peer_id, lease)| Peer::new(peer_id, lease.node_addr));
95 Ok(peer)
96 }
97}