common_meta/kv_backend/
test.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::atomic::{AtomicU8, Ordering};
16use std::sync::Arc;
17
18use txn::{Compare, CompareOp, TxnOp};
19
20use super::{KvBackend, *};
21use crate::error::Error;
22use crate::rpc::store::{BatchGetRequest, PutRequest};
23use crate::rpc::KeyValue;
24use crate::util;
25
26pub fn mock_kvs(prefix: Vec<u8>) -> Vec<KeyValue> {
27    vec![
28        KeyValue {
29            key: [prefix.clone(), b"key1".to_vec()].concat(),
30            value: b"val1".to_vec(),
31        },
32        KeyValue {
33            key: [prefix.clone(), b"key2".to_vec()].concat(),
34            value: b"val2".to_vec(),
35        },
36        KeyValue {
37            key: [prefix.clone(), b"key3".to_vec()].concat(),
38            value: b"val3".to_vec(),
39        },
40        KeyValue {
41            key: [prefix.clone(), b"key11".to_vec()].concat(),
42            value: b"val11".to_vec(),
43        },
44    ]
45}
46
47pub async fn prepare_kv(kv_backend: &impl KvBackend) {
48    prepare_kv_with_prefix(kv_backend, vec![]).await;
49}
50
51pub async fn prepare_kv_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
52    let kvs = mock_kvs(prefix);
53    assert!(kv_backend
54        .batch_put(BatchPutRequest {
55            kvs,
56            ..Default::default()
57        })
58        .await
59        .is_ok());
60}
61
62pub async fn unprepare_kv(kv_backend: &impl KvBackend, prefix: &[u8]) {
63    let range_end = util::get_prefix_end_key(prefix);
64    assert!(
65        kv_backend
66            .delete_range(DeleteRangeRequest {
67                key: prefix.to_vec(),
68                range_end,
69                ..Default::default()
70            })
71            .await
72            .is_ok(),
73        "prefix: {:?}",
74        std::str::from_utf8(prefix).unwrap()
75    );
76}
77
78pub async fn test_kv_put(kv_backend: &impl KvBackend) {
79    test_kv_put_with_prefix(kv_backend, vec![]).await;
80}
81
82pub async fn test_kv_put_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
83    let put_key = [prefix.clone(), b"key11".to_vec()].concat();
84    let resp = kv_backend
85        .put(PutRequest {
86            key: put_key.clone(),
87            value: b"val12".to_vec(),
88            prev_kv: false,
89        })
90        .await
91        .unwrap();
92    assert!(resp.prev_kv.is_none());
93
94    let resp = kv_backend
95        .put(PutRequest {
96            key: put_key.clone(),
97            value: b"val13".to_vec(),
98            prev_kv: true,
99        })
100        .await
101        .unwrap();
102    let prev_kv = resp.prev_kv.unwrap();
103    assert_eq!(put_key, prev_kv.key());
104    assert_eq!(b"val12", prev_kv.value());
105}
106
107pub async fn test_kv_range(kv_backend: &impl KvBackend) {
108    test_kv_range_with_prefix(kv_backend, vec![]).await;
109}
110
111pub async fn test_kv_range_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
112    let key = [prefix.clone(), b"key1".to_vec()].concat();
113    let key11 = [prefix.clone(), b"key11".to_vec()].concat();
114    let range_end = util::get_prefix_end_key(&key);
115
116    let resp = kv_backend
117        .range(RangeRequest {
118            key: key.clone(),
119            range_end: range_end.clone(),
120            limit: 0,
121            keys_only: false,
122        })
123        .await
124        .unwrap();
125
126    assert_eq!(2, resp.kvs.len());
127    assert_eq!(key, resp.kvs[0].key);
128    assert_eq!(b"val1", resp.kvs[0].value());
129    assert_eq!(key11, resp.kvs[1].key);
130    assert_eq!(b"val11", resp.kvs[1].value());
131
132    let resp = kv_backend
133        .range(RangeRequest {
134            key: key.clone(),
135            range_end: range_end.clone(),
136            limit: 0,
137            keys_only: true,
138        })
139        .await
140        .unwrap();
141
142    assert_eq!(2, resp.kvs.len());
143    assert_eq!(key, resp.kvs[0].key);
144    assert_eq!(b"", resp.kvs[0].value());
145    assert_eq!(key11, resp.kvs[1].key);
146    assert_eq!(b"", resp.kvs[1].value());
147
148    let resp = kv_backend
149        .range(RangeRequest {
150            key: key.clone(),
151            limit: 0,
152            keys_only: false,
153            ..Default::default()
154        })
155        .await
156        .unwrap();
157
158    assert_eq!(1, resp.kvs.len());
159    assert_eq!(key, resp.kvs[0].key);
160    assert_eq!(b"val1", resp.kvs[0].value());
161
162    let resp = kv_backend
163        .range(RangeRequest {
164            key: key.clone(),
165            range_end,
166            limit: 1,
167            keys_only: false,
168        })
169        .await
170        .unwrap();
171
172    assert_eq!(1, resp.kvs.len());
173    assert_eq!(key, resp.kvs[0].key);
174    assert_eq!(b"val1", resp.kvs[0].value());
175}
176
177pub async fn test_kv_range_2(kv_backend: &impl KvBackend) {
178    test_kv_range_2_with_prefix(kv_backend, vec![]).await;
179}
180
181pub async fn test_kv_range_2_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
182    let atest = [prefix.clone(), b"atest".to_vec()].concat();
183    let test = [prefix.clone(), b"test".to_vec()].concat();
184
185    kv_backend
186        .put(
187            PutRequest::new()
188                .with_key(atest.clone())
189                .with_value("value"),
190        )
191        .await
192        .unwrap();
193
194    kv_backend
195        .put(PutRequest::new().with_key(test.clone()).with_value("value"))
196        .await
197        .unwrap();
198
199    // If both key and range_end are ‘\0’, then range represents all keys.
200    let all_start = [prefix.clone(), b"\0".to_vec()].concat();
201    let all_end = if prefix.is_empty() {
202        b"\0".to_vec()
203    } else {
204        util::get_prefix_end_key(&prefix)
205    };
206    let result = kv_backend
207        .range(RangeRequest::new().with_range(all_start, all_end.clone()))
208        .await
209        .unwrap();
210
211    assert_eq!(result.kvs.len(), 2);
212    assert!(!result.more);
213
214    // If range_end is ‘\0’, the range is all keys greater than or equal to the key argument.
215    let a_start = [prefix.clone(), b"a".to_vec()].concat();
216    let result = kv_backend
217        .range(RangeRequest::new().with_range(a_start.clone(), all_end.clone()))
218        .await
219        .unwrap();
220
221    assert_eq!(result.kvs.len(), 2);
222
223    let b_start = [prefix.clone(), b"b".to_vec()].concat();
224    let result = kv_backend
225        .range(RangeRequest::new().with_range(b_start, all_end.clone()))
226        .await
227        .unwrap();
228
229    assert_eq!(result.kvs.len(), 1);
230    assert_eq!(result.kvs[0].key, test);
231
232    // Fetches the keys >= "a", set limit to 1, the `more` should be true.
233    let result = kv_backend
234        .range(
235            RangeRequest::new()
236                .with_range(a_start.clone(), all_end.clone())
237                .with_limit(1),
238        )
239        .await
240        .unwrap();
241    assert_eq!(result.kvs.len(), 1);
242    assert!(result.more);
243
244    // Fetches the keys >= "a", set limit to 2, the `more` should be false.
245    let result = kv_backend
246        .range(
247            RangeRequest::new()
248                .with_range(a_start.clone(), all_end.clone())
249                .with_limit(2),
250        )
251        .await
252        .unwrap();
253    assert_eq!(result.kvs.len(), 2);
254    assert!(!result.more);
255
256    // Fetches the keys >= "a", set limit to 3, the `more` should be false.
257    let result = kv_backend
258        .range(
259            RangeRequest::new()
260                .with_range(a_start.clone(), all_end.clone())
261                .with_limit(3),
262        )
263        .await
264        .unwrap();
265    assert_eq!(result.kvs.len(), 2);
266    assert!(!result.more);
267
268    let req = BatchDeleteRequest {
269        keys: vec![atest, test],
270        prev_kv: false,
271    };
272    let resp = kv_backend.batch_delete(req).await.unwrap();
273    assert!(resp.prev_kvs.is_empty());
274}
275
276pub async fn test_kv_batch_get(kv_backend: &impl KvBackend) {
277    test_kv_batch_get_with_prefix(kv_backend, vec![]).await;
278}
279
280pub async fn test_kv_batch_get_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
281    let keys = vec![];
282    let resp = kv_backend
283        .batch_get(BatchGetRequest { keys })
284        .await
285        .unwrap();
286
287    assert!(resp.kvs.is_empty());
288
289    let key10 = [prefix.clone(), b"key10".to_vec()].concat();
290    let keys = vec![key10];
291    let resp = kv_backend
292        .batch_get(BatchGetRequest { keys })
293        .await
294        .unwrap();
295
296    assert!(resp.kvs.is_empty());
297
298    let key1 = [prefix.clone(), b"key1".to_vec()].concat();
299    let key3 = [prefix.clone(), b"key3".to_vec()].concat();
300    let key4 = [prefix.clone(), b"key4".to_vec()].concat();
301    let keys = vec![key1.clone(), key3.clone(), key4];
302    let resp = kv_backend
303        .batch_get(BatchGetRequest { keys })
304        .await
305        .unwrap();
306
307    assert_eq!(2, resp.kvs.len());
308    assert_eq!(key1, resp.kvs[0].key);
309    assert_eq!(b"val1", resp.kvs[0].value());
310    assert_eq!(key3, resp.kvs[1].key);
311    assert_eq!(b"val3", resp.kvs[1].value());
312}
313
314pub async fn test_kv_compare_and_put(kv_backend: Arc<dyn KvBackend<Error = Error>>) {
315    test_kv_compare_and_put_with_prefix(kv_backend, vec![]).await;
316}
317
318pub async fn test_kv_compare_and_put_with_prefix(
319    kv_backend: Arc<dyn KvBackend<Error = Error>>,
320    prefix: Vec<u8>,
321) {
322    let success = Arc::new(AtomicU8::new(0));
323    let key = [prefix.clone(), b"key".to_vec()].concat();
324
325    let mut joins = vec![];
326    for _ in 0..20 {
327        let kv_backend_clone = kv_backend.clone();
328        let success_clone = success.clone();
329        let key_clone = key.clone();
330
331        let join = tokio::spawn(async move {
332            let req = CompareAndPutRequest {
333                key: key_clone,
334                expect: vec![],
335                value: b"val_new".to_vec(),
336            };
337            let resp = kv_backend_clone.compare_and_put(req).await.unwrap();
338            if resp.success {
339                success_clone.fetch_add(1, Ordering::SeqCst);
340            }
341        });
342        joins.push(join);
343    }
344
345    for join in joins {
346        join.await.unwrap();
347    }
348
349    assert_eq!(1, success.load(Ordering::SeqCst));
350
351    let resp = kv_backend.delete(&key, false).await.unwrap();
352    assert!(resp.is_none());
353}
354
355pub async fn test_kv_delete_range(kv_backend: &impl KvBackend) {
356    test_kv_delete_range_with_prefix(kv_backend, vec![]).await;
357}
358
359pub async fn test_kv_delete_range_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
360    let key3 = [prefix.clone(), b"key3".to_vec()].concat();
361    let req = DeleteRangeRequest {
362        key: key3.clone(),
363        range_end: vec![],
364        prev_kv: true,
365    };
366
367    let resp = kv_backend.delete_range(req).await.unwrap();
368    assert_eq!(1, resp.prev_kvs.len());
369    assert_eq!(1, resp.deleted);
370    assert_eq!(key3, resp.prev_kvs[0].key);
371    assert_eq!(b"val3", resp.prev_kvs[0].value());
372
373    let resp = kv_backend.get(&key3).await.unwrap();
374    assert!(resp.is_none());
375
376    let key2 = [prefix.clone(), b"key2".to_vec()].concat();
377    let req = DeleteRangeRequest {
378        key: key2.clone(),
379        range_end: vec![],
380        prev_kv: false,
381    };
382
383    let resp = kv_backend.delete_range(req).await.unwrap();
384    assert_eq!(1, resp.deleted);
385    assert!(resp.prev_kvs.is_empty());
386
387    let resp = kv_backend.get(&key2).await.unwrap();
388    assert!(resp.is_none());
389
390    let key = [prefix.clone(), b"key1".to_vec()].concat();
391    let range_end = util::get_prefix_end_key(&key);
392
393    let req = DeleteRangeRequest {
394        key: key.clone(),
395        range_end: range_end.clone(),
396        prev_kv: true,
397    };
398    let resp = kv_backend.delete_range(req).await.unwrap();
399    assert_eq!(2, resp.prev_kvs.len());
400
401    let req = RangeRequest {
402        key,
403        range_end,
404        ..Default::default()
405    };
406    let resp = kv_backend.range(req).await.unwrap();
407    assert!(resp.kvs.is_empty());
408}
409
410pub async fn test_kv_batch_delete(kv_backend: &impl KvBackend) {
411    test_kv_batch_delete_with_prefix(kv_backend, vec![]).await;
412}
413
414pub async fn test_kv_batch_delete_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
415    let key1 = [prefix.clone(), b"key1".to_vec()].concat();
416    let key100 = [prefix.clone(), b"key100".to_vec()].concat();
417    assert!(kv_backend.get(&key1).await.unwrap().is_some());
418    assert!(kv_backend.get(&key100).await.unwrap().is_none());
419
420    let req = BatchDeleteRequest {
421        keys: vec![key1.clone(), key100.clone()],
422        prev_kv: true,
423    };
424    let resp = kv_backend.batch_delete(req).await.unwrap();
425    assert_eq!(1, resp.prev_kvs.len());
426    assert_eq!(
427        vec![KeyValue {
428            key: key1.clone(),
429            value: b"val1".to_vec()
430        }],
431        resp.prev_kvs
432    );
433    assert!(kv_backend.get(&key1).await.unwrap().is_none());
434
435    let key2 = [prefix.clone(), b"key2".to_vec()].concat();
436    let key3 = [prefix.clone(), b"key3".to_vec()].concat();
437    let key11 = [prefix.clone(), b"key11".to_vec()].concat();
438    assert!(kv_backend.get(&key2).await.unwrap().is_some());
439    assert!(kv_backend.get(&key3).await.unwrap().is_some());
440    assert!(kv_backend.get(&key11).await.unwrap().is_some());
441
442    let req = BatchDeleteRequest {
443        keys: vec![key2.clone(), key3.clone(), key11.clone()],
444        prev_kv: false,
445    };
446    let resp = kv_backend.batch_delete(req).await.unwrap();
447    assert!(resp.prev_kvs.is_empty());
448
449    assert!(kv_backend.get(&key2).await.unwrap().is_none());
450    assert!(kv_backend.get(&key3).await.unwrap().is_none());
451    assert!(kv_backend.get(&key11).await.unwrap().is_none());
452}
453
454pub async fn test_txn_one_compare_op(kv_backend: &impl KvBackend) {
455    let _ = kv_backend
456        .put(PutRequest {
457            key: vec![11],
458            value: vec![3],
459            ..Default::default()
460        })
461        .await
462        .unwrap();
463
464    let txn = Txn::new()
465        .when(vec![Compare::with_value(
466            vec![11],
467            CompareOp::Greater,
468            vec![1],
469        )])
470        .and_then(vec![TxnOp::Put(vec![11], vec![1])])
471        .or_else(vec![TxnOp::Put(vec![11], vec![2])]);
472
473    let txn_response = kv_backend.txn(txn).await.unwrap();
474
475    assert!(txn_response.succeeded);
476    assert_eq!(txn_response.responses.len(), 1);
477}
478
479pub async fn text_txn_multi_compare_op(kv_backend: &impl KvBackend) {
480    for i in 1..3 {
481        let _ = kv_backend
482            .put(PutRequest {
483                key: vec![i],
484                value: vec![i],
485                ..Default::default()
486            })
487            .await
488            .unwrap();
489    }
490
491    let when: Vec<_> = (1..3u8)
492        .map(|i| Compare::with_value(vec![i], CompareOp::Equal, vec![i]))
493        .collect();
494
495    let txn = Txn::new()
496        .when(when)
497        .and_then(vec![
498            TxnOp::Put(vec![1], vec![10]),
499            TxnOp::Put(vec![2], vec![20]),
500        ])
501        .or_else(vec![TxnOp::Put(vec![1], vec![11])]);
502
503    let txn_response = kv_backend.txn(txn).await.unwrap();
504
505    assert!(txn_response.succeeded);
506    assert_eq!(txn_response.responses.len(), 2);
507}
508
509pub async fn test_txn_compare_equal(kv_backend: &impl KvBackend) {
510    let key = vec![101u8];
511    kv_backend.delete(&key, false).await.unwrap();
512
513    let txn = Txn::new()
514        .when(vec![Compare::with_value_not_exists(
515            key.clone(),
516            CompareOp::Equal,
517        )])
518        .and_then(vec![TxnOp::Put(key.clone(), vec![1])])
519        .or_else(vec![TxnOp::Put(key.clone(), vec![2])]);
520    let txn_response = kv_backend.txn(txn.clone()).await.unwrap();
521    assert!(txn_response.succeeded);
522
523    let txn_response = kv_backend.txn(txn).await.unwrap();
524    assert!(!txn_response.succeeded);
525
526    let txn = Txn::new()
527        .when(vec![Compare::with_value(
528            key.clone(),
529            CompareOp::Equal,
530            vec![2],
531        )])
532        .and_then(vec![TxnOp::Put(key.clone(), vec![3])])
533        .or_else(vec![TxnOp::Put(key, vec![4])]);
534    let txn_response = kv_backend.txn(txn).await.unwrap();
535    assert!(txn_response.succeeded);
536}
537
538pub async fn test_txn_compare_greater(kv_backend: &impl KvBackend) {
539    let key = vec![102u8];
540    kv_backend.delete(&key, false).await.unwrap();
541
542    let txn = Txn::new()
543        .when(vec![Compare::with_value_not_exists(
544            key.clone(),
545            CompareOp::Greater,
546        )])
547        .and_then(vec![TxnOp::Put(key.clone(), vec![1])])
548        .or_else(vec![TxnOp::Put(key.clone(), vec![2])]);
549    let txn_response = kv_backend.txn(txn.clone()).await.unwrap();
550    assert!(!txn_response.succeeded);
551
552    let txn_response = kv_backend.txn(txn).await.unwrap();
553    assert!(txn_response.succeeded);
554
555    let txn = Txn::new()
556        .when(vec![Compare::with_value(
557            key.clone(),
558            CompareOp::Greater,
559            vec![1],
560        )])
561        .and_then(vec![TxnOp::Put(key.clone(), vec![3])])
562        .or_else(vec![TxnOp::Get(key.clone())]);
563    let mut txn_response = kv_backend.txn(txn).await.unwrap();
564    assert!(!txn_response.succeeded);
565    let res = txn_response.responses.pop().unwrap();
566    assert_eq!(
567        res,
568        TxnOpResponse::ResponseGet(RangeResponse {
569            kvs: vec![KeyValue {
570                key,
571                value: vec![1]
572            }],
573            more: false,
574        })
575    );
576}
577
578pub async fn test_txn_compare_less(kv_backend: &impl KvBackend) {
579    let key = vec![103u8];
580    kv_backend.delete(&[3], false).await.unwrap();
581
582    let txn = Txn::new()
583        .when(vec![Compare::with_value_not_exists(
584            key.clone(),
585            CompareOp::Less,
586        )])
587        .and_then(vec![TxnOp::Put(key.clone(), vec![1])])
588        .or_else(vec![TxnOp::Put(key.clone(), vec![2])]);
589    let txn_response = kv_backend.txn(txn.clone()).await.unwrap();
590    assert!(!txn_response.succeeded);
591
592    let txn_response = kv_backend.txn(txn).await.unwrap();
593    assert!(!txn_response.succeeded);
594
595    let txn = Txn::new()
596        .when(vec![Compare::with_value(
597            key.clone(),
598            CompareOp::Less,
599            vec![2],
600        )])
601        .and_then(vec![TxnOp::Put(key.clone(), vec![3])])
602        .or_else(vec![TxnOp::Get(key.clone())]);
603    let mut txn_response = kv_backend.txn(txn).await.unwrap();
604    assert!(!txn_response.succeeded);
605    let res = txn_response.responses.pop().unwrap();
606    assert_eq!(
607        res,
608        TxnOpResponse::ResponseGet(RangeResponse {
609            kvs: vec![KeyValue {
610                key,
611                value: vec![2]
612            }],
613            more: false,
614        })
615    );
616}
617
618pub async fn test_txn_compare_not_equal(kv_backend: &impl KvBackend) {
619    let key = vec![104u8];
620    kv_backend.delete(&key, false).await.unwrap();
621
622    let txn = Txn::new()
623        .when(vec![Compare::with_value_not_exists(
624            key.clone(),
625            CompareOp::NotEqual,
626        )])
627        .and_then(vec![TxnOp::Put(key.clone(), vec![1])])
628        .or_else(vec![TxnOp::Put(key.clone(), vec![2])]);
629    let txn_response = kv_backend.txn(txn.clone()).await.unwrap();
630    assert!(!txn_response.succeeded);
631
632    let txn_response = kv_backend.txn(txn).await.unwrap();
633    assert!(txn_response.succeeded);
634
635    let txn = Txn::new()
636        .when(vec![Compare::with_value(
637            key.clone(),
638            CompareOp::Equal,
639            vec![2],
640        )])
641        .and_then(vec![TxnOp::Put(key.clone(), vec![3])])
642        .or_else(vec![TxnOp::Get(key.clone())]);
643    let mut txn_response = kv_backend.txn(txn).await.unwrap();
644    assert!(!txn_response.succeeded);
645    let res = txn_response.responses.pop().unwrap();
646    assert_eq!(
647        res,
648        TxnOpResponse::ResponseGet(RangeResponse {
649            kvs: vec![KeyValue {
650                key,
651                value: vec![1]
652            }],
653            more: false,
654        })
655    );
656}