common_meta/
kv_backend.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use common_error::ext::ErrorExt;
20pub use txn::TxnService;
21
22use crate::error::Error;
23use crate::kv_backend::txn::{Txn, TxnOpResponse};
24use crate::rpc::store::{
25    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
26    BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
27    DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
28};
29use crate::rpc::KeyValue;
30
31pub mod chroot;
32pub mod etcd;
33pub mod memory;
34#[cfg(any(feature = "mysql_kvbackend", feature = "pg_kvbackend"))]
35pub mod rds;
36pub mod test;
37pub mod txn;
38
39pub type KvBackendRef<E = Error> = Arc<dyn KvBackend<Error = E> + Send + Sync>;
40
41#[async_trait]
42pub trait KvBackend: TxnService
43where
44    Self::Error: ErrorExt,
45{
46    fn name(&self) -> &str;
47
48    fn as_any(&self) -> &dyn Any;
49
50    async fn range(&self, req: RangeRequest) -> Result<RangeResponse, Self::Error>;
51
52    async fn put(&self, req: PutRequest) -> Result<PutResponse, Self::Error>;
53
54    async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse, Self::Error>;
55
56    async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error>;
57
58    async fn delete_range(
59        &self,
60        req: DeleteRangeRequest,
61    ) -> Result<DeleteRangeResponse, Self::Error>;
62
63    async fn batch_delete(
64        &self,
65        req: BatchDeleteRequest,
66    ) -> Result<BatchDeleteResponse, Self::Error>;
67
68    // The following methods are implemented based on the above methods,
69    // and a higher-level interface is provided for to simplify usage.
70
71    async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>, Self::Error> {
72        let req = RangeRequest::new().with_key(key.to_vec());
73        let mut resp = self.range(req).await?;
74        Ok(if resp.kvs.is_empty() {
75            None
76        } else {
77            Some(resp.kvs.remove(0))
78        })
79    }
80
81    /// CAS: Compares the value at the key with the given value, and if they are
82    /// equal, puts the new value at the key.
83    async fn compare_and_put(
84        &self,
85        req: CompareAndPutRequest,
86    ) -> Result<CompareAndPutResponse, Self::Error> {
87        let CompareAndPutRequest { key, expect, value } = req;
88        let txn = if expect.is_empty() {
89            Txn::put_if_not_exists(key, value)
90        } else {
91            Txn::compare_and_put(key, expect, value)
92        };
93        let txn_res = self.txn(txn).await?;
94
95        let success = txn_res.succeeded;
96        // The response is guaranteed to have at most one element.
97        let op_res = txn_res.responses.into_iter().next();
98        let prev_kv = match op_res {
99            Some(TxnOpResponse::ResponsePut(res)) => res.prev_kv,
100            Some(TxnOpResponse::ResponseGet(res)) => res.kvs.into_iter().next(),
101            Some(TxnOpResponse::ResponseDelete(res)) => res.prev_kvs.into_iter().next(),
102            None => None,
103        };
104
105        Ok(CompareAndPutResponse { success, prev_kv })
106    }
107
108    /// Puts a value at a key. If `if_not_exists` is `true`, the operation
109    /// ensures the key does not exist before applying the PUT operation.
110    /// Otherwise, it simply applies the PUT operation without checking for
111    /// the key's existence.
112    async fn put_conditionally(
113        &self,
114        key: Vec<u8>,
115        value: Vec<u8>,
116        if_not_exists: bool,
117    ) -> Result<bool, Self::Error> {
118        let success = if if_not_exists {
119            let req = CompareAndPutRequest::new()
120                .with_key(key)
121                .with_expect(vec![])
122                .with_value(value);
123            let res = self.compare_and_put(req).await?;
124            res.success
125        } else {
126            let req = PutRequest::new().with_key(key).with_value(value);
127            self.put(req).await?;
128            true
129        };
130
131        Ok(success)
132    }
133
134    /// Check if the key exists, not returning the value.
135    /// If the value is large, this method is more efficient than `get`.
136    async fn exists(&self, key: &[u8]) -> Result<bool, Self::Error> {
137        let req = RangeRequest::new().with_key(key.to_vec()).with_keys_only();
138        let resp = self.range(req).await?;
139        Ok(!resp.kvs.is_empty())
140    }
141
142    /// Returns previous key-value pair if `prev_kv` is `true`.
143    async fn delete(&self, key: &[u8], prev_kv: bool) -> Result<Option<KeyValue>, Self::Error> {
144        let mut req = DeleteRangeRequest::new().with_key(key.to_vec());
145        if prev_kv {
146            req = req.with_prev_kv();
147        }
148
149        let resp = self.delete_range(req).await?;
150
151        if prev_kv {
152            Ok(resp.prev_kvs.into_iter().next())
153        } else {
154            Ok(None)
155        }
156    }
157}
158
159pub trait ResettableKvBackend: KvBackend
160where
161    Self::Error: ErrorExt,
162{
163    fn reset(&self);
164
165    /// Upcast as `KvBackendRef`. Since https://github.com/rust-lang/rust/issues/65991 is not yet stable.
166    fn as_kv_backend_ref(self: Arc<Self>) -> KvBackendRef<Self::Error>;
167}
168
169pub type ResettableKvBackendRef<E = Error> = Arc<dyn ResettableKvBackend<Error = E> + Send + Sync>;