meta_srv/service/
cluster.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::meta::{
16    cluster_server, BatchGetRequest as PbBatchGetRequest, BatchGetResponse as PbBatchGetResponse,
17    MetasrvNodeInfo, MetasrvPeersRequest, MetasrvPeersResponse, RangeRequest as PbRangeRequest,
18    RangeResponse as PbRangeResponse,
19};
20use common_telemetry::warn;
21use snafu::ResultExt;
22use tonic::Request;
23
24use crate::metasrv::Metasrv;
25use crate::service::GrpcResult;
26use crate::{check_leader, error, metasrv};
27
28#[async_trait::async_trait]
29impl cluster_server::Cluster for Metasrv {
30    async fn batch_get(&self, req: Request<PbBatchGetRequest>) -> GrpcResult<PbBatchGetResponse> {
31        check_leader!(self, req, PbBatchGetResponse, "`batch_get`");
32
33        let req = req.into_inner().into();
34        let resp = self
35            .in_memory()
36            .batch_get(req)
37            .await
38            .context(error::KvBackendSnafu)?;
39
40        let resp = resp.to_proto_resp(ResponseHeader::success());
41        Ok(Response::new(resp))
42    }
43
44    async fn range(&self, req: Request<PbRangeRequest>) -> GrpcResult<PbRangeResponse> {
45        check_leader!(self, req, PbRangeResponse, "`range`");
46
47        let req = req.into_inner().into();
48        let res = self
49            .in_memory()
50            .range(req)
51            .await
52            .context(error::KvBackendSnafu)?;
53
54        let resp = res.to_proto_resp(ResponseHeader::success());
55        Ok(Response::new(resp))
56    }
57
58    async fn metasrv_peers(
59        &self,
60        req: Request<MetasrvPeersRequest>,
61    ) -> GrpcResult<MetasrvPeersResponse> {
62        check_leader!(self, req, MetasrvPeersResponse, "`metasrv_peers`");
63
64        let leader_addr = &self.options().grpc.server_addr;
65        let (leader, followers) = match self.election() {
66            Some(election) => {
67                let nodes = election.all_candidates().await?;
68                let followers = nodes
69                    .into_iter()
70                    .filter(|node_info| &node_info.addr != leader_addr)
71                    .map(api::v1::meta::MetasrvNodeInfo::from)
72                    .collect();
73                (self.node_info().into(), followers)
74            }
75            None => (self.make_node_info(leader_addr), vec![]),
76        };
77
78        let resp = MetasrvPeersResponse {
79            header: Some(ResponseHeader::success()),
80            leader: Some(leader),
81            followers,
82        };
83
84        Ok(Response::new(resp))
85    }
86}
87
88impl Metasrv {
89    pub fn is_leader(&self) -> bool {
90        // Returns true when there is no `election`, indicating the presence of only one `Metasrv` node, which is the leader.
91        self.election().map(|x| x.is_leader()).unwrap_or(true)
92    }
93
94    fn make_node_info(&self, addr: &str) -> MetasrvNodeInfo {
95        let build_info = common_version::build_info();
96        metasrv::MetasrvNodeInfo {
97            addr: addr.to_string(),
98            version: build_info.version.to_string(),
99            git_commit: build_info.commit_short.to_string(),
100            start_time_ms: self.start_time_ms(),
101        }
102        .into()
103    }
104}