meta_srv/service/
cluster.rs1use api::v1::meta::{
16 cluster_server, BatchGetRequest as PbBatchGetRequest, BatchGetResponse as PbBatchGetResponse,
17 Error, MetasrvNodeInfo, MetasrvPeersRequest, MetasrvPeersResponse,
18 RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse, ResponseHeader,
19};
20use common_telemetry::warn;
21use snafu::ResultExt;
22use tonic::{Request, Response};
23
24use crate::metasrv::Metasrv;
25use crate::service::GrpcResult;
26use crate::{error, metasrv};
27
28#[async_trait::async_trait]
29impl cluster_server::Cluster for Metasrv {
30 async fn batch_get(&self, req: Request<PbBatchGetRequest>) -> GrpcResult<PbBatchGetResponse> {
31 if !self.is_leader() {
32 let is_not_leader = ResponseHeader::failed(Error::is_not_leader());
33 let resp = PbBatchGetResponse {
34 header: Some(is_not_leader),
35 ..Default::default()
36 };
37
38 warn!("The current meta is not leader, but a `batch_get` request have reached the meta. Detail: {:?}.", req);
39 return Ok(Response::new(resp));
40 }
41
42 let req = req.into_inner().into();
43 let resp = self
44 .in_memory()
45 .batch_get(req)
46 .await
47 .context(error::KvBackendSnafu)?;
48
49 let resp = resp.to_proto_resp(ResponseHeader::success());
50 Ok(Response::new(resp))
51 }
52
53 async fn range(&self, req: Request<PbRangeRequest>) -> GrpcResult<PbRangeResponse> {
54 if !self.is_leader() {
55 let is_not_leader = ResponseHeader::failed(Error::is_not_leader());
56 let resp = PbRangeResponse {
57 header: Some(is_not_leader),
58 ..Default::default()
59 };
60
61 warn!("The current meta is not leader, but a `range` request have reached the meta. Detail: {:?}.", req);
62 return Ok(Response::new(resp));
63 }
64
65 let req = req.into_inner().into();
66 let res = self
67 .in_memory()
68 .range(req)
69 .await
70 .context(error::KvBackendSnafu)?;
71
72 let resp = res.to_proto_resp(ResponseHeader::success());
73 Ok(Response::new(resp))
74 }
75
76 async fn metasrv_peers(
77 &self,
78 req: Request<MetasrvPeersRequest>,
79 ) -> GrpcResult<MetasrvPeersResponse> {
80 if !self.is_leader() {
81 let is_not_leader = ResponseHeader::failed(Error::is_not_leader());
82 let resp = MetasrvPeersResponse {
83 header: Some(is_not_leader),
84 ..Default::default()
85 };
86
87 warn!("The current meta is not leader, but a `metasrv_peers` request have reached the meta. Detail: {:?}.", req);
88 return Ok(Response::new(resp));
89 }
90
91 let leader_addr = &self.options().server_addr;
92 let (leader, followers) = match self.election() {
93 Some(election) => {
94 let nodes = election.all_candidates().await?;
95 let followers = nodes
96 .into_iter()
97 .filter(|node_info| &node_info.addr != leader_addr)
98 .map(api::v1::meta::MetasrvNodeInfo::from)
99 .collect();
100 (self.node_info().into(), followers)
101 }
102 None => (self.make_node_info(leader_addr), vec![]),
103 };
104
105 let resp = MetasrvPeersResponse {
106 header: Some(ResponseHeader::success()),
107 leader: Some(leader),
108 followers,
109 };
110
111 Ok(Response::new(resp))
112 }
113}
114
115impl Metasrv {
116 pub fn is_leader(&self) -> bool {
117 self.election().map(|x| x.is_leader()).unwrap_or(true)
119 }
120
121 fn make_node_info(&self, addr: &str) -> MetasrvNodeInfo {
122 let build_info = common_version::build_info();
123 metasrv::MetasrvNodeInfo {
124 addr: addr.to_string(),
125 version: build_info.version.to_string(),
126 git_commit: build_info.commit_short.to_string(),
127 start_time_ms: self.start_time_ms(),
128 }
129 .into()
130 }
131}