meta_srv/
discovery.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod lease;
16pub mod node_info;
17pub mod utils;
18
19use std::time::Duration;
20
21use api::v1::meta::heartbeat_request::NodeWorkloads;
22use common_error::ext::BoxedError;
23use common_meta::distributed_time_constants::{
24    DATANODE_LEASE_SECS, FLOWNODE_LEASE_SECS, FRONTEND_HEARTBEAT_INTERVAL_MILLIS,
25};
26use common_meta::error::Result;
27use common_meta::peer::{Peer, PeerDiscovery, PeerResolver};
28use common_meta::{DatanodeId, FlownodeId};
29use common_time::util::DefaultSystemTimer;
30use snafu::ResultExt;
31
32use crate::cluster::MetaPeerClient;
33use crate::discovery::lease::{LeaseValueAccessor, LeaseValueType};
34
35#[async_trait::async_trait]
36impl PeerDiscovery for MetaPeerClient {
37    async fn active_frontends(&self) -> Result<Vec<Peer>> {
38        utils::alive_frontends(
39            &DefaultSystemTimer,
40            self,
41            Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS),
42        )
43        .await
44        .map_err(BoxedError::new)
45        .context(common_meta::error::ExternalSnafu)
46    }
47
48    async fn active_datanodes(
49        &self,
50        filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
51    ) -> Result<Vec<Peer>> {
52        utils::alive_datanodes(
53            &DefaultSystemTimer,
54            self,
55            Duration::from_secs(DATANODE_LEASE_SECS),
56            filter,
57        )
58        .await
59        .map_err(BoxedError::new)
60        .context(common_meta::error::ExternalSnafu)
61    }
62
63    async fn active_flownodes(
64        &self,
65        filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
66    ) -> Result<Vec<Peer>> {
67        utils::alive_flownodes(
68            &DefaultSystemTimer,
69            self,
70            Duration::from_secs(FLOWNODE_LEASE_SECS),
71            filter,
72        )
73        .await
74        .map_err(BoxedError::new)
75        .context(common_meta::error::ExternalSnafu)
76    }
77}
78
79#[async_trait::async_trait]
80impl PeerResolver for MetaPeerClient {
81    async fn datanode(&self, id: DatanodeId) -> Result<Option<Peer>> {
82        let peer = self
83            .lease_value(LeaseValueType::Datanode, id)
84            .await
85            .map_err(BoxedError::new)
86            .context(common_meta::error::ExternalSnafu)?
87            .map(|(peer_id, lease)| Peer::new(peer_id, lease.node_addr));
88        Ok(peer)
89    }
90
91    async fn flownode(&self, id: FlownodeId) -> Result<Option<Peer>> {
92        let peer = self
93            .lease_value(LeaseValueType::Flownode, id)
94            .await
95            .map_err(BoxedError::new)
96            .context(common_meta::error::ExternalSnafu)?
97            .map(|(peer_id, lease)| Peer::new(peer_id, lease.node_addr));
98        Ok(peer)
99    }
100}