common_meta/
kv_backend.rs1use std::any::Any;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use common_error::ext::ErrorExt;
20pub use txn::TxnService;
21
22use crate::error::Error;
23use crate::kv_backend::txn::{Txn, TxnOpResponse};
24use crate::rpc::store::{
25 BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
26 BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
27 DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
28};
29use crate::rpc::KeyValue;
30
31pub mod chroot;
32pub mod etcd;
33pub mod memory;
34#[cfg(any(feature = "mysql_kvbackend", feature = "pg_kvbackend"))]
35pub mod rds;
36pub mod test;
37pub mod txn;
38pub mod util;
39pub type KvBackendRef<E = Error> = Arc<dyn KvBackend<Error = E> + Send + Sync>;
40
41#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
42pub const DEFAULT_META_TABLE_NAME: &str = "greptime_metakv";
44
45#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
46pub const DEFAULT_META_ELECTION_LOCK_ID: u64 = 1;
48
49#[async_trait]
50pub trait KvBackend: TxnService
51where
52 Self::Error: ErrorExt,
53{
54 fn name(&self) -> &str;
55
56 fn as_any(&self) -> &dyn Any;
57
58 async fn range(&self, req: RangeRequest) -> Result<RangeResponse, Self::Error>;
59
60 async fn put(&self, req: PutRequest) -> Result<PutResponse, Self::Error>;
61
62 async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse, Self::Error>;
63
64 async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error>;
65
66 async fn delete_range(
67 &self,
68 req: DeleteRangeRequest,
69 ) -> Result<DeleteRangeResponse, Self::Error>;
70
71 async fn batch_delete(
72 &self,
73 req: BatchDeleteRequest,
74 ) -> Result<BatchDeleteResponse, Self::Error>;
75
76 async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>, Self::Error> {
80 let req = RangeRequest::new().with_key(key.to_vec());
81 let mut resp = self.range(req).await?;
82 Ok(if resp.kvs.is_empty() {
83 None
84 } else {
85 Some(resp.kvs.remove(0))
86 })
87 }
88
89 async fn compare_and_put(
92 &self,
93 req: CompareAndPutRequest,
94 ) -> Result<CompareAndPutResponse, Self::Error> {
95 let CompareAndPutRequest { key, expect, value } = req;
96 let txn = if expect.is_empty() {
97 Txn::put_if_not_exists(key, value)
98 } else {
99 Txn::compare_and_put(key, expect, value)
100 };
101 let txn_res = self.txn(txn).await?;
102
103 let success = txn_res.succeeded;
104 let op_res = txn_res.responses.into_iter().next();
106 let prev_kv = match op_res {
107 Some(TxnOpResponse::ResponsePut(res)) => res.prev_kv,
108 Some(TxnOpResponse::ResponseGet(res)) => res.kvs.into_iter().next(),
109 Some(TxnOpResponse::ResponseDelete(res)) => res.prev_kvs.into_iter().next(),
110 None => None,
111 };
112
113 Ok(CompareAndPutResponse { success, prev_kv })
114 }
115
116 async fn put_conditionally(
121 &self,
122 key: Vec<u8>,
123 value: Vec<u8>,
124 if_not_exists: bool,
125 ) -> Result<bool, Self::Error> {
126 let success = if if_not_exists {
127 let req = CompareAndPutRequest::new()
128 .with_key(key)
129 .with_expect(vec![])
130 .with_value(value);
131 let res = self.compare_and_put(req).await?;
132 res.success
133 } else {
134 let req = PutRequest::new().with_key(key).with_value(value);
135 self.put(req).await?;
136 true
137 };
138
139 Ok(success)
140 }
141
142 async fn exists(&self, key: &[u8]) -> Result<bool, Self::Error> {
145 let req = RangeRequest::new().with_key(key.to_vec()).with_keys_only();
146 let resp = self.range(req).await?;
147 Ok(!resp.kvs.is_empty())
148 }
149
150 async fn delete(&self, key: &[u8], prev_kv: bool) -> Result<Option<KeyValue>, Self::Error> {
152 let mut req = DeleteRangeRequest::new().with_key(key.to_vec());
153 if prev_kv {
154 req = req.with_prev_kv();
155 }
156
157 let resp = self.delete_range(req).await?;
158
159 if prev_kv {
160 Ok(resp.prev_kvs.into_iter().next())
161 } else {
162 Ok(None)
163 }
164 }
165}
166
167pub trait ResettableKvBackend: KvBackend
168where
169 Self::Error: ErrorExt,
170{
171 fn reset(&self);
172
173 fn as_kv_backend_ref(self: Arc<Self>) -> KvBackendRef<Self::Error>;
175}
176
177pub type ResettableKvBackendRef<E = Error> = Arc<dyn ResettableKvBackend<Error = E> + Send + Sync>;