Skip to main content

meta_srv/
discovery.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod lease;
16pub mod node_info;
17pub mod utils;
18
19use api::v1::meta::heartbeat_request::NodeWorkloads;
20use common_error::ext::BoxedError;
21use common_meta::cluster::NodeInfo;
22use common_meta::distributed_time_constants::default_distributed_time_constants;
23use common_meta::error::Result;
24use common_meta::peer::{Peer, PeerDiscovery, PeerResolver};
25use common_meta::{DatanodeId, FlownodeId};
26use common_time::util::DefaultSystemTimer;
27use snafu::ResultExt;
28
29use crate::cluster::MetaPeerClient;
30use crate::discovery::lease::{LeaseValueAccessor, LeaseValueType};
31
32#[async_trait::async_trait]
33impl PeerDiscovery for MetaPeerClient {
34    async fn active_frontends(&self) -> Result<Vec<NodeInfo>> {
35        utils::alive_frontend_infos(
36            &DefaultSystemTimer,
37            self,
38            default_distributed_time_constants().frontend_heartbeat_interval,
39        )
40        .await
41        .map_err(BoxedError::new)
42        .context(common_meta::error::ExternalSnafu)
43    }
44
45    async fn active_datanodes(
46        &self,
47        filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
48    ) -> Result<Vec<NodeInfo>> {
49        utils::alive_datanode_infos(
50            &DefaultSystemTimer,
51            self,
52            default_distributed_time_constants().datanode_lease,
53            filter,
54        )
55        .await
56        .map_err(BoxedError::new)
57        .context(common_meta::error::ExternalSnafu)
58    }
59
60    async fn active_flownodes(
61        &self,
62        filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
63    ) -> Result<Vec<NodeInfo>> {
64        utils::alive_flownode_infos(
65            &DefaultSystemTimer,
66            self,
67            default_distributed_time_constants().flownode_lease,
68            filter,
69        )
70        .await
71        .map_err(BoxedError::new)
72        .context(common_meta::error::ExternalSnafu)
73    }
74}
75
76#[async_trait::async_trait]
77impl PeerResolver for MetaPeerClient {
78    async fn datanode(&self, id: DatanodeId) -> Result<Option<Peer>> {
79        let peer = self
80            .lease_value(LeaseValueType::Datanode, id)
81            .await
82            .map_err(BoxedError::new)
83            .context(common_meta::error::ExternalSnafu)?
84            .map(|(peer_id, lease)| Peer::new(peer_id, lease.node_addr));
85        Ok(peer)
86    }
87
88    async fn flownode(&self, id: FlownodeId) -> Result<Option<Peer>> {
89        let peer = self
90            .lease_value(LeaseValueType::Flownode, id)
91            .await
92            .map_err(BoxedError::new)
93            .context(common_meta::error::ExternalSnafu)?
94            .map(|(peer_id, lease)| Peer::new(peer_id, lease.node_addr));
95        Ok(peer)
96    }
97}