// common_meta/kv_backend/txn/etcd.rs
use etcd_client::{
Compare as EtcdCompare, CompareOp as EtcdCompareOp, Txn as EtcdTxn, TxnOp as EtcdTxnOp,
TxnOpResponse as EtcdTxnOpResponse, TxnResponse as EtcdTxnResponse,
};
use super::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse, TxnResponse};
use crate::error::{self, Result};
use crate::rpc::store::{DeleteRangeResponse, PutResponse, RangeResponse};
use crate::rpc::KeyValue;
impl From<Txn> for EtcdTxn {
fn from(txn: Txn) -> Self {
let mut etcd_txn = EtcdTxn::new();
if txn.c_when {
let compares = txn
.req
.compare
.into_iter()
.map(EtcdCompare::from)
.collect::<Vec<_>>();
etcd_txn = etcd_txn.when(compares);
}
if txn.c_then {
let success = txn
.req
.success
.into_iter()
.map(EtcdTxnOp::from)
.collect::<Vec<_>>();
etcd_txn = etcd_txn.and_then(success);
}
if txn.c_else {
let failure = txn
.req
.failure
.into_iter()
.map(EtcdTxnOp::from)
.collect::<Vec<_>>();
etcd_txn = etcd_txn.or_else(failure);
}
etcd_txn
}
}
/// Translates a backend-agnostic [`Compare`] into its etcd counterpart.
///
/// * With a `target`, the result is a value comparison of `key` against
///   that target.
/// * Without a `target`, the result compares the key's create revision
///   against `0` (in etcd, a create revision of 0 denotes a key that has
///   not been created).
impl From<Compare> for EtcdCompare {
    fn from(cmp: Compare) -> Self {
        // One-to-one mapping of the comparison operator.
        let op = match cmp.cmp {
            CompareOp::NotEqual => EtcdCompareOp::NotEqual,
            CompareOp::Less => EtcdCompareOp::Less,
            CompareOp::Greater => EtcdCompareOp::Greater,
            CompareOp::Equal => EtcdCompareOp::Equal,
        };

        if let Some(expected) = cmp.target {
            EtcdCompare::value(cmp.key, op, expected)
        } else {
            EtcdCompare::create_revision(cmp.key, op, 0)
        }
    }
}
/// Maps a single transaction operation onto the equivalent etcd operation.
///
/// Every operation is created with `None` as its options argument, i.e. no
/// per-operation etcd options are supplied.
impl From<TxnOp> for EtcdTxnOp {
    fn from(op: TxnOp) -> Self {
        match op {
            TxnOp::Get(k) => Self::get(k, None),
            TxnOp::Put(k, v) => Self::put(k, v, None),
            TxnOp::Delete(k) => Self::delete(k, None),
        }
    }
}
impl TryFrom<EtcdTxnOpResponse> for TxnOpResponse {
type Error = error::Error;
fn try_from(op_resp: EtcdTxnOpResponse) -> Result<Self> {
match op_resp {
EtcdTxnOpResponse::Put(mut res) => {
let prev_kv = res.take_prev_key().map(KeyValue::from);
Ok(TxnOpResponse::ResponsePut(PutResponse { prev_kv }))
}
EtcdTxnOpResponse::Get(mut res) => {
let kvs = res.take_kvs().into_iter().map(KeyValue::from).collect();
Ok(TxnOpResponse::ResponseGet(RangeResponse {
kvs,
more: false,
}))
}
EtcdTxnOpResponse::Delete(mut res) => {
let deleted = res.deleted();
let prev_kvs = res
.take_prev_kvs()
.into_iter()
.map(KeyValue::from)
.collect::<Vec<_>>();
Ok(TxnOpResponse::ResponseDelete(DeleteRangeResponse {
deleted,
prev_kvs,
}))
}
EtcdTxnOpResponse::Txn(_) => error::EtcdTxnOpResponseSnafu {
err_msg: "nested txn is not supported",
}
.fail(),
}
}
}
/// Converts a whole etcd transaction response into a [`TxnResponse`].
///
/// The `succeeded` flag is copied over and every per-operation response is
/// converted in order.
///
/// # Errors
///
/// Stops at, and returns, the first error produced while converting a
/// per-operation response.
impl TryFrom<EtcdTxnResponse> for TxnResponse {
    type Error = error::Error;

    fn try_from(resp: EtcdTxnResponse) -> Result<Self> {
        let succeeded = resp.succeeded();

        let mut responses = Vec::new();
        for op_response in resp.op_responses() {
            responses.push(TxnOpResponse::try_from(op_response)?);
        }

        Ok(Self {
            succeeded,
            responses,
        })
    }
}