1pub mod lease;
16pub mod node_info;
17pub mod utils;
18
19use std::time::Duration;
20
21use api::v1::meta::heartbeat_request::NodeWorkloads;
22use common_error::ext::BoxedError;
23use common_meta::distributed_time_constants::{
24 DATANODE_LEASE_SECS, FLOWNODE_LEASE_SECS, FRONTEND_HEARTBEAT_INTERVAL_MILLIS,
25};
26use common_meta::error::Result;
27use common_meta::peer::{Peer, PeerDiscovery, PeerResolver};
28use common_meta::{DatanodeId, FlownodeId};
29use snafu::ResultExt;
30
31use crate::cluster::MetaPeerClient;
32use crate::discovery::lease::{LeaseValueAccessor, LeaseValueType};
33
34#[async_trait::async_trait]
35impl PeerDiscovery for MetaPeerClient {
36 async fn active_frontends(&self) -> Result<Vec<Peer>> {
37 utils::alive_frontends(
38 self,
39 Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS),
40 )
41 .await
42 .map_err(BoxedError::new)
43 .context(common_meta::error::ExternalSnafu)
44 }
45
46 async fn active_datanodes(
47 &self,
48 filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
49 ) -> Result<Vec<Peer>> {
50 utils::alive_datanodes(self, Duration::from_secs(DATANODE_LEASE_SECS), filter)
51 .await
52 .map_err(BoxedError::new)
53 .context(common_meta::error::ExternalSnafu)
54 }
55
56 async fn active_flownodes(
57 &self,
58 filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
59 ) -> Result<Vec<Peer>> {
60 utils::alive_flownodes(self, Duration::from_secs(FLOWNODE_LEASE_SECS), filter)
61 .await
62 .map_err(BoxedError::new)
63 .context(common_meta::error::ExternalSnafu)
64 }
65}
66
67#[async_trait::async_trait]
68impl PeerResolver for MetaPeerClient {
69 async fn datanode(&self, id: DatanodeId) -> Result<Option<Peer>> {
70 let peer = self
71 .lease_value(LeaseValueType::Datanode, id)
72 .await
73 .map_err(BoxedError::new)
74 .context(common_meta::error::ExternalSnafu)?
75 .map(|(peer_id, lease)| Peer::new(peer_id, lease.node_addr));
76 Ok(peer)
77 }
78
79 async fn flownode(&self, id: FlownodeId) -> Result<Option<Peer>> {
80 let peer = self
81 .lease_value(LeaseValueType::Flownode, id)
82 .await
83 .map_err(BoxedError::new)
84 .context(common_meta::error::ExternalSnafu)?
85 .map(|(peer_id, lease)| Peer::new(peer_id, lease.node_addr));
86 Ok(peer)
87 }
88}