1pub mod lease;
16pub mod node_info;
17pub mod utils;
18
19use api::v1::meta::heartbeat_request::NodeWorkloads;
20use common_error::ext::BoxedError;
21use common_meta::distributed_time_constants::default_distributed_time_constants;
22use common_meta::error::Result;
23use common_meta::peer::{Peer, PeerDiscovery, PeerResolver};
24use common_meta::{DatanodeId, FlownodeId};
25use common_time::util::DefaultSystemTimer;
26use snafu::ResultExt;
27
28use crate::cluster::MetaPeerClient;
29use crate::discovery::lease::{LeaseValueAccessor, LeaseValueType};
30
31#[async_trait::async_trait]
32impl PeerDiscovery for MetaPeerClient {
33 async fn active_frontends(&self) -> Result<Vec<Peer>> {
34 utils::alive_frontends(
35 &DefaultSystemTimer,
36 self,
37 default_distributed_time_constants().frontend_heartbeat_interval,
38 )
39 .await
40 .map_err(BoxedError::new)
41 .context(common_meta::error::ExternalSnafu)
42 }
43
44 async fn active_datanodes(
45 &self,
46 filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
47 ) -> Result<Vec<Peer>> {
48 utils::alive_datanodes(
49 &DefaultSystemTimer,
50 self,
51 default_distributed_time_constants().datanode_lease,
52 filter,
53 )
54 .await
55 .map_err(BoxedError::new)
56 .context(common_meta::error::ExternalSnafu)
57 }
58
59 async fn active_flownodes(
60 &self,
61 filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
62 ) -> Result<Vec<Peer>> {
63 utils::alive_flownodes(
64 &DefaultSystemTimer,
65 self,
66 default_distributed_time_constants().flownode_lease,
67 filter,
68 )
69 .await
70 .map_err(BoxedError::new)
71 .context(common_meta::error::ExternalSnafu)
72 }
73}
74
75#[async_trait::async_trait]
76impl PeerResolver for MetaPeerClient {
77 async fn datanode(&self, id: DatanodeId) -> Result<Option<Peer>> {
78 let peer = self
79 .lease_value(LeaseValueType::Datanode, id)
80 .await
81 .map_err(BoxedError::new)
82 .context(common_meta::error::ExternalSnafu)?
83 .map(|(peer_id, lease)| Peer::new(peer_id, lease.node_addr));
84 Ok(peer)
85 }
86
87 async fn flownode(&self, id: FlownodeId) -> Result<Option<Peer>> {
88 let peer = self
89 .lease_value(LeaseValueType::Flownode, id)
90 .await
91 .map_err(BoxedError::new)
92 .context(common_meta::error::ExternalSnafu)?
93 .map(|(peer_id, lease)| Peer::new(peer_id, lease.node_addr));
94 Ok(peer)
95 }
96}