1pub mod lease;
16pub mod node_info;
17pub mod utils;
18
19use std::time::Duration;
20
21use api::v1::meta::heartbeat_request::NodeWorkloads;
22use common_error::ext::BoxedError;
23use common_meta::distributed_time_constants::{
24 DATANODE_LEASE_SECS, FLOWNODE_LEASE_SECS, FRONTEND_HEARTBEAT_INTERVAL_MILLIS,
25};
26use common_meta::error::Result;
27use common_meta::peer::{Peer, PeerDiscovery, PeerResolver};
28use common_meta::{DatanodeId, FlownodeId};
29use common_time::util::DefaultSystemTimer;
30use snafu::ResultExt;
31
32use crate::cluster::MetaPeerClient;
33use crate::discovery::lease::{LeaseValueAccessor, LeaseValueType};
34
35#[async_trait::async_trait]
36impl PeerDiscovery for MetaPeerClient {
37 async fn active_frontends(&self) -> Result<Vec<Peer>> {
38 utils::alive_frontends(
39 &DefaultSystemTimer,
40 self,
41 Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS),
42 )
43 .await
44 .map_err(BoxedError::new)
45 .context(common_meta::error::ExternalSnafu)
46 }
47
48 async fn active_datanodes(
49 &self,
50 filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
51 ) -> Result<Vec<Peer>> {
52 utils::alive_datanodes(
53 &DefaultSystemTimer,
54 self,
55 Duration::from_secs(DATANODE_LEASE_SECS),
56 filter,
57 )
58 .await
59 .map_err(BoxedError::new)
60 .context(common_meta::error::ExternalSnafu)
61 }
62
63 async fn active_flownodes(
64 &self,
65 filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
66 ) -> Result<Vec<Peer>> {
67 utils::alive_flownodes(
68 &DefaultSystemTimer,
69 self,
70 Duration::from_secs(FLOWNODE_LEASE_SECS),
71 filter,
72 )
73 .await
74 .map_err(BoxedError::new)
75 .context(common_meta::error::ExternalSnafu)
76 }
77}
78
79#[async_trait::async_trait]
80impl PeerResolver for MetaPeerClient {
81 async fn datanode(&self, id: DatanodeId) -> Result<Option<Peer>> {
82 let peer = self
83 .lease_value(LeaseValueType::Datanode, id)
84 .await
85 .map_err(BoxedError::new)
86 .context(common_meta::error::ExternalSnafu)?
87 .map(|(peer_id, lease)| Peer::new(peer_id, lease.node_addr));
88 Ok(peer)
89 }
90
91 async fn flownode(&self, id: FlownodeId) -> Result<Option<Peer>> {
92 let peer = self
93 .lease_value(LeaseValueType::Flownode, id)
94 .await
95 .map_err(BoxedError::new)
96 .context(common_meta::error::ExternalSnafu)?
97 .map(|(peer_id, lease)| Peer::new(peer_id, lease.node_addr));
98 Ok(peer)
99 }
100}