1use std::cmp::max;
16
17use common_error::ext::ErrorExt;
18
19use crate::rpc::store::{DeleteRangeResponse, PutResponse, RangeResponse};
20
21mod etcd;
22
23#[async_trait::async_trait]
24pub trait TxnService: Sync + Send {
25 type Error: ErrorExt;
26
27 async fn txn(&self, _txn: Txn) -> Result<TxnResponse, Self::Error> {
28 unimplemented!("txn is not implemented")
29 }
30
31 fn max_txn_ops(&self) -> usize {
33 unimplemented!("txn is not implemented")
34 }
35}
36
37#[derive(Debug, Clone, PartialEq)]
38pub enum CompareOp {
39 Equal,
40 Greater,
41 Less,
42 NotEqual,
43}
44
45#[derive(Debug, Clone, PartialEq)]
46pub struct Compare {
47 pub key: Vec<u8>,
48 pub cmp: CompareOp,
49 pub target: Option<Vec<u8>>,
51}
52
53impl Compare {
54 pub fn new(key: Vec<u8>, cmp: CompareOp, target: Option<Vec<u8>>) -> Self {
55 Self { key, cmp, target }
56 }
57
58 pub fn with_value(key: Vec<u8>, cmp: CompareOp, target: Vec<u8>) -> Self {
59 Self::new(key, cmp, Some(target))
60 }
61
62 pub fn with_value_not_exists(key: Vec<u8>, cmp: CompareOp) -> Self {
63 Self::new(key, cmp, None)
64 }
65
66 pub fn compare_value(&self, value: Option<&Vec<u8>>) -> bool {
67 match (value, &self.target) {
68 (Some(value), Some(target)) => match self.cmp {
69 CompareOp::Equal => *value == *target,
70 CompareOp::Greater => *value > *target,
71 CompareOp::Less => *value < *target,
72 CompareOp::NotEqual => *value != *target,
73 },
74 (Some(_), None) => match self.cmp {
75 CompareOp::Equal => false,
76 CompareOp::Greater => true,
77 CompareOp::Less => false,
78 CompareOp::NotEqual => true,
79 },
80 (None, Some(_)) => match self.cmp {
81 CompareOp::Equal => false,
82 CompareOp::Greater => false,
83 CompareOp::Less => true,
84 CompareOp::NotEqual => true,
85 },
86 (None, None) => match self.cmp {
87 CompareOp::Equal => true,
88 CompareOp::Greater => false,
89 CompareOp::Less => false,
90 CompareOp::NotEqual => false,
91 },
92 }
93 }
94}
95
96#[derive(Debug, Clone, PartialEq)]
97pub enum TxnOp {
98 Put(Vec<u8>, Vec<u8>),
99 Get(Vec<u8>),
100 Delete(Vec<u8>),
101}
102
103#[derive(Debug, Clone, Default, PartialEq)]
104pub struct TxnRequest {
105 pub compare: Vec<Compare>,
106 pub success: Vec<TxnOp>,
107 pub failure: Vec<TxnOp>,
108}
109
110impl TxnRequest {
111 pub fn extend(&mut self, other: TxnRequest) {
112 self.compare.extend(other.compare);
113 self.success.extend(other.success);
114 self.failure.extend(other.failure);
115 }
116}
117
118#[derive(Debug, Clone, PartialEq)]
119pub enum TxnOpResponse {
120 ResponsePut(PutResponse),
121 ResponseGet(RangeResponse),
122 ResponseDelete(DeleteRangeResponse),
123}
124
125pub struct TxnResponse {
126 pub succeeded: bool,
127 pub responses: Vec<TxnOpResponse>,
128}
129
130#[derive(Debug, Clone, Default, PartialEq)]
131pub struct Txn {
132 pub(super) req: TxnRequest,
134 pub(super) c_when: bool,
135 pub(super) c_then: bool,
136 pub(super) c_else: bool,
137}
138
139#[cfg(any(test, feature = "testing"))]
140impl Txn {
141 pub fn req(&self) -> &TxnRequest {
142 &self.req
143 }
144}
145
146impl Txn {
147 pub fn merge_all<T: IntoIterator<Item = Txn>>(values: T) -> Self {
148 values
149 .into_iter()
150 .reduce(|acc, e| acc.merge(e))
151 .unwrap_or_default()
152 }
153
154 pub fn merge(mut self, other: Txn) -> Self {
155 self.c_when |= other.c_when;
156 self.c_then |= other.c_then;
157 self.c_else |= other.c_else;
158
159 self.req.extend(other.req);
160
161 self
162 }
163
164 pub fn new() -> Self {
165 Txn::default()
166 }
167
168 pub fn put_if_not_exists(key: Vec<u8>, value: Vec<u8>) -> Self {
170 Self::new()
171 .when(vec![Compare::with_value_not_exists(
172 key.clone(),
173 CompareOp::Equal,
174 )])
175 .and_then(vec![TxnOp::Put(key.clone(), value)])
176 .or_else(vec![TxnOp::Get(key)])
177 }
178
179 pub fn compare_and_put(key: Vec<u8>, expect: Vec<u8>, value: Vec<u8>) -> Self {
182 Self::new()
183 .when(vec![Compare::with_value(
184 key.clone(),
185 CompareOp::Equal,
186 expect,
187 )])
188 .and_then(vec![TxnOp::Put(key.clone(), value)])
189 .or_else(vec![TxnOp::Get(key)])
190 }
191
192 #[inline]
196 pub fn when(mut self, compares: impl Into<Vec<Compare>>) -> Self {
197 assert!(!self.c_when, "cannot call `when` twice");
198 assert!(!self.c_then, "cannot call `when` after `and_then`");
199 assert!(!self.c_else, "cannot call `when` after `or_else`");
200
201 self.c_when = true;
202 self.req.compare = compares.into();
203 self
204 }
205
206 #[inline]
209 pub fn and_then(mut self, operations: impl Into<Vec<TxnOp>>) -> Self {
210 assert!(!self.c_then, "cannot call `and_then` twice");
211 assert!(!self.c_else, "cannot call `and_then` after `or_else`");
212
213 self.c_then = true;
214 self.req.success = operations.into();
215 self
216 }
217
218 #[inline]
221 pub fn or_else(mut self, operations: impl Into<Vec<TxnOp>>) -> Self {
222 assert!(!self.c_else, "cannot call `or_else` twice");
223
224 self.c_else = true;
225 self.req.failure = operations.into();
226 self
227 }
228
229 #[inline]
230 pub fn max_operations(&self) -> usize {
231 let opc = max(self.req.compare.len(), self.req.success.len());
232 max(opc, self.req.failure.len())
233 }
234}
235
236impl From<Txn> for TxnRequest {
237 fn from(txn: Txn) -> Self {
238 txn.req
239 }
240}
241
242#[cfg(test)]
243mod tests {
244 use super::*;
245
246 #[test]
247 fn test_compare() {
248 let compare = Compare::with_value(vec![1], CompareOp::Equal, vec![1]);
250 assert!(compare.compare_value(Some(&vec![1])));
251 assert!(!compare.compare_value(None));
252 let compare = Compare::with_value_not_exists(vec![1], CompareOp::Equal);
253 assert!(compare.compare_value(None));
254
255 let compare = Compare::with_value(vec![1], CompareOp::Greater, vec![1]);
257 assert!(compare.compare_value(Some(&vec![2])));
258 assert!(!compare.compare_value(None));
259 let compare = Compare::with_value_not_exists(vec![1], CompareOp::Greater);
260 assert!(!compare.compare_value(None));
261 assert!(compare.compare_value(Some(&vec![1])));
262
263 let compare = Compare::with_value(vec![1], CompareOp::Less, vec![1]);
265 assert!(compare.compare_value(Some(&vec![0])));
266 assert!(compare.compare_value(None));
267 let compare = Compare::with_value_not_exists(vec![1], CompareOp::Less);
268 assert!(!compare.compare_value(None));
269 assert!(!compare.compare_value(Some(&vec![1])));
270
271 let compare = Compare::with_value(vec![1], CompareOp::NotEqual, vec![1]);
273 assert!(!compare.compare_value(Some(&vec![1])));
274 assert!(compare.compare_value(Some(&vec![2])));
275 assert!(compare.compare_value(None));
276 let compare = Compare::with_value_not_exists(vec![1], CompareOp::NotEqual);
277 assert!(!compare.compare_value(None));
278 assert!(compare.compare_value(Some(&vec![1])));
279 }
280
281 #[test]
282 fn test_txn() {
283 let txn = Txn::new()
284 .when(vec![Compare::with_value(
285 vec![1],
286 CompareOp::Equal,
287 vec![1],
288 )])
289 .and_then(vec![TxnOp::Put(vec![1], vec![1])])
290 .or_else(vec![TxnOp::Put(vec![1], vec![2])]);
291
292 assert_eq!(
293 txn,
294 Txn {
295 req: TxnRequest {
296 compare: vec![Compare::with_value(vec![1], CompareOp::Equal, vec![1])],
297 success: vec![TxnOp::Put(vec![1], vec![1])],
298 failure: vec![TxnOp::Put(vec![1], vec![2])],
299 },
300 c_when: true,
301 c_then: true,
302 c_else: true,
303 }
304 );
305 }
306}