common_meta/kv_backend/txn/
etcd.rs1use etcd_client::{
16 Compare as EtcdCompare, CompareOp as EtcdCompareOp, Txn as EtcdTxn, TxnOp as EtcdTxnOp,
17 TxnOpResponse as EtcdTxnOpResponse, TxnResponse as EtcdTxnResponse,
18};
19
20use crate::error::{self, Result};
21use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse, TxnResponse};
22use crate::rpc::store::{DeleteRangeResponse, PutResponse, RangeResponse};
23use crate::rpc::KeyValue;
24
25impl From<Txn> for EtcdTxn {
26 fn from(txn: Txn) -> Self {
27 let mut etcd_txn = EtcdTxn::new();
28 if txn.c_when {
29 let compares = txn
30 .req
31 .compare
32 .into_iter()
33 .map(EtcdCompare::from)
34 .collect::<Vec<_>>();
35 etcd_txn = etcd_txn.when(compares);
36 }
37 if txn.c_then {
38 let success = txn
39 .req
40 .success
41 .into_iter()
42 .map(EtcdTxnOp::from)
43 .collect::<Vec<_>>();
44 etcd_txn = etcd_txn.and_then(success);
45 }
46 if txn.c_else {
47 let failure = txn
48 .req
49 .failure
50 .into_iter()
51 .map(EtcdTxnOp::from)
52 .collect::<Vec<_>>();
53 etcd_txn = etcd_txn.or_else(failure);
54 }
55 etcd_txn
56 }
57}
58
59impl From<Compare> for EtcdCompare {
60 fn from(cmp: Compare) -> Self {
61 let etcd_cmp = match cmp.cmp {
62 CompareOp::Equal => EtcdCompareOp::Equal,
63 CompareOp::Greater => EtcdCompareOp::Greater,
64 CompareOp::Less => EtcdCompareOp::Less,
65 CompareOp::NotEqual => EtcdCompareOp::NotEqual,
66 };
67 match cmp.target {
68 Some(target) => EtcdCompare::value(cmp.key, etcd_cmp, target),
69 None => EtcdCompare::create_revision(cmp.key, etcd_cmp, 0),
71 }
72 }
73}
74
75impl From<TxnOp> for EtcdTxnOp {
76 fn from(op: TxnOp) -> Self {
77 match op {
78 TxnOp::Put(key, value) => EtcdTxnOp::put(key, value, None),
79 TxnOp::Get(key) => EtcdTxnOp::get(key, None),
80 TxnOp::Delete(key) => EtcdTxnOp::delete(key, None),
81 }
82 }
83}
84
85impl TryFrom<EtcdTxnOpResponse> for TxnOpResponse {
86 type Error = error::Error;
87
88 fn try_from(op_resp: EtcdTxnOpResponse) -> Result<Self> {
89 match op_resp {
90 EtcdTxnOpResponse::Put(mut res) => {
91 let prev_kv = res.take_prev_key().map(KeyValue::from);
92 Ok(TxnOpResponse::ResponsePut(PutResponse { prev_kv }))
93 }
94 EtcdTxnOpResponse::Get(mut res) => {
95 let kvs = res.take_kvs().into_iter().map(KeyValue::from).collect();
96 Ok(TxnOpResponse::ResponseGet(RangeResponse {
97 kvs,
98 more: false,
99 }))
100 }
101 EtcdTxnOpResponse::Delete(mut res) => {
102 let deleted = res.deleted();
103 let prev_kvs = res
104 .take_prev_kvs()
105 .into_iter()
106 .map(KeyValue::from)
107 .collect::<Vec<_>>();
108 Ok(TxnOpResponse::ResponseDelete(DeleteRangeResponse {
109 deleted,
110 prev_kvs,
111 }))
112 }
113 EtcdTxnOpResponse::Txn(_) => error::EtcdTxnOpResponseSnafu {
114 err_msg: "nested txn is not supported",
115 }
116 .fail(),
117 }
118 }
119}
120
121impl TryFrom<EtcdTxnResponse> for TxnResponse {
122 type Error = error::Error;
123
124 fn try_from(resp: EtcdTxnResponse) -> Result<Self> {
125 let succeeded = resp.succeeded();
126 let responses = resp
127 .op_responses()
128 .into_iter()
129 .map(TxnOpResponse::try_from)
130 .collect::<Result<Vec<_>>>()?;
131 Ok(Self {
132 succeeded,
133 responses,
134 })
135 }
136}