common_meta/
kv_backend.rs1use std::any::Any;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use common_error::ext::ErrorExt;
20pub use txn::TxnService;
21
22use crate::error::Error;
23use crate::kv_backend::txn::{Txn, TxnOpResponse};
24use crate::rpc::KeyValue;
25use crate::rpc::store::{
26 BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
27 BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
28 DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
29};
30
31pub mod chroot;
32pub mod etcd;
33pub mod memory;
34#[cfg(any(feature = "mysql_kvbackend", feature = "pg_kvbackend"))]
35pub mod rds;
36pub mod test;
37#[cfg(any(test, feature = "testing"))]
38pub mod test_util;
39pub mod txn;
40pub mod util;
41pub type KvBackendRef<E = Error> = Arc<dyn KvBackend<Error = E> + Send + Sync>;
42
43#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
44pub const DEFAULT_META_TABLE_NAME: &str = "greptime_metakv";
46
47#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
48pub const DEFAULT_META_ELECTION_LOCK_ID: u64 = 1;
50
51#[async_trait]
52pub trait KvBackend: TxnService
53where
54 Self::Error: ErrorExt,
55{
56 fn name(&self) -> &str;
57
58 fn as_any(&self) -> &dyn Any;
59
60 async fn range(&self, req: RangeRequest) -> Result<RangeResponse, Self::Error>;
61
62 async fn put(&self, req: PutRequest) -> Result<PutResponse, Self::Error>;
63
64 async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse, Self::Error>;
65
66 async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error>;
67
68 async fn delete_range(
69 &self,
70 req: DeleteRangeRequest,
71 ) -> Result<DeleteRangeResponse, Self::Error>;
72
73 async fn batch_delete(
74 &self,
75 req: BatchDeleteRequest,
76 ) -> Result<BatchDeleteResponse, Self::Error>;
77
78 async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>, Self::Error> {
82 let req = RangeRequest::new().with_key(key.to_vec());
83 let mut resp = self.range(req).await?;
84 Ok(if resp.kvs.is_empty() {
85 None
86 } else {
87 Some(resp.kvs.remove(0))
88 })
89 }
90
91 async fn compare_and_put(
94 &self,
95 req: CompareAndPutRequest,
96 ) -> Result<CompareAndPutResponse, Self::Error> {
97 let CompareAndPutRequest { key, expect, value } = req;
98 let txn = if expect.is_empty() {
99 Txn::put_if_not_exists(key, value)
100 } else {
101 Txn::compare_and_put(key, expect, value)
102 };
103 let txn_res = self.txn(txn).await?;
104
105 let success = txn_res.succeeded;
106 let op_res = txn_res.responses.into_iter().next();
108 let prev_kv = match op_res {
109 Some(TxnOpResponse::ResponsePut(res)) => res.prev_kv,
110 Some(TxnOpResponse::ResponseGet(res)) => res.kvs.into_iter().next(),
111 Some(TxnOpResponse::ResponseDelete(res)) => res.prev_kvs.into_iter().next(),
112 None => None,
113 };
114
115 Ok(CompareAndPutResponse { success, prev_kv })
116 }
117
118 async fn put_conditionally(
123 &self,
124 key: Vec<u8>,
125 value: Vec<u8>,
126 if_not_exists: bool,
127 ) -> Result<bool, Self::Error> {
128 let success = if if_not_exists {
129 let req = CompareAndPutRequest::new()
130 .with_key(key)
131 .with_expect(vec![])
132 .with_value(value);
133 let res = self.compare_and_put(req).await?;
134 res.success
135 } else {
136 let req = PutRequest::new().with_key(key).with_value(value);
137 self.put(req).await?;
138 true
139 };
140
141 Ok(success)
142 }
143
144 async fn exists(&self, key: &[u8]) -> Result<bool, Self::Error> {
147 let req = RangeRequest::new().with_key(key.to_vec()).with_keys_only();
148 let resp = self.range(req).await?;
149 Ok(!resp.kvs.is_empty())
150 }
151
152 async fn delete(&self, key: &[u8], prev_kv: bool) -> Result<Option<KeyValue>, Self::Error> {
154 let mut req = DeleteRangeRequest::new().with_key(key.to_vec());
155 if prev_kv {
156 req = req.with_prev_kv();
157 }
158
159 let resp = self.delete_range(req).await?;
160
161 if prev_kv {
162 Ok(resp.prev_kvs.into_iter().next())
163 } else {
164 Ok(None)
165 }
166 }
167}
168
169pub trait ResettableKvBackend: KvBackend
170where
171 Self::Error: ErrorExt,
172{
173 fn reset(&self);
174
175 fn as_kv_backend_ref(self: Arc<Self>) -> KvBackendRef<Self::Error>;
177}
178
179pub type ResettableKvBackendRef<E = Error> = Arc<dyn ResettableKvBackend<Error = E> + Send + Sync>;