common_meta/kv_backend/
txn.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::max;
16
17use common_error::ext::ErrorExt;
18
19use crate::rpc::store::{DeleteRangeResponse, PutResponse, RangeResponse};
20
21mod etcd;
22
23#[async_trait::async_trait]
24pub trait TxnService: Sync + Send {
25    type Error: ErrorExt;
26
27    async fn txn(&self, _txn: Txn) -> Result<TxnResponse, Self::Error> {
28        unimplemented!("txn is not implemented")
29    }
30
31    /// Maximum number of operations permitted in a transaction.
32    fn max_txn_ops(&self) -> usize {
33        unimplemented!("txn is not implemented")
34    }
35}
36
37#[derive(Debug, Clone, PartialEq)]
38pub enum CompareOp {
39    Equal,
40    Greater,
41    Less,
42    NotEqual,
43}
44
45#[derive(Debug, Clone, PartialEq)]
46pub struct Compare {
47    pub key: Vec<u8>,
48    pub cmp: CompareOp,
49    /// None means the key does not exist.
50    pub target: Option<Vec<u8>>,
51}
52
53impl Compare {
54    pub fn new(key: Vec<u8>, cmp: CompareOp, target: Option<Vec<u8>>) -> Self {
55        Self { key, cmp, target }
56    }
57
58    pub fn with_value(key: Vec<u8>, cmp: CompareOp, target: Vec<u8>) -> Self {
59        Self::new(key, cmp, Some(target))
60    }
61
62    pub fn with_value_not_exists(key: Vec<u8>, cmp: CompareOp) -> Self {
63        Self::new(key, cmp, None)
64    }
65
66    pub fn compare_value(&self, value: Option<&Vec<u8>>) -> bool {
67        match (value, &self.target) {
68            (Some(value), Some(target)) => match self.cmp {
69                CompareOp::Equal => *value == *target,
70                CompareOp::Greater => *value > *target,
71                CompareOp::Less => *value < *target,
72                CompareOp::NotEqual => *value != *target,
73            },
74            (Some(_), None) => match self.cmp {
75                CompareOp::Equal => false,
76                CompareOp::Greater => true,
77                CompareOp::Less => false,
78                CompareOp::NotEqual => true,
79            },
80            (None, Some(_)) => match self.cmp {
81                CompareOp::Equal => false,
82                CompareOp::Greater => false,
83                CompareOp::Less => true,
84                CompareOp::NotEqual => true,
85            },
86            (None, None) => match self.cmp {
87                CompareOp::Equal => true,
88                CompareOp::Greater => false,
89                CompareOp::Less => false,
90                CompareOp::NotEqual => false,
91            },
92        }
93    }
94}
95
96#[derive(Debug, Clone, PartialEq)]
97pub enum TxnOp {
98    Put(Vec<u8>, Vec<u8>),
99    Get(Vec<u8>),
100    Delete(Vec<u8>),
101}
102
103#[derive(Debug, Clone, Default, PartialEq)]
104pub struct TxnRequest {
105    pub compare: Vec<Compare>,
106    pub success: Vec<TxnOp>,
107    pub failure: Vec<TxnOp>,
108}
109
110impl TxnRequest {
111    pub fn extend(&mut self, other: TxnRequest) {
112        self.compare.extend(other.compare);
113        self.success.extend(other.success);
114        self.failure.extend(other.failure);
115    }
116}
117
118#[derive(Debug, Clone, PartialEq)]
119pub enum TxnOpResponse {
120    ResponsePut(PutResponse),
121    ResponseGet(RangeResponse),
122    ResponseDelete(DeleteRangeResponse),
123}
124
125pub struct TxnResponse {
126    pub succeeded: bool,
127    pub responses: Vec<TxnOpResponse>,
128}
129
130#[derive(Debug, Clone, Default, PartialEq)]
131pub struct Txn {
132    // HACK - chroot would modify this field
133    pub(super) req: TxnRequest,
134    pub(super) c_when: bool,
135    pub(super) c_then: bool,
136    pub(super) c_else: bool,
137}
138
139#[cfg(any(test, feature = "testing"))]
140impl Txn {
141    pub fn req(&self) -> &TxnRequest {
142        &self.req
143    }
144}
145
146impl Txn {
147    pub fn merge_all<T: IntoIterator<Item = Txn>>(values: T) -> Self {
148        values
149            .into_iter()
150            .reduce(|acc, e| acc.merge(e))
151            .unwrap_or_default()
152    }
153
154    pub fn merge(mut self, other: Txn) -> Self {
155        self.c_when |= other.c_when;
156        self.c_then |= other.c_then;
157        self.c_else |= other.c_else;
158
159        self.req.extend(other.req);
160
161        self
162    }
163
164    pub fn new() -> Self {
165        Txn::default()
166    }
167
168    /// Builds a transaction that puts a value at a key if the key does not exist.
169    pub fn put_if_not_exists(key: Vec<u8>, value: Vec<u8>) -> Self {
170        Self::new()
171            .when(vec![Compare::with_value_not_exists(
172                key.clone(),
173                CompareOp::Equal,
174            )])
175            .and_then(vec![TxnOp::Put(key.clone(), value)])
176            .or_else(vec![TxnOp::Get(key)])
177    }
178
179    /// Builds a transaction that puts a value at a key if the key exists and the value
180    /// is equal to `expect`.
181    pub fn compare_and_put(key: Vec<u8>, expect: Vec<u8>, value: Vec<u8>) -> Self {
182        Self::new()
183            .when(vec![Compare::with_value(
184                key.clone(),
185                CompareOp::Equal,
186                expect,
187            )])
188            .and_then(vec![TxnOp::Put(key.clone(), value)])
189            .or_else(vec![TxnOp::Get(key)])
190    }
191
192    /// Takes a list of comparison. If all comparisons passed in succeed,
193    /// the operations passed into `and_then()` will be executed. Or the operations
194    /// passed into `or_else()` will be executed.
195    #[inline]
196    pub fn when(mut self, compares: impl Into<Vec<Compare>>) -> Self {
197        assert!(!self.c_when, "cannot call `when` twice");
198        assert!(!self.c_then, "cannot call `when` after `and_then`");
199        assert!(!self.c_else, "cannot call `when` after `or_else`");
200
201        self.c_when = true;
202        self.req.compare = compares.into();
203        self
204    }
205
206    /// Takes a list of operations. The operations list will be executed, if the
207    /// comparisons passed into `when()` succeed.
208    #[inline]
209    pub fn and_then(mut self, operations: impl Into<Vec<TxnOp>>) -> Self {
210        assert!(!self.c_then, "cannot call `and_then` twice");
211        assert!(!self.c_else, "cannot call `and_then` after `or_else`");
212
213        self.c_then = true;
214        self.req.success = operations.into();
215        self
216    }
217
218    /// Takes a list of operations. The operations list will be executed, if the
219    /// comparisons passed into `when()` fail.
220    #[inline]
221    pub fn or_else(mut self, operations: impl Into<Vec<TxnOp>>) -> Self {
222        assert!(!self.c_else, "cannot call `or_else` twice");
223
224        self.c_else = true;
225        self.req.failure = operations.into();
226        self
227    }
228
229    #[inline]
230    pub fn max_operations(&self) -> usize {
231        let opc = max(self.req.compare.len(), self.req.success.len());
232        max(opc, self.req.failure.len())
233    }
234}
235
236impl From<Txn> for TxnRequest {
237    fn from(txn: Txn) -> Self {
238        txn.req
239    }
240}
241
242#[cfg(test)]
243mod tests {
244    use super::*;
245
246    #[test]
247    fn test_compare() {
248        // Equal
249        let compare = Compare::with_value(vec![1], CompareOp::Equal, vec![1]);
250        assert!(compare.compare_value(Some(&vec![1])));
251        assert!(!compare.compare_value(None));
252        let compare = Compare::with_value_not_exists(vec![1], CompareOp::Equal);
253        assert!(compare.compare_value(None));
254
255        // Greater
256        let compare = Compare::with_value(vec![1], CompareOp::Greater, vec![1]);
257        assert!(compare.compare_value(Some(&vec![2])));
258        assert!(!compare.compare_value(None));
259        let compare = Compare::with_value_not_exists(vec![1], CompareOp::Greater);
260        assert!(!compare.compare_value(None));
261        assert!(compare.compare_value(Some(&vec![1])));
262
263        // Less
264        let compare = Compare::with_value(vec![1], CompareOp::Less, vec![1]);
265        assert!(compare.compare_value(Some(&vec![0])));
266        assert!(compare.compare_value(None));
267        let compare = Compare::with_value_not_exists(vec![1], CompareOp::Less);
268        assert!(!compare.compare_value(None));
269        assert!(!compare.compare_value(Some(&vec![1])));
270
271        // NotEqual
272        let compare = Compare::with_value(vec![1], CompareOp::NotEqual, vec![1]);
273        assert!(!compare.compare_value(Some(&vec![1])));
274        assert!(compare.compare_value(Some(&vec![2])));
275        assert!(compare.compare_value(None));
276        let compare = Compare::with_value_not_exists(vec![1], CompareOp::NotEqual);
277        assert!(!compare.compare_value(None));
278        assert!(compare.compare_value(Some(&vec![1])));
279    }
280
281    #[test]
282    fn test_txn() {
283        let txn = Txn::new()
284            .when(vec![Compare::with_value(
285                vec![1],
286                CompareOp::Equal,
287                vec![1],
288            )])
289            .and_then(vec![TxnOp::Put(vec![1], vec![1])])
290            .or_else(vec![TxnOp::Put(vec![1], vec![2])]);
291
292        assert_eq!(
293            txn,
294            Txn {
295                req: TxnRequest {
296                    compare: vec![Compare::with_value(vec![1], CompareOp::Equal, vec![1])],
297                    success: vec![TxnOp::Put(vec![1], vec![1])],
298                    failure: vec![TxnOp::Put(vec![1], vec![2])],
299                },
300                c_when: true,
301                c_then: true,
302                c_else: true,
303            }
304        );
305    }
306}