common_meta/
kv_backend.rsuse std::any::Any;
use std::sync::Arc;
use async_trait::async_trait;
use common_error::ext::ErrorExt;
pub use txn::TxnService;
use crate::error::Error;
use crate::kv_backend::txn::{Txn, TxnOpResponse};
use crate::rpc::store::{
BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
};
use crate::rpc::KeyValue;
pub mod chroot;
pub mod etcd;
pub mod memory;
#[cfg(feature = "pg_kvbackend")]
pub mod postgres;
pub mod test;
pub mod txn;
pub type KvBackendRef<E = Error> = Arc<dyn KvBackend<Error = E> + Send + Sync>;
#[async_trait]
pub trait KvBackend: TxnService
where
Self::Error: ErrorExt,
{
fn name(&self) -> &str;
fn as_any(&self) -> &dyn Any;
async fn range(&self, req: RangeRequest) -> Result<RangeResponse, Self::Error>;
async fn put(&self, req: PutRequest) -> Result<PutResponse, Self::Error>;
async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse, Self::Error>;
async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error>;
async fn delete_range(
&self,
req: DeleteRangeRequest,
) -> Result<DeleteRangeResponse, Self::Error>;
async fn batch_delete(
&self,
req: BatchDeleteRequest,
) -> Result<BatchDeleteResponse, Self::Error>;
async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>, Self::Error> {
let req = RangeRequest::new().with_key(key.to_vec());
let mut resp = self.range(req).await?;
Ok(if resp.kvs.is_empty() {
None
} else {
Some(resp.kvs.remove(0))
})
}
async fn compare_and_put(
&self,
req: CompareAndPutRequest,
) -> Result<CompareAndPutResponse, Self::Error> {
let CompareAndPutRequest { key, expect, value } = req;
let txn = if expect.is_empty() {
Txn::put_if_not_exists(key, value)
} else {
Txn::compare_and_put(key, expect, value)
};
let txn_res = self.txn(txn).await?;
let success = txn_res.succeeded;
let op_res = txn_res.responses.into_iter().next();
let prev_kv = match op_res {
Some(TxnOpResponse::ResponsePut(res)) => res.prev_kv,
Some(TxnOpResponse::ResponseGet(res)) => res.kvs.into_iter().next(),
Some(TxnOpResponse::ResponseDelete(res)) => res.prev_kvs.into_iter().next(),
None => None,
};
Ok(CompareAndPutResponse { success, prev_kv })
}
async fn put_conditionally(
&self,
key: Vec<u8>,
value: Vec<u8>,
if_not_exists: bool,
) -> Result<bool, Self::Error> {
let success = if if_not_exists {
let req = CompareAndPutRequest::new()
.with_key(key)
.with_expect(vec![])
.with_value(value);
let res = self.compare_and_put(req).await?;
res.success
} else {
let req = PutRequest::new().with_key(key).with_value(value);
self.put(req).await?;
true
};
Ok(success)
}
async fn exists(&self, key: &[u8]) -> Result<bool, Self::Error> {
let req = RangeRequest::new().with_key(key.to_vec()).with_keys_only();
let resp = self.range(req).await?;
Ok(!resp.kvs.is_empty())
}
async fn delete(&self, key: &[u8], prev_kv: bool) -> Result<Option<KeyValue>, Self::Error> {
let mut req = DeleteRangeRequest::new().with_key(key.to_vec());
if prev_kv {
req = req.with_prev_kv();
}
let resp = self.delete_range(req).await?;
if prev_kv {
Ok(resp.prev_kvs.into_iter().next())
} else {
Ok(None)
}
}
}
pub trait ResettableKvBackend: KvBackend
where
Self::Error: ErrorExt,
{
fn reset(&self);
fn as_kv_backend_ref(self: Arc<Self>) -> KvBackendRef<Self::Error>;
}
pub type ResettableKvBackendRef<E = Error> = Arc<dyn ResettableKvBackend<Error = E> + Send + Sync>;