meta_srv/service/
cluster.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::meta::{
16    BatchGetRequest as PbBatchGetRequest, BatchGetResponse as PbBatchGetResponse, MetasrvNodeInfo,
17    MetasrvPeersRequest, MetasrvPeersResponse, RangeRequest as PbRangeRequest,
18    RangeResponse as PbRangeResponse, cluster_server,
19};
20use snafu::ResultExt;
21use tonic::Request;
22
23use crate::metasrv::Metasrv;
24use crate::service::GrpcResult;
25use crate::{check_leader, error, metasrv};
26
27#[async_trait::async_trait]
28impl cluster_server::Cluster for Metasrv {
29    async fn batch_get(&self, req: Request<PbBatchGetRequest>) -> GrpcResult<PbBatchGetResponse> {
30        check_leader!(self, req, PbBatchGetResponse, "`batch_get`");
31
32        let req = req.into_inner().into();
33        let resp = self
34            .in_memory()
35            .batch_get(req)
36            .await
37            .context(error::KvBackendSnafu)?;
38
39        let resp = resp.to_proto_resp(ResponseHeader::success());
40        Ok(Response::new(resp))
41    }
42
43    async fn range(&self, req: Request<PbRangeRequest>) -> GrpcResult<PbRangeResponse> {
44        check_leader!(self, req, PbRangeResponse, "`range`");
45
46        let req = req.into_inner().into();
47        let res = self
48            .in_memory()
49            .range(req)
50            .await
51            .context(error::KvBackendSnafu)?;
52
53        let resp = res.to_proto_resp(ResponseHeader::success());
54        Ok(Response::new(resp))
55    }
56
57    async fn metasrv_peers(
58        &self,
59        req: Request<MetasrvPeersRequest>,
60    ) -> GrpcResult<MetasrvPeersResponse> {
61        check_leader!(self, req, MetasrvPeersResponse, "`metasrv_peers`");
62
63        let leader_addr = &self.options().grpc.server_addr;
64        let (leader, followers) = match self.election() {
65            Some(election) => {
66                let nodes = election.all_candidates().await?;
67                let followers = nodes
68                    .into_iter()
69                    .filter(|node_info| &node_info.addr != leader_addr)
70                    .map(api::v1::meta::MetasrvNodeInfo::from)
71                    .collect();
72                (self.node_info().into(), followers)
73            }
74            None => (self.make_node_info(leader_addr), vec![]),
75        };
76
77        let resp = MetasrvPeersResponse {
78            header: Some(ResponseHeader::success()),
79            leader: Some(leader),
80            followers,
81        };
82
83        Ok(Response::new(resp))
84    }
85}
86
87impl Metasrv {
88    pub fn is_leader(&self) -> bool {
89        // Returns true when there is no `election`, indicating the presence of only one `Metasrv` node, which is the leader.
90        self.election().map(|x| x.is_leader()).unwrap_or(true)
91    }
92
93    fn make_node_info(&self, addr: &str) -> MetasrvNodeInfo {
94        let build_info = common_version::build_info();
95        metasrv::MetasrvNodeInfo {
96            addr: addr.to_string(),
97            version: build_info.version.to_string(),
98            git_commit: build_info.commit_short.to_string(),
99            start_time_ms: self.start_time_ms(),
100            total_cpu_millicores: self.resource_stat().get_total_cpu_millicores(),
101            total_memory_bytes: self.resource_stat().get_total_memory_bytes(),
102            cpu_usage_millicores: self.resource_stat().get_cpu_usage_millicores(),
103            memory_usage_bytes: self.resource_stat().get_memory_usage_bytes(),
104            hostname: hostname::get()
105                .unwrap_or_default()
106                .to_string_lossy()
107                .to_string(),
108        }
109        .into()
110    }
111}