meta_srv/service/
cluster.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::meta::{
16    cluster_server, BatchGetRequest as PbBatchGetRequest, BatchGetResponse as PbBatchGetResponse,
17    Error, MetasrvNodeInfo, MetasrvPeersRequest, MetasrvPeersResponse,
18    RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse, ResponseHeader,
19};
20use common_telemetry::warn;
21use snafu::ResultExt;
22use tonic::{Request, Response};
23
24use crate::metasrv::Metasrv;
25use crate::service::GrpcResult;
26use crate::{error, metasrv};
27
28#[async_trait::async_trait]
29impl cluster_server::Cluster for Metasrv {
30    async fn batch_get(&self, req: Request<PbBatchGetRequest>) -> GrpcResult<PbBatchGetResponse> {
31        if !self.is_leader() {
32            let is_not_leader = ResponseHeader::failed(Error::is_not_leader());
33            let resp = PbBatchGetResponse {
34                header: Some(is_not_leader),
35                ..Default::default()
36            };
37
38            warn!("The current meta is not leader, but a `batch_get` request have reached the meta. Detail: {:?}.", req);
39            return Ok(Response::new(resp));
40        }
41
42        let req = req.into_inner().into();
43        let resp = self
44            .in_memory()
45            .batch_get(req)
46            .await
47            .context(error::KvBackendSnafu)?;
48
49        let resp = resp.to_proto_resp(ResponseHeader::success());
50        Ok(Response::new(resp))
51    }
52
53    async fn range(&self, req: Request<PbRangeRequest>) -> GrpcResult<PbRangeResponse> {
54        if !self.is_leader() {
55            let is_not_leader = ResponseHeader::failed(Error::is_not_leader());
56            let resp = PbRangeResponse {
57                header: Some(is_not_leader),
58                ..Default::default()
59            };
60
61            warn!("The current meta is not leader, but a `range` request have reached the meta. Detail: {:?}.", req);
62            return Ok(Response::new(resp));
63        }
64
65        let req = req.into_inner().into();
66        let res = self
67            .in_memory()
68            .range(req)
69            .await
70            .context(error::KvBackendSnafu)?;
71
72        let resp = res.to_proto_resp(ResponseHeader::success());
73        Ok(Response::new(resp))
74    }
75
76    async fn metasrv_peers(
77        &self,
78        req: Request<MetasrvPeersRequest>,
79    ) -> GrpcResult<MetasrvPeersResponse> {
80        if !self.is_leader() {
81            let is_not_leader = ResponseHeader::failed(Error::is_not_leader());
82            let resp = MetasrvPeersResponse {
83                header: Some(is_not_leader),
84                ..Default::default()
85            };
86
87            warn!("The current meta is not leader, but a `metasrv_peers` request have reached the meta. Detail: {:?}.", req);
88            return Ok(Response::new(resp));
89        }
90
91        let leader_addr = &self.options().server_addr;
92        let (leader, followers) = match self.election() {
93            Some(election) => {
94                let nodes = election.all_candidates().await?;
95                let followers = nodes
96                    .into_iter()
97                    .filter(|node_info| &node_info.addr != leader_addr)
98                    .map(api::v1::meta::MetasrvNodeInfo::from)
99                    .collect();
100                (self.node_info().into(), followers)
101            }
102            None => (self.make_node_info(leader_addr), vec![]),
103        };
104
105        let resp = MetasrvPeersResponse {
106            header: Some(ResponseHeader::success()),
107            leader: Some(leader),
108            followers,
109        };
110
111        Ok(Response::new(resp))
112    }
113}
114
115impl Metasrv {
116    pub fn is_leader(&self) -> bool {
117        // Returns true when there is no `election`, indicating the presence of only one `Metasrv` node, which is the leader.
118        self.election().map(|x| x.is_leader()).unwrap_or(true)
119    }
120
121    fn make_node_info(&self, addr: &str) -> MetasrvNodeInfo {
122        let build_info = common_version::build_info();
123        metasrv::MetasrvNodeInfo {
124            addr: addr.to_string(),
125            version: build_info.version.to_string(),
126            git_commit: build_info.commit_short.to_string(),
127            start_time_ms: self.start_time_ms(),
128        }
129        .into()
130    }
131}