common_meta/
kv_backend.rs1use std::any::Any;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use common_error::ext::ErrorExt;
20pub use txn::TxnService;
21
22use crate::error::Error;
23use crate::kv_backend::txn::{Txn, TxnOpResponse};
24use crate::rpc::store::{
25 BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
26 BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
27 DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
28};
29use crate::rpc::KeyValue;
30
31pub mod chroot;
32pub mod etcd;
33pub mod memory;
34#[cfg(any(feature = "mysql_kvbackend", feature = "pg_kvbackend"))]
35pub mod rds;
36pub mod test;
37pub mod txn;
38
39pub type KvBackendRef<E = Error> = Arc<dyn KvBackend<Error = E> + Send + Sync>;
40
41#[async_trait]
42pub trait KvBackend: TxnService
43where
44 Self::Error: ErrorExt,
45{
46 fn name(&self) -> &str;
47
48 fn as_any(&self) -> &dyn Any;
49
50 async fn range(&self, req: RangeRequest) -> Result<RangeResponse, Self::Error>;
51
52 async fn put(&self, req: PutRequest) -> Result<PutResponse, Self::Error>;
53
54 async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse, Self::Error>;
55
56 async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error>;
57
58 async fn delete_range(
59 &self,
60 req: DeleteRangeRequest,
61 ) -> Result<DeleteRangeResponse, Self::Error>;
62
63 async fn batch_delete(
64 &self,
65 req: BatchDeleteRequest,
66 ) -> Result<BatchDeleteResponse, Self::Error>;
67
68 async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>, Self::Error> {
72 let req = RangeRequest::new().with_key(key.to_vec());
73 let mut resp = self.range(req).await?;
74 Ok(if resp.kvs.is_empty() {
75 None
76 } else {
77 Some(resp.kvs.remove(0))
78 })
79 }
80
81 async fn compare_and_put(
84 &self,
85 req: CompareAndPutRequest,
86 ) -> Result<CompareAndPutResponse, Self::Error> {
87 let CompareAndPutRequest { key, expect, value } = req;
88 let txn = if expect.is_empty() {
89 Txn::put_if_not_exists(key, value)
90 } else {
91 Txn::compare_and_put(key, expect, value)
92 };
93 let txn_res = self.txn(txn).await?;
94
95 let success = txn_res.succeeded;
96 let op_res = txn_res.responses.into_iter().next();
98 let prev_kv = match op_res {
99 Some(TxnOpResponse::ResponsePut(res)) => res.prev_kv,
100 Some(TxnOpResponse::ResponseGet(res)) => res.kvs.into_iter().next(),
101 Some(TxnOpResponse::ResponseDelete(res)) => res.prev_kvs.into_iter().next(),
102 None => None,
103 };
104
105 Ok(CompareAndPutResponse { success, prev_kv })
106 }
107
108 async fn put_conditionally(
113 &self,
114 key: Vec<u8>,
115 value: Vec<u8>,
116 if_not_exists: bool,
117 ) -> Result<bool, Self::Error> {
118 let success = if if_not_exists {
119 let req = CompareAndPutRequest::new()
120 .with_key(key)
121 .with_expect(vec![])
122 .with_value(value);
123 let res = self.compare_and_put(req).await?;
124 res.success
125 } else {
126 let req = PutRequest::new().with_key(key).with_value(value);
127 self.put(req).await?;
128 true
129 };
130
131 Ok(success)
132 }
133
134 async fn exists(&self, key: &[u8]) -> Result<bool, Self::Error> {
137 let req = RangeRequest::new().with_key(key.to_vec()).with_keys_only();
138 let resp = self.range(req).await?;
139 Ok(!resp.kvs.is_empty())
140 }
141
142 async fn delete(&self, key: &[u8], prev_kv: bool) -> Result<Option<KeyValue>, Self::Error> {
144 let mut req = DeleteRangeRequest::new().with_key(key.to_vec());
145 if prev_kv {
146 req = req.with_prev_kv();
147 }
148
149 let resp = self.delete_range(req).await?;
150
151 if prev_kv {
152 Ok(resp.prev_kvs.into_iter().next())
153 } else {
154 Ok(None)
155 }
156 }
157}
158
159pub trait ResettableKvBackend: KvBackend
160where
161 Self::Error: ErrorExt,
162{
163 fn reset(&self);
164
165 fn as_kv_backend_ref(self: Arc<Self>) -> KvBackendRef<Self::Error>;
167}
168
169pub type ResettableKvBackendRef<E = Error> = Arc<dyn ResettableKvBackend<Error = E> + Send + Sync>;