common_meta/
kv_backend.rs1use std::any::Any;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use common_error::ext::ErrorExt;
20pub use txn::TxnService;
21
22use crate::error::Error;
23use crate::kv_backend::txn::{Txn, TxnOpResponse};
24use crate::rpc::KeyValue;
25use crate::rpc::store::{
26 BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
27 BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
28 DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
29};
30
31pub mod chroot;
32pub mod etcd;
33pub mod memory;
34#[cfg(any(feature = "mysql_kvbackend", feature = "pg_kvbackend"))]
35pub mod rds;
36pub mod read_only;
37pub mod test;
38#[cfg(any(test, feature = "testing"))]
39pub mod test_util;
40pub mod txn;
41pub mod util;
42pub type KvBackendRef<E = Error> = Arc<dyn KvBackend<Error = E> + Send + Sync>;
43
/// Default metadata table name.
///
/// Only compiled when the `pg_kvbackend` or `mysql_kvbackend` feature is
/// enabled.
// NOTE(review): presumably the table the RDS-backed stores keep their
// key-value pairs in -- confirm against the `rds` module.
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
pub const DEFAULT_META_TABLE_NAME: &str = "greptime_metakv";
47
/// Default lock id for metadata election.
///
/// Only compiled when the `pg_kvbackend` or `mysql_kvbackend` feature is
/// enabled.
// NOTE(review): presumably consumed by the RDS-backed election code, which is
// not visible from this file -- confirm at the use site.
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
pub const DEFAULT_META_ELECTION_LOCK_ID: u64 = 1;
51
52#[async_trait]
53pub trait KvBackend: TxnService
54where
55 Self::Error: ErrorExt,
56{
57 fn name(&self) -> &str;
58
59 fn as_any(&self) -> &dyn Any;
60
61 async fn range(&self, req: RangeRequest) -> Result<RangeResponse, Self::Error>;
62
63 async fn put(&self, req: PutRequest) -> Result<PutResponse, Self::Error>;
64
65 async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse, Self::Error>;
66
67 async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error>;
68
69 async fn delete_range(
70 &self,
71 req: DeleteRangeRequest,
72 ) -> Result<DeleteRangeResponse, Self::Error>;
73
74 async fn batch_delete(
75 &self,
76 req: BatchDeleteRequest,
77 ) -> Result<BatchDeleteResponse, Self::Error>;
78
79 async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>, Self::Error> {
83 let req = RangeRequest::new().with_key(key.to_vec());
84 let mut resp = self.range(req).await?;
85 Ok(if resp.kvs.is_empty() {
86 None
87 } else {
88 Some(resp.kvs.remove(0))
89 })
90 }
91
92 async fn compare_and_put(
95 &self,
96 req: CompareAndPutRequest,
97 ) -> Result<CompareAndPutResponse, Self::Error> {
98 let CompareAndPutRequest { key, expect, value } = req;
99 let txn = if expect.is_empty() {
100 Txn::put_if_not_exists(key, value)
101 } else {
102 Txn::compare_and_put(key, expect, value)
103 };
104 let txn_res = self.txn(txn).await?;
105
106 let success = txn_res.succeeded;
107 let op_res = txn_res.responses.into_iter().next();
109 let prev_kv = match op_res {
110 Some(TxnOpResponse::ResponsePut(res)) => res.prev_kv,
111 Some(TxnOpResponse::ResponseGet(res)) => res.kvs.into_iter().next(),
112 Some(TxnOpResponse::ResponseDelete(res)) => res.prev_kvs.into_iter().next(),
113 None => None,
114 };
115
116 Ok(CompareAndPutResponse { success, prev_kv })
117 }
118
119 async fn put_conditionally(
124 &self,
125 key: Vec<u8>,
126 value: Vec<u8>,
127 if_not_exists: bool,
128 ) -> Result<bool, Self::Error> {
129 let success = if if_not_exists {
130 let req = CompareAndPutRequest::new()
131 .with_key(key)
132 .with_expect(vec![])
133 .with_value(value);
134 let res = self.compare_and_put(req).await?;
135 res.success
136 } else {
137 let req = PutRequest::new().with_key(key).with_value(value);
138 self.put(req).await?;
139 true
140 };
141
142 Ok(success)
143 }
144
145 async fn exists(&self, key: &[u8]) -> Result<bool, Self::Error> {
148 let req = RangeRequest::new().with_key(key.to_vec()).with_keys_only();
149 let resp = self.range(req).await?;
150 Ok(!resp.kvs.is_empty())
151 }
152
153 async fn delete(&self, key: &[u8], prev_kv: bool) -> Result<Option<KeyValue>, Self::Error> {
155 let mut req = DeleteRangeRequest::new().with_key(key.to_vec());
156 if prev_kv {
157 req = req.with_prev_kv();
158 }
159
160 let resp = self.delete_range(req).await?;
161
162 if prev_kv {
163 Ok(resp.prev_kvs.into_iter().next())
164 } else {
165 Ok(None)
166 }
167 }
168}
169
170pub trait ResettableKvBackend: KvBackend
171where
172 Self::Error: ErrorExt,
173{
174 fn reset(&self);
175
176 fn as_kv_backend_ref(self: Arc<Self>) -> KvBackendRef<Self::Error>;
178}
179
180pub type ResettableKvBackendRef<E = Error> = Arc<dyn ResettableKvBackend<Error = E> + Send + Sync>;