Skip to main content

common_meta/kv_backend/
read_only.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use crate::error::{ReadOnlyKvBackendSnafu, Result};
18use crate::kv_backend::txn::{Txn, TxnOp, TxnResponse};
19use crate::kv_backend::{KvBackend, KvBackendRef, TxnService};
20use crate::rpc::KeyValue;
21use crate::rpc::store::{
22    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
23    BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
24    DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
25};
26
27/// A [`KvBackend`] wrapper that forwards reads and rejects writes.
28pub struct ReadOnlyKvBackend {
29    inner: KvBackendRef,
30    name: String,
31}
32
33impl ReadOnlyKvBackend {
34    pub fn new(inner: KvBackendRef) -> Self {
35        let name = format!("ReadOnlyKvBackend({})", inner.name());
36        Self { inner, name }
37    }
38
39    fn read_only<T>(&self) -> Result<T> {
40        ReadOnlyKvBackendSnafu {
41            name: self.name.clone(),
42        }
43        .fail()
44    }
45
46    fn validate_read_only_ops(&self, ops: &[TxnOp]) -> Result<()> {
47        if ops
48            .iter()
49            .any(|op| matches!(op, TxnOp::Put(_, _) | TxnOp::Delete(_)))
50        {
51            self.read_only()
52        } else {
53            Ok(())
54        }
55    }
56
57    fn validate_read_only_txn(&self, txn: &Txn) -> Result<()> {
58        self.validate_read_only_ops(&txn.req.success)?;
59        self.validate_read_only_ops(&txn.req.failure)
60    }
61}
62
63#[async_trait::async_trait]
64impl TxnService for ReadOnlyKvBackend {
65    type Error = crate::error::Error;
66
67    async fn txn(&self, txn: Txn) -> Result<TxnResponse> {
68        self.validate_read_only_txn(&txn)?;
69        self.inner.txn(txn).await
70    }
71
72    fn max_txn_ops(&self) -> usize {
73        self.inner.max_txn_ops()
74    }
75}
76
77#[async_trait::async_trait]
78impl KvBackend for ReadOnlyKvBackend {
79    fn name(&self) -> &str {
80        &self.name
81    }
82
83    fn as_any(&self) -> &dyn Any {
84        self
85    }
86
87    async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
88        self.inner.range(req).await
89    }
90
91    async fn put(&self, _req: PutRequest) -> Result<PutResponse> {
92        self.read_only()
93    }
94
95    async fn batch_put(&self, _req: BatchPutRequest) -> Result<BatchPutResponse> {
96        self.read_only()
97    }
98
99    async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
100        self.inner.batch_get(req).await
101    }
102
103    async fn compare_and_put(&self, _req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
104        self.read_only()
105    }
106
107    async fn delete_range(&self, _req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
108        self.read_only()
109    }
110
111    async fn batch_delete(&self, _req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
112        self.read_only()
113    }
114
115    async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>> {
116        self.inner.get(key).await
117    }
118}
119
120#[cfg(test)]
121mod tests {
122    use std::sync::Arc;
123
124    use common_error::ext::ErrorExt;
125    use common_error::status_code::StatusCode;
126
127    use super::*;
128    use crate::error::Error;
129    use crate::kv_backend::memory::MemoryKvBackend;
130    use crate::kv_backend::txn::{Compare, CompareOp, TxnOpResponse};
131    use crate::rpc::store::{
132        BatchDeleteRequest, BatchGetRequest, BatchPutRequest, CompareAndPutRequest,
133        DeleteRangeRequest, PutRequest,
134    };
135
136    async fn read_only_backend() -> ReadOnlyKvBackend {
137        let inner = Arc::new(MemoryKvBackend::<Error>::new());
138        inner
139            .put(PutRequest::new().with_key(b"k1").with_value(b"v1"))
140            .await
141            .unwrap();
142        inner
143            .put(PutRequest::new().with_key(b"k2").with_value(b"v2"))
144            .await
145            .unwrap();
146
147        ReadOnlyKvBackend::new(inner)
148    }
149
150    fn assert_read_only<T>(result: Result<T>) {
151        let err = match result {
152            Ok(_) => panic!("expected read-only error"),
153            Err(err) => err,
154        };
155        assert!(matches!(err, Error::ReadOnlyKvBackend { .. }));
156        assert_eq!(err.status_code(), StatusCode::Unsupported);
157    }
158
159    struct TxnOnlyBackend;
160
161    #[async_trait::async_trait]
162    impl TxnService for TxnOnlyBackend {
163        type Error = Error;
164
165        async fn txn(&self, _txn: Txn) -> Result<TxnResponse> {
166            Ok(TxnResponse {
167                succeeded: true,
168                responses: vec![TxnOpResponse::ResponseGet(RangeResponse {
169                    kvs: vec![KeyValue {
170                        key: b"k1".to_vec(),
171                        value: b"v1".to_vec(),
172                    }],
173                    more: false,
174                })],
175            })
176        }
177
178        fn max_txn_ops(&self) -> usize {
179            7
180        }
181    }
182
183    #[async_trait::async_trait]
184    impl KvBackend for TxnOnlyBackend {
185        fn name(&self) -> &str {
186            "TxnOnlyBackend"
187        }
188
189        fn as_any(&self) -> &dyn Any {
190            self
191        }
192
193        async fn range(&self, _req: RangeRequest) -> Result<RangeResponse> {
194            unimplemented!("read-only txn should delegate to inner txn")
195        }
196
197        async fn put(&self, _req: PutRequest) -> Result<PutResponse> {
198            unimplemented!("read-only txn should delegate to inner txn")
199        }
200
201        async fn batch_put(&self, _req: BatchPutRequest) -> Result<BatchPutResponse> {
202            unimplemented!("read-only txn should delegate to inner txn")
203        }
204
205        async fn batch_get(&self, _req: BatchGetRequest) -> Result<BatchGetResponse> {
206            unimplemented!("read-only txn should delegate to inner txn")
207        }
208
209        async fn delete_range(&self, _req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
210            unimplemented!("read-only txn should delegate to inner txn")
211        }
212
213        async fn batch_delete(&self, _req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
214            unimplemented!("read-only txn should delegate to inner txn")
215        }
216    }
217
218    #[tokio::test]
219    async fn test_read_only_backend_forwards_reads() {
220        let backend = read_only_backend().await;
221
222        let range = backend
223            .range(RangeRequest::new().with_key(b"k1"))
224            .await
225            .unwrap();
226        assert_eq!(range.kvs.len(), 1);
227        assert_eq!(range.kvs[0].value, b"v1");
228
229        let kv = backend.get(b"k2").await.unwrap().unwrap();
230        assert_eq!(kv.value, b"v2");
231
232        let batch = backend
233            .batch_get(BatchGetRequest::new().add_key(b"k1").add_key(b"k2"))
234            .await
235            .unwrap();
236        assert_eq!(batch.kvs.len(), 2);
237    }
238
239    #[tokio::test]
240    async fn test_read_only_backend_rejects_writes() {
241        let backend = read_only_backend().await;
242
243        assert_read_only(
244            backend
245                .put(PutRequest::new().with_key(b"k3").with_value(b"v3"))
246                .await,
247        );
248        assert_read_only(
249            backend
250                .batch_put(BatchPutRequest::new().add_kv(b"k3", b"v3"))
251                .await,
252        );
253        assert_read_only(
254            backend
255                .compare_and_put(
256                    CompareAndPutRequest::new()
257                        .with_key(b"k1")
258                        .with_expect(b"v1")
259                        .with_value(b"v3"),
260                )
261                .await,
262        );
263        assert_read_only(
264            backend
265                .delete_range(DeleteRangeRequest::new().with_key(b"k1"))
266                .await,
267        );
268        assert_read_only(
269            backend
270                .batch_delete(BatchDeleteRequest::new().add_key(b"k1"))
271                .await,
272        );
273    }
274
275    #[tokio::test]
276    async fn test_read_only_backend_rejects_write_txn() {
277        let backend = read_only_backend().await;
278
279        assert_eq!(backend.max_txn_ops(), usize::MAX);
280        assert_read_only(
281            backend
282                .txn(Txn::put_if_not_exists(b"k3".to_vec(), b"v3".to_vec()))
283                .await,
284        );
285    }
286
287    #[tokio::test]
288    async fn test_read_only_backend_delegates_read_txn() {
289        let backend = ReadOnlyKvBackend::new(Arc::new(TxnOnlyBackend));
290
291        assert_eq!(backend.max_txn_ops(), 7);
292        let resp = backend
293            .txn(Txn::new().and_then(vec![TxnOp::Get(b"k1".to_vec())]))
294            .await
295            .unwrap();
296
297        assert!(resp.succeeded);
298        let TxnOpResponse::ResponseGet(range) = &resp.responses[0] else {
299            panic!("expected get response");
300        };
301        assert_eq!(range.kvs[0].value, b"v1");
302    }
303
304    #[tokio::test]
305    async fn test_read_only_backend_allows_get_only_txn() {
306        let backend = read_only_backend().await;
307
308        let resp = backend
309            .txn(Txn::new().and_then(vec![TxnOp::Get(b"k1".to_vec()), TxnOp::Get(b"k2".to_vec())]))
310            .await
311            .unwrap();
312
313        assert!(resp.succeeded);
314        assert_eq!(resp.responses.len(), 2);
315        let TxnOpResponse::ResponseGet(range) = &resp.responses[0] else {
316            panic!("expected get response");
317        };
318        assert_eq!(range.kvs.len(), 1);
319        assert_eq!(range.kvs[0].value, b"v1");
320        let TxnOpResponse::ResponseGet(range) = &resp.responses[1] else {
321            panic!("expected get response");
322        };
323        assert_eq!(range.kvs.len(), 1);
324        assert_eq!(range.kvs[0].value, b"v2");
325    }
326
327    #[tokio::test]
328    async fn test_read_only_backend_allows_compare_and_get_txn() {
329        let backend = read_only_backend().await;
330
331        let txn = Txn::new()
332            .when(vec![Compare::with_value(
333                b"k1".to_vec(),
334                CompareOp::Equal,
335                b"v1".to_vec(),
336            )])
337            .and_then(vec![TxnOp::Get(b"k2".to_vec())])
338            .or_else(vec![TxnOp::Get(b"k1".to_vec())]);
339        let resp = backend.txn(txn).await.unwrap();
340
341        assert!(resp.succeeded);
342        let TxnOpResponse::ResponseGet(range) = &resp.responses[0] else {
343            panic!("expected get response");
344        };
345        assert_eq!(range.kvs.len(), 1);
346        assert_eq!(range.kvs[0].value, b"v2");
347    }
348}