meta_srv/service/
store.rs1pub mod cached_kv;
16
17use api::v1::meta::{
18 store_server, BatchDeleteRequest as PbBatchDeleteRequest,
19 BatchDeleteResponse as PbBatchDeleteResponse, BatchGetRequest as PbBatchGetRequest,
20 BatchGetResponse as PbBatchGetResponse, BatchPutRequest as PbBatchPutRequest,
21 BatchPutResponse as PbBatchPutResponse, CompareAndPutRequest as PbCompareAndPutRequest,
22 CompareAndPutResponse as PbCompareAndPutResponse, DeleteRangeRequest as PbDeleteRangeRequest,
23 DeleteRangeResponse as PbDeleteRangeResponse, PutRequest as PbPutRequest,
24 PutResponse as PbPutResponse, RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse,
25 ResponseHeader,
26};
27use common_meta::rpc::store::{
28 BatchDeleteRequest, BatchGetRequest, BatchPutRequest, CompareAndPutRequest, DeleteRangeRequest,
29 PutRequest, RangeRequest,
30};
31use snafu::ResultExt;
32use tonic::{Request, Response};
33
34use crate::error::{self};
35use crate::metasrv::Metasrv;
36use crate::metrics::METRIC_META_KV_REQUEST_ELAPSED;
37use crate::service::GrpcResult;
38
39#[async_trait::async_trait]
40impl store_server::Store for Metasrv {
41 async fn range(&self, req: Request<PbRangeRequest>) -> GrpcResult<PbRangeResponse> {
42 let req = req.into_inner();
43
44 let _timer = METRIC_META_KV_REQUEST_ELAPSED
45 .with_label_values(&[self.kv_backend().name(), "range"])
46 .start_timer();
47
48 let req: RangeRequest = req.into();
49
50 let res = self
51 .kv_backend()
52 .range(req)
53 .await
54 .context(error::KvBackendSnafu)?;
55
56 let res = res.to_proto_resp(ResponseHeader::success());
57 Ok(Response::new(res))
58 }
59
60 async fn put(&self, req: Request<PbPutRequest>) -> GrpcResult<PbPutResponse> {
61 let req = req.into_inner();
62 let _timer = METRIC_META_KV_REQUEST_ELAPSED
63 .with_label_values(&[self.kv_backend().name(), "put"])
64 .start_timer();
65
66 let req: PutRequest = req.into();
67
68 let res = self
69 .kv_backend()
70 .put(req)
71 .await
72 .context(error::KvBackendSnafu)?;
73
74 let res = res.to_proto_resp(ResponseHeader::success());
75 Ok(Response::new(res))
76 }
77
78 async fn batch_get(&self, req: Request<PbBatchGetRequest>) -> GrpcResult<PbBatchGetResponse> {
79 let req = req.into_inner();
80 let _timer = METRIC_META_KV_REQUEST_ELAPSED
81 .with_label_values(&[self.kv_backend().name(), "batch_get"])
82 .start_timer();
83
84 let req: BatchGetRequest = req.into();
85
86 let res = self
87 .kv_backend()
88 .batch_get(req)
89 .await
90 .context(error::KvBackendSnafu)?;
91
92 let res = res.to_proto_resp(ResponseHeader::success());
93 Ok(Response::new(res))
94 }
95
96 async fn batch_put(&self, req: Request<PbBatchPutRequest>) -> GrpcResult<PbBatchPutResponse> {
97 let req = req.into_inner();
98
99 let _timer = METRIC_META_KV_REQUEST_ELAPSED
100 .with_label_values(&[self.kv_backend().name(), "batch_pub"])
101 .start_timer();
102
103 let req: BatchPutRequest = req.into();
104
105 let res = self
106 .kv_backend()
107 .batch_put(req)
108 .await
109 .context(error::KvBackendSnafu)?;
110
111 let res = res.to_proto_resp(ResponseHeader::success());
112 Ok(Response::new(res))
113 }
114
115 async fn batch_delete(
116 &self,
117 req: Request<PbBatchDeleteRequest>,
118 ) -> GrpcResult<PbBatchDeleteResponse> {
119 let req = req.into_inner();
120
121 let _timer = METRIC_META_KV_REQUEST_ELAPSED
122 .with_label_values(&[self.kv_backend().name(), "batch_delete"])
123 .start_timer();
124
125 let req: BatchDeleteRequest = req.into();
126
127 let res = self
128 .kv_backend()
129 .batch_delete(req)
130 .await
131 .context(error::KvBackendSnafu)?;
132
133 let res = res.to_proto_resp(ResponseHeader::success());
134 Ok(Response::new(res))
135 }
136
137 async fn compare_and_put(
138 &self,
139 req: Request<PbCompareAndPutRequest>,
140 ) -> GrpcResult<PbCompareAndPutResponse> {
141 let req = req.into_inner();
142
143 let _timer = METRIC_META_KV_REQUEST_ELAPSED
144 .with_label_values(&[self.kv_backend().name(), "compare_and_put"])
145 .start_timer();
146
147 let req: CompareAndPutRequest = req.into();
148
149 let res = self
150 .kv_backend()
151 .compare_and_put(req)
152 .await
153 .context(error::KvBackendSnafu)?;
154
155 let res = res.to_proto_resp(ResponseHeader::success());
156 Ok(Response::new(res))
157 }
158
159 async fn delete_range(
160 &self,
161 req: Request<PbDeleteRangeRequest>,
162 ) -> GrpcResult<PbDeleteRangeResponse> {
163 let req = req.into_inner();
164
165 let _timer = METRIC_META_KV_REQUEST_ELAPSED
166 .with_label_values(&[self.kv_backend().name(), "delete_range"])
167 .start_timer();
168
169 let req: DeleteRangeRequest = req.into();
170
171 let res = self
172 .kv_backend()
173 .delete_range(req)
174 .await
175 .context(error::KvBackendSnafu)?;
176
177 let res = res.to_proto_resp(ResponseHeader::success());
178 Ok(Response::new(res))
179 }
180}
181
// Smoke tests for the `Store` service: each test sends a default request that
// only has its header set and checks that the handler returns `Ok`.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::meta::store_server::Store;
    use api::v1::meta::*;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_telemetry::tracing_context::W3cTrace;
    use tonic::IntoRequest;

    use crate::metasrv::builder::MetasrvBuilder;
    use crate::metasrv::Metasrv;

    /// Builds a `Metasrv` that uses a fresh in-memory kv backend.
    async fn new_metasrv() -> Metasrv {
        let builder = MetasrvBuilder::new().kv_backend(Arc::new(MemoryKvBackend::new()));
        builder.build().await.unwrap()
    }

    #[tokio::test]
    async fn test_range() {
        let srv = new_metasrv().await;

        let mut request = RangeRequest::default();
        request.set_header(1, Role::Datanode, W3cTrace::new());

        // Only success matters here; the response payload is discarded.
        let _ = srv.range(request.into_request()).await.unwrap();
    }

    #[tokio::test]
    async fn test_put() {
        let srv = new_metasrv().await;

        let mut request = PutRequest::default();
        request.set_header(1, Role::Datanode, W3cTrace::new());

        let _ = srv.put(request.into_request()).await.unwrap();
    }

    #[tokio::test]
    async fn test_batch_get() {
        let srv = new_metasrv().await;

        let mut request = BatchGetRequest::default();
        request.set_header(1, Role::Datanode, W3cTrace::new());

        let _ = srv.batch_get(request.into_request()).await.unwrap();
    }

    #[tokio::test]
    async fn test_batch_put() {
        common_telemetry::init_default_ut_logging();
        let srv = new_metasrv().await;

        let mut request = BatchPutRequest::default();
        request.set_header(1, Role::Datanode, W3cTrace::new());

        let _ = srv.batch_put(request.into_request()).await.unwrap();
    }

    #[tokio::test]
    async fn test_batch_delete() {
        let srv = new_metasrv().await;

        let mut request = BatchDeleteRequest::default();
        request.set_header(1, Role::Datanode, W3cTrace::new());

        let _ = srv.batch_delete(request.into_request()).await.unwrap();
    }

    #[tokio::test]
    async fn test_compare_and_put() {
        let srv = new_metasrv().await;

        let mut request = CompareAndPutRequest::default();
        request.set_header(1, Role::Datanode, W3cTrace::new());

        let _ = srv.compare_and_put(request.into_request()).await.unwrap();
    }

    #[tokio::test]
    async fn test_delete_range() {
        let srv = new_metasrv().await;

        let mut request = DeleteRangeRequest::default();
        request.set_header(1, Role::Datanode, W3cTrace::new());

        let _ = srv.delete_range(request.into_request()).await.unwrap();
    }
}