meta_srv/
discovery.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod lease;
16pub mod node_info;
17pub mod utils;
18
19use api::v1::meta::heartbeat_request::NodeWorkloads;
20use common_error::ext::BoxedError;
21use common_meta::distributed_time_constants::default_distributed_time_constants;
22use common_meta::error::Result;
23use common_meta::peer::{Peer, PeerDiscovery, PeerResolver};
24use common_meta::{DatanodeId, FlownodeId};
25use common_time::util::DefaultSystemTimer;
26use snafu::ResultExt;
27
28use crate::cluster::MetaPeerClient;
29use crate::discovery::lease::{LeaseValueAccessor, LeaseValueType};
30
31#[async_trait::async_trait]
32impl PeerDiscovery for MetaPeerClient {
33    async fn active_frontends(&self) -> Result<Vec<Peer>> {
34        utils::alive_frontends(
35            &DefaultSystemTimer,
36            self,
37            default_distributed_time_constants().frontend_heartbeat_interval,
38        )
39        .await
40        .map_err(BoxedError::new)
41        .context(common_meta::error::ExternalSnafu)
42    }
43
44    async fn active_datanodes(
45        &self,
46        filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
47    ) -> Result<Vec<Peer>> {
48        utils::alive_datanodes(
49            &DefaultSystemTimer,
50            self,
51            default_distributed_time_constants().datanode_lease,
52            filter,
53        )
54        .await
55        .map_err(BoxedError::new)
56        .context(common_meta::error::ExternalSnafu)
57    }
58
59    async fn active_flownodes(
60        &self,
61        filter: Option<for<'a> fn(&'a NodeWorkloads) -> bool>,
62    ) -> Result<Vec<Peer>> {
63        utils::alive_flownodes(
64            &DefaultSystemTimer,
65            self,
66            default_distributed_time_constants().flownode_lease,
67            filter,
68        )
69        .await
70        .map_err(BoxedError::new)
71        .context(common_meta::error::ExternalSnafu)
72    }
73}
74
75#[async_trait::async_trait]
76impl PeerResolver for MetaPeerClient {
77    async fn datanode(&self, id: DatanodeId) -> Result<Option<Peer>> {
78        let peer = self
79            .lease_value(LeaseValueType::Datanode, id)
80            .await
81            .map_err(BoxedError::new)
82            .context(common_meta::error::ExternalSnafu)?
83            .map(|(peer_id, lease)| Peer::new(peer_id, lease.node_addr));
84        Ok(peer)
85    }
86
87    async fn flownode(&self, id: FlownodeId) -> Result<Option<Peer>> {
88        let peer = self
89            .lease_value(LeaseValueType::Flownode, id)
90            .await
91            .map_err(BoxedError::new)
92            .context(common_meta::error::ExternalSnafu)?
93            .map(|(peer_id, lease)| Peer::new(peer_id, lease.node_addr));
94        Ok(peer)
95    }
96}