meta_srv/service/
cluster.rs1use api::v1::meta::{
16 cluster_server, BatchGetRequest as PbBatchGetRequest, BatchGetResponse as PbBatchGetResponse,
17 MetasrvNodeInfo, MetasrvPeersRequest, MetasrvPeersResponse, RangeRequest as PbRangeRequest,
18 RangeResponse as PbRangeResponse,
19};
20use common_telemetry::warn;
21use snafu::ResultExt;
22use tonic::Request;
23
24use crate::metasrv::Metasrv;
25use crate::service::GrpcResult;
26use crate::{check_leader, error, metasrv};
27
28#[async_trait::async_trait]
29impl cluster_server::Cluster for Metasrv {
30 async fn batch_get(&self, req: Request<PbBatchGetRequest>) -> GrpcResult<PbBatchGetResponse> {
31 check_leader!(self, req, PbBatchGetResponse, "`batch_get`");
32
33 let req = req.into_inner().into();
34 let resp = self
35 .in_memory()
36 .batch_get(req)
37 .await
38 .context(error::KvBackendSnafu)?;
39
40 let resp = resp.to_proto_resp(ResponseHeader::success());
41 Ok(Response::new(resp))
42 }
43
44 async fn range(&self, req: Request<PbRangeRequest>) -> GrpcResult<PbRangeResponse> {
45 check_leader!(self, req, PbRangeResponse, "`range`");
46
47 let req = req.into_inner().into();
48 let res = self
49 .in_memory()
50 .range(req)
51 .await
52 .context(error::KvBackendSnafu)?;
53
54 let resp = res.to_proto_resp(ResponseHeader::success());
55 Ok(Response::new(resp))
56 }
57
58 async fn metasrv_peers(
59 &self,
60 req: Request<MetasrvPeersRequest>,
61 ) -> GrpcResult<MetasrvPeersResponse> {
62 check_leader!(self, req, MetasrvPeersResponse, "`metasrv_peers`");
63
64 let leader_addr = &self.options().grpc.server_addr;
65 let (leader, followers) = match self.election() {
66 Some(election) => {
67 let nodes = election.all_candidates().await?;
68 let followers = nodes
69 .into_iter()
70 .filter(|node_info| &node_info.addr != leader_addr)
71 .map(api::v1::meta::MetasrvNodeInfo::from)
72 .collect();
73 (self.node_info().into(), followers)
74 }
75 None => (self.make_node_info(leader_addr), vec![]),
76 };
77
78 let resp = MetasrvPeersResponse {
79 header: Some(ResponseHeader::success()),
80 leader: Some(leader),
81 followers,
82 };
83
84 Ok(Response::new(resp))
85 }
86}
87
88impl Metasrv {
89 pub fn is_leader(&self) -> bool {
90 self.election().map(|x| x.is_leader()).unwrap_or(true)
92 }
93
94 fn make_node_info(&self, addr: &str) -> MetasrvNodeInfo {
95 let build_info = common_version::build_info();
96 metasrv::MetasrvNodeInfo {
97 addr: addr.to_string(),
98 version: build_info.version.to_string(),
99 git_commit: build_info.commit_short.to_string(),
100 start_time_ms: self.start_time_ms(),
101 }
102 .into()
103 }
104}