common_meta/kv_backend/txn/
etcd.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use etcd_client::{
16    Compare as EtcdCompare, CompareOp as EtcdCompareOp, Txn as EtcdTxn, TxnOp as EtcdTxnOp,
17    TxnOpResponse as EtcdTxnOpResponse, TxnResponse as EtcdTxnResponse,
18};
19
20use crate::error::{self, Result};
21use crate::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp, TxnOpResponse, TxnResponse};
22use crate::rpc::store::{DeleteRangeResponse, PutResponse, RangeResponse};
23use crate::rpc::KeyValue;
24
25impl From<Txn> for EtcdTxn {
26    fn from(txn: Txn) -> Self {
27        let mut etcd_txn = EtcdTxn::new();
28        if txn.c_when {
29            let compares = txn
30                .req
31                .compare
32                .into_iter()
33                .map(EtcdCompare::from)
34                .collect::<Vec<_>>();
35            etcd_txn = etcd_txn.when(compares);
36        }
37        if txn.c_then {
38            let success = txn
39                .req
40                .success
41                .into_iter()
42                .map(EtcdTxnOp::from)
43                .collect::<Vec<_>>();
44            etcd_txn = etcd_txn.and_then(success);
45        }
46        if txn.c_else {
47            let failure = txn
48                .req
49                .failure
50                .into_iter()
51                .map(EtcdTxnOp::from)
52                .collect::<Vec<_>>();
53            etcd_txn = etcd_txn.or_else(failure);
54        }
55        etcd_txn
56    }
57}
58
59impl From<Compare> for EtcdCompare {
60    fn from(cmp: Compare) -> Self {
61        let etcd_cmp = match cmp.cmp {
62            CompareOp::Equal => EtcdCompareOp::Equal,
63            CompareOp::Greater => EtcdCompareOp::Greater,
64            CompareOp::Less => EtcdCompareOp::Less,
65            CompareOp::NotEqual => EtcdCompareOp::NotEqual,
66        };
67        match cmp.target {
68            Some(target) => EtcdCompare::value(cmp.key, etcd_cmp, target),
69            // create revision 0 means key does not exist
70            None => EtcdCompare::create_revision(cmp.key, etcd_cmp, 0),
71        }
72    }
73}
74
75impl From<TxnOp> for EtcdTxnOp {
76    fn from(op: TxnOp) -> Self {
77        match op {
78            TxnOp::Put(key, value) => EtcdTxnOp::put(key, value, None),
79            TxnOp::Get(key) => EtcdTxnOp::get(key, None),
80            TxnOp::Delete(key) => EtcdTxnOp::delete(key, None),
81        }
82    }
83}
84
85impl TryFrom<EtcdTxnOpResponse> for TxnOpResponse {
86    type Error = error::Error;
87
88    fn try_from(op_resp: EtcdTxnOpResponse) -> Result<Self> {
89        match op_resp {
90            EtcdTxnOpResponse::Put(mut res) => {
91                let prev_kv = res.take_prev_key().map(KeyValue::from);
92                Ok(TxnOpResponse::ResponsePut(PutResponse { prev_kv }))
93            }
94            EtcdTxnOpResponse::Get(mut res) => {
95                let kvs = res.take_kvs().into_iter().map(KeyValue::from).collect();
96                Ok(TxnOpResponse::ResponseGet(RangeResponse {
97                    kvs,
98                    more: false,
99                }))
100            }
101            EtcdTxnOpResponse::Delete(mut res) => {
102                let deleted = res.deleted();
103                let prev_kvs = res
104                    .take_prev_kvs()
105                    .into_iter()
106                    .map(KeyValue::from)
107                    .collect::<Vec<_>>();
108                Ok(TxnOpResponse::ResponseDelete(DeleteRangeResponse {
109                    deleted,
110                    prev_kvs,
111                }))
112            }
113            EtcdTxnOpResponse::Txn(_) => error::EtcdTxnOpResponseSnafu {
114                err_msg: "nested txn is not supported",
115            }
116            .fail(),
117        }
118    }
119}
120
121impl TryFrom<EtcdTxnResponse> for TxnResponse {
122    type Error = error::Error;
123
124    fn try_from(resp: EtcdTxnResponse) -> Result<Self> {
125        let succeeded = resp.succeeded();
126        let responses = resp
127            .op_responses()
128            .into_iter()
129            .map(TxnOpResponse::try_from)
130            .collect::<Result<Vec<_>>>()?;
131        Ok(Self {
132            succeeded,
133            responses,
134        })
135    }
136}