1use std::any::Any;
16
17use crate::error::{ReadOnlyKvBackendSnafu, Result};
18use crate::kv_backend::txn::{Txn, TxnOp, TxnResponse};
19use crate::kv_backend::{KvBackend, KvBackendRef, TxnService};
20use crate::rpc::KeyValue;
21use crate::rpc::store::{
22 BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
23 BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
24 DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
25};
26
27pub struct ReadOnlyKvBackend {
29 inner: KvBackendRef,
30 name: String,
31}
32
33impl ReadOnlyKvBackend {
34 pub fn new(inner: KvBackendRef) -> Self {
35 let name = format!("ReadOnlyKvBackend({})", inner.name());
36 Self { inner, name }
37 }
38
39 fn read_only<T>(&self) -> Result<T> {
40 ReadOnlyKvBackendSnafu {
41 name: self.name.clone(),
42 }
43 .fail()
44 }
45
46 fn validate_read_only_ops(&self, ops: &[TxnOp]) -> Result<()> {
47 if ops
48 .iter()
49 .any(|op| matches!(op, TxnOp::Put(_, _) | TxnOp::Delete(_)))
50 {
51 self.read_only()
52 } else {
53 Ok(())
54 }
55 }
56
57 fn validate_read_only_txn(&self, txn: &Txn) -> Result<()> {
58 self.validate_read_only_ops(&txn.req.success)?;
59 self.validate_read_only_ops(&txn.req.failure)
60 }
61}
62
63#[async_trait::async_trait]
64impl TxnService for ReadOnlyKvBackend {
65 type Error = crate::error::Error;
66
67 async fn txn(&self, txn: Txn) -> Result<TxnResponse> {
68 self.validate_read_only_txn(&txn)?;
69 self.inner.txn(txn).await
70 }
71
72 fn max_txn_ops(&self) -> usize {
73 self.inner.max_txn_ops()
74 }
75}
76
77#[async_trait::async_trait]
78impl KvBackend for ReadOnlyKvBackend {
79 fn name(&self) -> &str {
80 &self.name
81 }
82
83 fn as_any(&self) -> &dyn Any {
84 self
85 }
86
87 async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
88 self.inner.range(req).await
89 }
90
91 async fn put(&self, _req: PutRequest) -> Result<PutResponse> {
92 self.read_only()
93 }
94
95 async fn batch_put(&self, _req: BatchPutRequest) -> Result<BatchPutResponse> {
96 self.read_only()
97 }
98
99 async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
100 self.inner.batch_get(req).await
101 }
102
103 async fn compare_and_put(&self, _req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
104 self.read_only()
105 }
106
107 async fn delete_range(&self, _req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
108 self.read_only()
109 }
110
111 async fn batch_delete(&self, _req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
112 self.read_only()
113 }
114
115 async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>> {
116 self.inner.get(key).await
117 }
118}
119
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_error::ext::ErrorExt;
    use common_error::status_code::StatusCode;

    use super::*;
    use crate::error::Error;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::kv_backend::txn::{Compare, CompareOp, TxnOpResponse};
    use crate::rpc::store::{
        BatchDeleteRequest, BatchGetRequest, BatchPutRequest, CompareAndPutRequest,
        DeleteRangeRequest, PutRequest,
    };

    /// Builds a read-only wrapper over an in-memory backend seeded with
    /// `k1 => v1` and `k2 => v2`.
    async fn read_only_backend() -> ReadOnlyKvBackend {
        let memory = Arc::new(MemoryKvBackend::<Error>::new());
        for (key, value) in [(b"k1", b"v1"), (b"k2", b"v2")] {
            memory
                .put(PutRequest::new().with_key(key).with_value(value))
                .await
                .unwrap();
        }

        ReadOnlyKvBackend::new(memory)
    }

    /// Asserts that `result` is the read-only error and maps to `StatusCode::Unsupported`.
    fn assert_read_only<T>(result: Result<T>) {
        let Err(err) = result else {
            panic!("expected read-only error");
        };
        assert!(matches!(err, Error::ReadOnlyKvBackend { .. }));
        assert_eq!(err.status_code(), StatusCode::Unsupported);
    }

    /// Returns the range payload of the `index`-th txn op response, panicking if
    /// that response is not a get response.
    fn expect_get(resp: &TxnResponse, index: usize) -> &RangeResponse {
        let TxnOpResponse::ResponseGet(range) = &resp.responses[index] else {
            panic!("expected get response");
        };
        range
    }

    /// Backend whose only working entry points are `txn` and `max_txn_ops`.
    ///
    /// Every other required `KvBackend` method panics, so a test using it proves
    /// the wrapper delegated to the inner `txn` rather than to another method.
    struct TxnOnlyBackend;

    #[async_trait::async_trait]
    impl TxnService for TxnOnlyBackend {
        type Error = Error;

        /// Ignores the request and answers with a single canned `k1 => v1` get response.
        async fn txn(&self, _txn: Txn) -> Result<TxnResponse> {
            let canned_kv = KeyValue {
                key: b"k1".to_vec(),
                value: b"v1".to_vec(),
            };
            let canned_range = RangeResponse {
                kvs: vec![canned_kv],
                more: false,
            };
            Ok(TxnResponse {
                succeeded: true,
                responses: vec![TxnOpResponse::ResponseGet(canned_range)],
            })
        }

        fn max_txn_ops(&self) -> usize {
            7
        }
    }

    #[async_trait::async_trait]
    impl KvBackend for TxnOnlyBackend {
        fn name(&self) -> &str {
            "TxnOnlyBackend"
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        async fn range(&self, _req: RangeRequest) -> Result<RangeResponse> {
            unimplemented!("read-only txn should delegate to inner txn")
        }

        async fn put(&self, _req: PutRequest) -> Result<PutResponse> {
            unimplemented!("read-only txn should delegate to inner txn")
        }

        async fn batch_put(&self, _req: BatchPutRequest) -> Result<BatchPutResponse> {
            unimplemented!("read-only txn should delegate to inner txn")
        }

        async fn batch_get(&self, _req: BatchGetRequest) -> Result<BatchGetResponse> {
            unimplemented!("read-only txn should delegate to inner txn")
        }

        async fn delete_range(&self, _req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
            unimplemented!("read-only txn should delegate to inner txn")
        }

        async fn batch_delete(&self, _req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
            unimplemented!("read-only txn should delegate to inner txn")
        }
    }

    /// `range`, `get` and `batch_get` return the data seeded into the inner backend.
    #[tokio::test]
    async fn test_read_only_backend_forwards_reads() {
        let backend = read_only_backend().await;

        let range_resp = backend
            .range(RangeRequest::new().with_key(b"k1"))
            .await
            .unwrap();
        assert_eq!(range_resp.kvs.len(), 1);
        assert_eq!(range_resp.kvs[0].value, b"v1");

        let found = backend.get(b"k2").await.unwrap().unwrap();
        assert_eq!(found.value, b"v2");

        let batch_req = BatchGetRequest::new().add_key(b"k1").add_key(b"k2");
        let batch_resp = backend.batch_get(batch_req).await.unwrap();
        assert_eq!(batch_resp.kvs.len(), 2);
    }

    /// Every mutating `KvBackend` method is rejected with the read-only error.
    #[tokio::test]
    async fn test_read_only_backend_rejects_writes() {
        let backend = read_only_backend().await;

        let put_req = PutRequest::new().with_key(b"k3").with_value(b"v3");
        assert_read_only(backend.put(put_req).await);

        let batch_put_req = BatchPutRequest::new().add_kv(b"k3", b"v3");
        assert_read_only(backend.batch_put(batch_put_req).await);

        let cas_req = CompareAndPutRequest::new()
            .with_key(b"k1")
            .with_expect(b"v1")
            .with_value(b"v3");
        assert_read_only(backend.compare_and_put(cas_req).await);

        let delete_range_req = DeleteRangeRequest::new().with_key(b"k1");
        assert_read_only(backend.delete_range(delete_range_req).await);

        let batch_delete_req = BatchDeleteRequest::new().add_key(b"k1");
        assert_read_only(backend.batch_delete(batch_delete_req).await);
    }

    /// A transaction containing a put is rejected.
    #[tokio::test]
    async fn test_read_only_backend_rejects_write_txn() {
        let backend = read_only_backend().await;

        assert_eq!(backend.max_txn_ops(), usize::MAX);

        let write_txn = Txn::put_if_not_exists(b"k3".to_vec(), b"v3".to_vec());
        assert_read_only(backend.txn(write_txn).await);
    }

    /// A get-only transaction is handed to the inner backend's `txn`.
    #[tokio::test]
    async fn test_read_only_backend_delegates_read_txn() {
        let backend = ReadOnlyKvBackend::new(Arc::new(TxnOnlyBackend));

        assert_eq!(backend.max_txn_ops(), 7);

        let read_txn = Txn::new().and_then(vec![TxnOp::Get(b"k1".to_vec())]);
        let resp = backend.txn(read_txn).await.unwrap();

        assert!(resp.succeeded);
        let range = expect_get(&resp, 0);
        assert_eq!(range.kvs[0].value, b"v1");
    }

    /// A transaction made only of gets succeeds and returns one response per get.
    #[tokio::test]
    async fn test_read_only_backend_allows_get_only_txn() {
        let backend = read_only_backend().await;

        let gets = vec![TxnOp::Get(b"k1".to_vec()), TxnOp::Get(b"k2".to_vec())];
        let resp = backend.txn(Txn::new().and_then(gets)).await.unwrap();

        assert!(resp.succeeded);
        assert_eq!(resp.responses.len(), 2);

        let first = expect_get(&resp, 0);
        assert_eq!(first.kvs.len(), 1);
        assert_eq!(first.kvs[0].value, b"v1");

        let second = expect_get(&resp, 1);
        assert_eq!(second.kvs.len(), 1);
        assert_eq!(second.kvs[0].value, b"v2");
    }

    /// A compare guard with get-only branches is allowed; the matching compare
    /// selects the success branch.
    #[tokio::test]
    async fn test_read_only_backend_allows_compare_and_get_txn() {
        let backend = read_only_backend().await;

        let guard = Compare::with_value(b"k1".to_vec(), CompareOp::Equal, b"v1".to_vec());
        let txn = Txn::new()
            .when(vec![guard])
            .and_then(vec![TxnOp::Get(b"k2".to_vec())])
            .or_else(vec![TxnOp::Get(b"k1".to_vec())]);
        let resp = backend.txn(txn).await.unwrap();

        assert!(resp.succeeded);
        let range = expect_get(&resp, 0);
        assert_eq!(range.kvs.len(), 1);
        assert_eq!(range.kvs[0].value, b"v2");
    }
}