meta_srv/
discovery.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod lease;
16pub mod node_info;
17pub mod utils;
18
19use std::time::Duration;
20
21use api::v1::meta::heartbeat_request::NodeWorkloads;
22use common_error::ext::BoxedError;
23use common_meta::distributed_time_constants::{
24    DATANODE_LEASE_SECS, FLOWNODE_LEASE_SECS, FRONTEND_HEARTBEAT_INTERVAL_MILLIS,
25};
26use common_meta::error::Result;
27use common_meta::peer::{Peer, PeerDiscovery, PeerResolver};
28use common_meta::{DatanodeId, FlownodeId};
29use snafu::ResultExt;
30
31use crate::cluster::MetaPeerClient;
32use crate::discovery::lease::{LeaseValueAccessor, LeaseValueType};
33
34#[async_trait::async_trait]
35impl PeerDiscovery for MetaPeerClient {
36    async fn active_frontends(&self) -> Result<Vec<Peer>> {
37        utils::alive_frontends(
38            self,
39            Duration::from_millis(FRONTEND_HEARTBEAT_INTERVAL_MILLIS),
40        )
41        .await
42        .map_err(BoxedError::new)
43        .context(common_meta::error::ExternalSnafu)
44    }
45
46    async fn active_datanodes(
47        &self,
48        filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
49    ) -> Result<Vec<Peer>> {
50        utils::alive_datanodes(self, Duration::from_secs(DATANODE_LEASE_SECS), filter)
51            .await
52            .map_err(BoxedError::new)
53            .context(common_meta::error::ExternalSnafu)
54    }
55
56    async fn active_flownodes(
57        &self,
58        filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
59    ) -> Result<Vec<Peer>> {
60        utils::alive_flownodes(self, Duration::from_secs(FLOWNODE_LEASE_SECS), filter)
61            .await
62            .map_err(BoxedError::new)
63            .context(common_meta::error::ExternalSnafu)
64    }
65}
66
67#[async_trait::async_trait]
68impl PeerResolver for MetaPeerClient {
69    async fn datanode(&self, id: DatanodeId) -> Result<Option<Peer>> {
70        let peer = self
71            .lease_value(LeaseValueType::Datanode, id)
72            .await
73            .map_err(BoxedError::new)
74            .context(common_meta::error::ExternalSnafu)?
75            .map(|(peer_id, lease)| Peer::new(peer_id, lease.node_addr));
76        Ok(peer)
77    }
78
79    async fn flownode(&self, id: FlownodeId) -> Result<Option<Peer>> {
80        let peer = self
81            .lease_value(LeaseValueType::Flownode, id)
82            .await
83            .map_err(BoxedError::new)
84            .context(common_meta::error::ExternalSnafu)?
85            .map(|(peer_id, lease)| Peer::new(peer_id, lease.node_addr));
86        Ok(peer)
87    }
88}