meta_srv/service/
cluster.rs1use api::v1::meta::{
16 BatchGetRequest as PbBatchGetRequest, BatchGetResponse as PbBatchGetResponse, MetasrvNodeInfo,
17 MetasrvPeersRequest, MetasrvPeersResponse, RangeRequest as PbRangeRequest,
18 RangeResponse as PbRangeResponse, cluster_server,
19};
20use snafu::ResultExt;
21use tonic::Request;
22
23use crate::metasrv::Metasrv;
24use crate::service::GrpcResult;
25use crate::{check_leader, error, metasrv};
26
27#[async_trait::async_trait]
28impl cluster_server::Cluster for Metasrv {
29 async fn batch_get(&self, req: Request<PbBatchGetRequest>) -> GrpcResult<PbBatchGetResponse> {
30 check_leader!(self, req, PbBatchGetResponse, "`batch_get`");
31
32 let req = req.into_inner().into();
33 let resp = self
34 .in_memory()
35 .batch_get(req)
36 .await
37 .context(error::KvBackendSnafu)?;
38
39 let resp = resp.to_proto_resp(ResponseHeader::success());
40 Ok(Response::new(resp))
41 }
42
43 async fn range(&self, req: Request<PbRangeRequest>) -> GrpcResult<PbRangeResponse> {
44 check_leader!(self, req, PbRangeResponse, "`range`");
45
46 let req = req.into_inner().into();
47 let res = self
48 .in_memory()
49 .range(req)
50 .await
51 .context(error::KvBackendSnafu)?;
52
53 let resp = res.to_proto_resp(ResponseHeader::success());
54 Ok(Response::new(resp))
55 }
56
57 async fn metasrv_peers(
58 &self,
59 req: Request<MetasrvPeersRequest>,
60 ) -> GrpcResult<MetasrvPeersResponse> {
61 check_leader!(self, req, MetasrvPeersResponse, "`metasrv_peers`");
62
63 let leader_addr = &self.options().grpc.server_addr;
64 let (leader, followers) = match self.election() {
65 Some(election) => {
66 let nodes = election.all_candidates().await?;
67 let followers = nodes
68 .into_iter()
69 .filter(|node_info| &node_info.addr != leader_addr)
70 .map(api::v1::meta::MetasrvNodeInfo::from)
71 .collect();
72 (self.node_info().into(), followers)
73 }
74 None => (self.make_node_info(leader_addr), vec![]),
75 };
76
77 let resp = MetasrvPeersResponse {
78 header: Some(ResponseHeader::success()),
79 leader: Some(leader),
80 followers,
81 };
82
83 Ok(Response::new(resp))
84 }
85}
86
87impl Metasrv {
88 pub fn is_leader(&self) -> bool {
89 self.election().map(|x| x.is_leader()).unwrap_or(true)
91 }
92
93 fn make_node_info(&self, addr: &str) -> MetasrvNodeInfo {
94 let build_info = common_version::build_info();
95 metasrv::MetasrvNodeInfo {
96 addr: addr.to_string(),
97 version: build_info.version.to_string(),
98 git_commit: build_info.commit_short.to_string(),
99 start_time_ms: self.start_time_ms(),
100 total_cpu_millicores: self.resource_stat().get_total_cpu_millicores(),
101 total_memory_bytes: self.resource_stat().get_total_memory_bytes(),
102 cpu_usage_millicores: self.resource_stat().get_cpu_usage_millicores(),
103 memory_usage_bytes: self.resource_stat().get_memory_usage_bytes(),
104 hostname: hostname::get()
105 .unwrap_or_default()
106 .to_string_lossy()
107 .to_string(),
108 }
109 .into()
110 }
111}