Skip to main content

meta_srv/service/
cluster.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::meta::{
16    BatchGetRequest as PbBatchGetRequest, BatchGetResponse as PbBatchGetResponse, MetasrvNodeInfo,
17    MetasrvPeersRequest, MetasrvPeersResponse, RangeRequest as PbRangeRequest,
18    RangeResponse as PbRangeResponse, cluster_server,
19};
20use snafu::ResultExt;
21use tonic::Request;
22
23use crate::metasrv::Metasrv;
24use crate::service::GrpcResult;
25use crate::{check_leader, error, metasrv};
26
27#[async_trait::async_trait]
28impl cluster_server::Cluster for Metasrv {
29    async fn batch_get(&self, req: Request<PbBatchGetRequest>) -> GrpcResult<PbBatchGetResponse> {
30        check_leader!(self, req, PbBatchGetResponse, "`batch_get`");
31
32        let req = req.into_inner().into();
33        let resp = self
34            .in_memory()
35            .batch_get(req)
36            .await
37            .context(error::KvBackendSnafu)?;
38
39        let resp = resp.to_proto_resp(ResponseHeader::success());
40        Ok(Response::new(resp))
41    }
42
43    async fn range(&self, req: Request<PbRangeRequest>) -> GrpcResult<PbRangeResponse> {
44        check_leader!(self, req, PbRangeResponse, "`range`");
45
46        let req = req.into_inner().into();
47        let res = self
48            .in_memory()
49            .range(req)
50            .await
51            .context(error::KvBackendSnafu)?;
52
53        let resp = res.to_proto_resp(ResponseHeader::success());
54        Ok(Response::new(resp))
55    }
56
57    async fn metasrv_peers(
58        &self,
59        req: Request<MetasrvPeersRequest>,
60    ) -> GrpcResult<MetasrvPeersResponse> {
61        check_leader!(self, req, MetasrvPeersResponse, "`metasrv_peers`");
62
63        let leader_addr = &self.options().grpc.server_addr;
64        let (leader, followers) = match self.election() {
65            Some(election) => {
66                let nodes = election
67                    .all_candidates()
68                    .await
69                    .context(error::KvBackendSnafu)?;
70                let followers = nodes
71                    .into_iter()
72                    .filter(|node_info| &node_info.addr != leader_addr)
73                    .map(api::v1::meta::MetasrvNodeInfo::from)
74                    .collect();
75                (self.node_info().into(), followers)
76            }
77            None => (self.make_node_info(leader_addr), vec![]),
78        };
79
80        let resp = MetasrvPeersResponse {
81            header: Some(ResponseHeader::success()),
82            leader: Some(leader),
83            followers,
84        };
85
86        Ok(Response::new(resp))
87    }
88}
89
90impl Metasrv {
91    pub fn is_leader(&self) -> bool {
92        // Returns true when there is no `election`, indicating the presence of only one `Metasrv` node, which is the leader.
93        self.election().map(|x| x.is_leader()).unwrap_or(true)
94    }
95
96    fn make_node_info(&self, addr: &str) -> MetasrvNodeInfo {
97        let build_info = common_version::build_info();
98        metasrv::MetasrvNodeInfo {
99            addr: addr.to_string(),
100            version: build_info.version.to_string(),
101            git_commit: build_info.commit_short.to_string(),
102            start_time_ms: self.start_time_ms(),
103            total_cpu_millicores: self.resource_stat().get_total_cpu_millicores(),
104            total_memory_bytes: self.resource_stat().get_total_memory_bytes(),
105            cpu_usage_millicores: self.resource_stat().get_cpu_usage_millicores(),
106            memory_usage_bytes: self.resource_stat().get_memory_usage_bytes(),
107            hostname: hostname::get()
108                .unwrap_or_default()
109                .to_string_lossy()
110                .to_string(),
111        }
112        .into()
113    }
114}