meta_srv/service/
store.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod cached_kv;
16
17use api::v1::meta::{
18    store_server, BatchDeleteRequest as PbBatchDeleteRequest,
19    BatchDeleteResponse as PbBatchDeleteResponse, BatchGetRequest as PbBatchGetRequest,
20    BatchGetResponse as PbBatchGetResponse, BatchPutRequest as PbBatchPutRequest,
21    BatchPutResponse as PbBatchPutResponse, CompareAndPutRequest as PbCompareAndPutRequest,
22    CompareAndPutResponse as PbCompareAndPutResponse, DeleteRangeRequest as PbDeleteRangeRequest,
23    DeleteRangeResponse as PbDeleteRangeResponse, PutRequest as PbPutRequest,
24    PutResponse as PbPutResponse, RangeRequest as PbRangeRequest, RangeResponse as PbRangeResponse,
25    ResponseHeader,
26};
27use common_meta::rpc::store::{
28    BatchDeleteRequest, BatchGetRequest, BatchPutRequest, CompareAndPutRequest, DeleteRangeRequest,
29    PutRequest, RangeRequest,
30};
31use snafu::ResultExt;
32use tonic::{Request, Response};
33
34use crate::error::{self};
35use crate::metasrv::Metasrv;
36use crate::metrics::METRIC_META_KV_REQUEST_ELAPSED;
37use crate::service::GrpcResult;
38
39#[async_trait::async_trait]
40impl store_server::Store for Metasrv {
41    async fn range(&self, req: Request<PbRangeRequest>) -> GrpcResult<PbRangeResponse> {
42        let req = req.into_inner();
43
44        let _timer = METRIC_META_KV_REQUEST_ELAPSED
45            .with_label_values(&[self.kv_backend().name(), "range"])
46            .start_timer();
47
48        let req: RangeRequest = req.into();
49
50        let res = self
51            .kv_backend()
52            .range(req)
53            .await
54            .context(error::KvBackendSnafu)?;
55
56        let res = res.to_proto_resp(ResponseHeader::success());
57        Ok(Response::new(res))
58    }
59
60    async fn put(&self, req: Request<PbPutRequest>) -> GrpcResult<PbPutResponse> {
61        let req = req.into_inner();
62        let _timer = METRIC_META_KV_REQUEST_ELAPSED
63            .with_label_values(&[self.kv_backend().name(), "put"])
64            .start_timer();
65
66        let req: PutRequest = req.into();
67
68        let res = self
69            .kv_backend()
70            .put(req)
71            .await
72            .context(error::KvBackendSnafu)?;
73
74        let res = res.to_proto_resp(ResponseHeader::success());
75        Ok(Response::new(res))
76    }
77
78    async fn batch_get(&self, req: Request<PbBatchGetRequest>) -> GrpcResult<PbBatchGetResponse> {
79        let req = req.into_inner();
80        let _timer = METRIC_META_KV_REQUEST_ELAPSED
81            .with_label_values(&[self.kv_backend().name(), "batch_get"])
82            .start_timer();
83
84        let req: BatchGetRequest = req.into();
85
86        let res = self
87            .kv_backend()
88            .batch_get(req)
89            .await
90            .context(error::KvBackendSnafu)?;
91
92        let res = res.to_proto_resp(ResponseHeader::success());
93        Ok(Response::new(res))
94    }
95
96    async fn batch_put(&self, req: Request<PbBatchPutRequest>) -> GrpcResult<PbBatchPutResponse> {
97        let req = req.into_inner();
98
99        let _timer = METRIC_META_KV_REQUEST_ELAPSED
100            .with_label_values(&[self.kv_backend().name(), "batch_pub"])
101            .start_timer();
102
103        let req: BatchPutRequest = req.into();
104
105        let res = self
106            .kv_backend()
107            .batch_put(req)
108            .await
109            .context(error::KvBackendSnafu)?;
110
111        let res = res.to_proto_resp(ResponseHeader::success());
112        Ok(Response::new(res))
113    }
114
115    async fn batch_delete(
116        &self,
117        req: Request<PbBatchDeleteRequest>,
118    ) -> GrpcResult<PbBatchDeleteResponse> {
119        let req = req.into_inner();
120
121        let _timer = METRIC_META_KV_REQUEST_ELAPSED
122            .with_label_values(&[self.kv_backend().name(), "batch_delete"])
123            .start_timer();
124
125        let req: BatchDeleteRequest = req.into();
126
127        let res = self
128            .kv_backend()
129            .batch_delete(req)
130            .await
131            .context(error::KvBackendSnafu)?;
132
133        let res = res.to_proto_resp(ResponseHeader::success());
134        Ok(Response::new(res))
135    }
136
137    async fn compare_and_put(
138        &self,
139        req: Request<PbCompareAndPutRequest>,
140    ) -> GrpcResult<PbCompareAndPutResponse> {
141        let req = req.into_inner();
142
143        let _timer = METRIC_META_KV_REQUEST_ELAPSED
144            .with_label_values(&[self.kv_backend().name(), "compare_and_put"])
145            .start_timer();
146
147        let req: CompareAndPutRequest = req.into();
148
149        let res = self
150            .kv_backend()
151            .compare_and_put(req)
152            .await
153            .context(error::KvBackendSnafu)?;
154
155        let res = res.to_proto_resp(ResponseHeader::success());
156        Ok(Response::new(res))
157    }
158
159    async fn delete_range(
160        &self,
161        req: Request<PbDeleteRangeRequest>,
162    ) -> GrpcResult<PbDeleteRangeResponse> {
163        let req = req.into_inner();
164
165        let _timer = METRIC_META_KV_REQUEST_ELAPSED
166            .with_label_values(&[self.kv_backend().name(), "delete_range"])
167            .start_timer();
168
169        let req: DeleteRangeRequest = req.into();
170
171        let res = self
172            .kv_backend()
173            .delete_range(req)
174            .await
175            .context(error::KvBackendSnafu)?;
176
177        let res = res.to_proto_resp(ResponseHeader::success());
178        Ok(Response::new(res))
179    }
180}
181
// Smoke tests: each case sends a default (empty) request with a header set
// through the corresponding `Store` handler on a metasrv built over an
// in-memory kv backend, and only checks that the call succeeds.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::meta::store_server::Store;
    use api::v1::meta::*;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_telemetry::tracing_context::W3cTrace;
    use tonic::IntoRequest;

    use crate::metasrv::builder::MetasrvBuilder;
    use crate::metasrv::Metasrv;

    /// Builds a `Metasrv` whose kv backend is a fresh `MemoryKvBackend`.
    async fn new_metasrv() -> Metasrv {
        let backend = Arc::new(MemoryKvBackend::new());
        let builder = MetasrvBuilder::new().kv_backend(backend);
        builder.build().await.unwrap()
    }

    #[tokio::test]
    async fn test_range() {
        let srv = new_metasrv().await;

        let mut request = RangeRequest::default();
        request.set_header(1, Role::Datanode, W3cTrace::new());

        let _ = srv.range(request.into_request()).await.unwrap();
    }

    #[tokio::test]
    async fn test_put() {
        let srv = new_metasrv().await;

        let mut request = PutRequest::default();
        request.set_header(1, Role::Datanode, W3cTrace::new());

        let _ = srv.put(request.into_request()).await.unwrap();
    }

    #[tokio::test]
    async fn test_batch_get() {
        let srv = new_metasrv().await;

        let mut request = BatchGetRequest::default();
        request.set_header(1, Role::Datanode, W3cTrace::new());

        let _ = srv.batch_get(request.into_request()).await.unwrap();
    }

    #[tokio::test]
    async fn test_batch_put() {
        common_telemetry::init_default_ut_logging();
        let srv = new_metasrv().await;

        let mut request = BatchPutRequest::default();
        request.set_header(1, Role::Datanode, W3cTrace::new());

        let _ = srv.batch_put(request.into_request()).await.unwrap();
    }

    #[tokio::test]
    async fn test_batch_delete() {
        let srv = new_metasrv().await;

        let mut request = BatchDeleteRequest::default();
        request.set_header(1, Role::Datanode, W3cTrace::new());

        let _ = srv.batch_delete(request.into_request()).await.unwrap();
    }

    #[tokio::test]
    async fn test_compare_and_put() {
        let srv = new_metasrv().await;

        let mut request = CompareAndPutRequest::default();
        request.set_header(1, Role::Datanode, W3cTrace::new());

        let _ = srv.compare_and_put(request.into_request()).await.unwrap();
    }

    #[tokio::test]
    async fn test_delete_range() {
        let srv = new_metasrv().await;

        let mut request = DeleteRangeRequest::default();
        request.set_header(1, Role::Datanode, W3cTrace::new());

        let _ = srv.delete_range(request.into_request()).await.unwrap();
    }
}