1use std::sync::Arc;
16use std::sync::atomic::{AtomicU8, Ordering};
17
18use txn::{Compare, CompareOp, TxnOp};
19
20use super::{KvBackend, *};
21use crate::error::Error;
22use crate::rpc::KeyValue;
23use crate::rpc::store::{BatchGetRequest, PutRequest};
24use crate::util;
25
26pub fn mock_kvs(prefix: Vec<u8>) -> Vec<KeyValue> {
27 vec![
28 KeyValue {
29 key: [prefix.clone(), b"key1".to_vec()].concat(),
30 value: b"val1".to_vec(),
31 },
32 KeyValue {
33 key: [prefix.clone(), b"key2".to_vec()].concat(),
34 value: b"val2".to_vec(),
35 },
36 KeyValue {
37 key: [prefix.clone(), b"key3".to_vec()].concat(),
38 value: b"val3".to_vec(),
39 },
40 KeyValue {
41 key: [prefix.clone(), b"key11".to_vec()].concat(),
42 value: b"val11".to_vec(),
43 },
44 ]
45}
46
47pub async fn prepare_kv(kv_backend: &impl KvBackend) {
48 prepare_kv_with_prefix(kv_backend, vec![]).await;
49}
50
51pub async fn prepare_kv_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
52 let kvs = mock_kvs(prefix);
53 assert!(
54 kv_backend
55 .batch_put(BatchPutRequest {
56 kvs,
57 ..Default::default()
58 })
59 .await
60 .is_ok()
61 );
62}
63
64pub async fn unprepare_kv(kv_backend: &impl KvBackend, prefix: &[u8]) {
65 let range_end = util::get_prefix_end_key(prefix);
66 assert!(
67 kv_backend
68 .delete_range(DeleteRangeRequest {
69 key: prefix.to_vec(),
70 range_end,
71 ..Default::default()
72 })
73 .await
74 .is_ok(),
75 "prefix: {:?}",
76 std::str::from_utf8(prefix).unwrap()
77 );
78}
79
80pub async fn test_kv_put(kv_backend: &impl KvBackend) {
81 test_kv_put_with_prefix(kv_backend, vec![]).await;
82}
83
84pub async fn test_kv_put_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
85 let put_key = [prefix.clone(), b"key11".to_vec()].concat();
86 let resp = kv_backend
87 .put(PutRequest {
88 key: put_key.clone(),
89 value: b"val12".to_vec(),
90 prev_kv: false,
91 })
92 .await
93 .unwrap();
94 assert!(resp.prev_kv.is_none());
95
96 let resp = kv_backend
97 .put(PutRequest {
98 key: put_key.clone(),
99 value: b"val13".to_vec(),
100 prev_kv: true,
101 })
102 .await
103 .unwrap();
104 let prev_kv = resp.prev_kv.unwrap();
105 assert_eq!(put_key, prev_kv.key());
106 assert_eq!(b"val12", prev_kv.value());
107}
108
109pub async fn test_kv_range(kv_backend: &impl KvBackend) {
110 test_kv_range_with_prefix(kv_backend, vec![]).await;
111}
112
113pub async fn test_simple_kv_range(kvbackend: &impl KvBackend) {
114 {
115 let full_query = RangeRequest::new().with_range(vec![0], vec![0]);
116 let response = kvbackend.range(full_query).await.unwrap();
117 assert_eq!(response.kvs.len(), 4);
118 }
119 {
120 let point_query = RangeRequest::new().with_range(b"key11".to_vec(), vec![]);
121 let response = kvbackend.range(point_query).await.unwrap();
122 assert_eq!(response.kvs.len(), 1);
123 }
124 {
125 let left_bounded_query = RangeRequest::new().with_range(b"key1".to_vec(), vec![0]);
126 let response = kvbackend.range(left_bounded_query).await.unwrap();
127 assert_eq!(response.kvs.len(), 4);
128 }
129 {
130 let range_query = RangeRequest::new().with_range(b"key1".to_vec(), b"key11".to_vec());
131 let response = kvbackend.range(range_query).await.unwrap();
132 assert_eq!(response.kvs.len(), 1);
133 }
134 {
135 let prefix_query = RangeRequest::new().with_range(b"key1".to_vec(), b"key2".to_vec());
136 let response = kvbackend.range(prefix_query).await.unwrap();
137 assert_eq!(response.kvs.len(), 2);
138 }
139 {
140 let range_query = RangeRequest::new().with_range(b"key10".to_vec(), b"key100".to_vec());
141 let response = kvbackend.range(range_query).await.unwrap();
142 assert_eq!(response.kvs.len(), 0);
143 }
144 {
145 let prefix_query = RangeRequest::new().with_range(b"key10".to_vec(), b"key11".to_vec());
146 let response = kvbackend.range(prefix_query).await.unwrap();
147 assert_eq!(response.kvs.len(), 0);
148 }
149}
150
151pub async fn test_kv_range_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
152 let key = [prefix.clone(), b"key1".to_vec()].concat();
153 let key11 = [prefix.clone(), b"key11".to_vec()].concat();
154 let range_end = util::get_prefix_end_key(&key);
155
156 let resp = kv_backend
157 .range(RangeRequest {
158 key: key.clone(),
159 range_end: range_end.clone(),
160 limit: 0,
161 keys_only: false,
162 })
163 .await
164 .unwrap();
165
166 assert_eq!(2, resp.kvs.len());
167 assert_eq!(key, resp.kvs[0].key);
168 assert_eq!(b"val1", resp.kvs[0].value());
169 assert_eq!(key11, resp.kvs[1].key);
170 assert_eq!(b"val11", resp.kvs[1].value());
171
172 let resp = kv_backend
173 .range(RangeRequest {
174 key: key.clone(),
175 range_end: range_end.clone(),
176 limit: 0,
177 keys_only: true,
178 })
179 .await
180 .unwrap();
181
182 assert_eq!(2, resp.kvs.len());
183 assert_eq!(key, resp.kvs[0].key);
184 assert_eq!(b"", resp.kvs[0].value());
185 assert_eq!(key11, resp.kvs[1].key);
186 assert_eq!(b"", resp.kvs[1].value());
187
188 let resp = kv_backend
189 .range(RangeRequest {
190 key: key.clone(),
191 limit: 0,
192 keys_only: false,
193 ..Default::default()
194 })
195 .await
196 .unwrap();
197
198 assert_eq!(1, resp.kvs.len());
199 assert_eq!(key, resp.kvs[0].key);
200 assert_eq!(b"val1", resp.kvs[0].value());
201
202 let resp = kv_backend
203 .range(RangeRequest {
204 key: key.clone(),
205 range_end,
206 limit: 1,
207 keys_only: false,
208 })
209 .await
210 .unwrap();
211
212 assert_eq!(1, resp.kvs.len());
213 assert_eq!(key, resp.kvs[0].key);
214 assert_eq!(b"val1", resp.kvs[0].value());
215}
216
217pub async fn test_kv_range_2(kv_backend: &impl KvBackend) {
218 test_kv_range_2_with_prefix(kv_backend, vec![]).await;
219}
220
221pub async fn test_kv_range_2_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
222 let atest = [prefix.clone(), b"atest".to_vec()].concat();
223 let test = [prefix.clone(), b"test".to_vec()].concat();
224
225 kv_backend
226 .put(
227 PutRequest::new()
228 .with_key(atest.clone())
229 .with_value("value"),
230 )
231 .await
232 .unwrap();
233
234 kv_backend
235 .put(PutRequest::new().with_key(test.clone()).with_value("value"))
236 .await
237 .unwrap();
238
239 let all_start = [prefix.clone(), b"\0".to_vec()].concat();
241 let all_end = if prefix.is_empty() {
242 b"\0".to_vec()
243 } else {
244 util::get_prefix_end_key(&prefix)
245 };
246 let result = kv_backend
247 .range(RangeRequest::new().with_range(all_start, all_end.clone()))
248 .await
249 .unwrap();
250
251 assert_eq!(result.kvs.len(), 2);
252 assert!(!result.more);
253
254 let a_start = [prefix.clone(), b"a".to_vec()].concat();
256 let result = kv_backend
257 .range(RangeRequest::new().with_range(a_start.clone(), all_end.clone()))
258 .await
259 .unwrap();
260
261 assert_eq!(result.kvs.len(), 2);
262
263 let b_start = [prefix.clone(), b"b".to_vec()].concat();
264 let result = kv_backend
265 .range(RangeRequest::new().with_range(b_start, all_end.clone()))
266 .await
267 .unwrap();
268
269 assert_eq!(result.kvs.len(), 1);
270 assert_eq!(result.kvs[0].key, test);
271
272 let result = kv_backend
274 .range(
275 RangeRequest::new()
276 .with_range(a_start.clone(), all_end.clone())
277 .with_limit(1),
278 )
279 .await
280 .unwrap();
281 assert_eq!(result.kvs.len(), 1);
282 assert!(result.more);
283
284 let result = kv_backend
286 .range(
287 RangeRequest::new()
288 .with_range(a_start.clone(), all_end.clone())
289 .with_limit(2),
290 )
291 .await
292 .unwrap();
293 assert_eq!(result.kvs.len(), 2);
294 assert!(!result.more);
295
296 let result = kv_backend
298 .range(
299 RangeRequest::new()
300 .with_range(a_start.clone(), all_end.clone())
301 .with_limit(3),
302 )
303 .await
304 .unwrap();
305 assert_eq!(result.kvs.len(), 2);
306 assert!(!result.more);
307
308 let req = BatchDeleteRequest {
309 keys: vec![atest, test],
310 prev_kv: false,
311 };
312 let resp = kv_backend.batch_delete(req).await.unwrap();
313 assert!(resp.prev_kvs.is_empty());
314}
315
316pub async fn test_kv_batch_get(kv_backend: &impl KvBackend) {
317 test_kv_batch_get_with_prefix(kv_backend, vec![]).await;
318}
319
320pub async fn test_kv_batch_get_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
321 let keys = vec![];
322 let resp = kv_backend
323 .batch_get(BatchGetRequest { keys })
324 .await
325 .unwrap();
326
327 assert!(resp.kvs.is_empty());
328
329 let key10 = [prefix.clone(), b"key10".to_vec()].concat();
330 let keys = vec![key10];
331 let resp = kv_backend
332 .batch_get(BatchGetRequest { keys })
333 .await
334 .unwrap();
335
336 assert!(resp.kvs.is_empty());
337
338 let key1 = [prefix.clone(), b"key1".to_vec()].concat();
339 let key3 = [prefix.clone(), b"key3".to_vec()].concat();
340 let key4 = [prefix.clone(), b"key4".to_vec()].concat();
341 let keys = vec![key1.clone(), key3.clone(), key4];
342 let resp = kv_backend
343 .batch_get(BatchGetRequest { keys })
344 .await
345 .unwrap();
346
347 assert_eq!(2, resp.kvs.len());
348 assert_eq!(key1, resp.kvs[0].key);
349 assert_eq!(b"val1", resp.kvs[0].value());
350 assert_eq!(key3, resp.kvs[1].key);
351 assert_eq!(b"val3", resp.kvs[1].value());
352}
353
354pub async fn test_kv_compare_and_put(kv_backend: Arc<dyn KvBackend<Error = Error>>) {
355 test_kv_compare_and_put_with_prefix(kv_backend, vec![]).await;
356}
357
358pub async fn test_kv_compare_and_put_with_prefix(
359 kv_backend: Arc<dyn KvBackend<Error = Error>>,
360 prefix: Vec<u8>,
361) {
362 let success = Arc::new(AtomicU8::new(0));
363 let key = [prefix.clone(), b"key".to_vec()].concat();
364
365 let mut joins = vec![];
366 for _ in 0..20 {
367 let kv_backend_clone = kv_backend.clone();
368 let success_clone = success.clone();
369 let key_clone = key.clone();
370
371 let join = tokio::spawn(async move {
372 let req = CompareAndPutRequest {
373 key: key_clone,
374 expect: vec![],
375 value: b"val_new".to_vec(),
376 };
377 let resp = kv_backend_clone.compare_and_put(req).await.unwrap();
378 if resp.success {
379 success_clone.fetch_add(1, Ordering::SeqCst);
380 }
381 });
382 joins.push(join);
383 }
384
385 for join in joins {
386 join.await.unwrap();
387 }
388
389 assert_eq!(1, success.load(Ordering::SeqCst));
390
391 let resp = kv_backend.delete(&key, false).await.unwrap();
392 assert!(resp.is_none());
393}
394
395pub async fn test_kv_delete_range(kv_backend: &impl KvBackend) {
396 test_kv_delete_range_with_prefix(kv_backend, vec![]).await;
397}
398
399pub async fn test_kv_delete_range_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
400 let key3 = [prefix.clone(), b"key3".to_vec()].concat();
401 let req = DeleteRangeRequest {
402 key: key3.clone(),
403 range_end: vec![],
404 prev_kv: true,
405 };
406
407 let resp = kv_backend.delete_range(req).await.unwrap();
408 assert_eq!(1, resp.prev_kvs.len());
409 assert_eq!(1, resp.deleted);
410 assert_eq!(key3, resp.prev_kvs[0].key);
411 assert_eq!(b"val3", resp.prev_kvs[0].value());
412
413 let resp = kv_backend.get(&key3).await.unwrap();
414 assert!(resp.is_none());
415
416 let key2 = [prefix.clone(), b"key2".to_vec()].concat();
417 let req = DeleteRangeRequest {
418 key: key2.clone(),
419 range_end: vec![],
420 prev_kv: false,
421 };
422
423 let resp = kv_backend.delete_range(req).await.unwrap();
424 assert_eq!(1, resp.deleted);
425 assert!(resp.prev_kvs.is_empty());
426
427 let resp = kv_backend.get(&key2).await.unwrap();
428 assert!(resp.is_none());
429
430 let key = [prefix.clone(), b"key1".to_vec()].concat();
431 let range_end = util::get_prefix_end_key(&key);
432
433 let req = DeleteRangeRequest {
434 key: key.clone(),
435 range_end: range_end.clone(),
436 prev_kv: true,
437 };
438 let resp = kv_backend.delete_range(req).await.unwrap();
439 assert_eq!(2, resp.prev_kvs.len());
440
441 let req = RangeRequest {
442 key,
443 range_end,
444 ..Default::default()
445 };
446 let resp = kv_backend.range(req).await.unwrap();
447 assert!(resp.kvs.is_empty());
448}
449
450pub async fn test_kv_batch_delete(kv_backend: &impl KvBackend) {
451 test_kv_batch_delete_with_prefix(kv_backend, vec![]).await;
452}
453
454pub async fn test_kv_batch_delete_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
455 let key1 = [prefix.clone(), b"key1".to_vec()].concat();
456 let key100 = [prefix.clone(), b"key100".to_vec()].concat();
457 assert!(kv_backend.get(&key1).await.unwrap().is_some());
458 assert!(kv_backend.get(&key100).await.unwrap().is_none());
459
460 let req = BatchDeleteRequest {
461 keys: vec![key1.clone(), key100.clone()],
462 prev_kv: true,
463 };
464 let resp = kv_backend.batch_delete(req).await.unwrap();
465 assert_eq!(1, resp.prev_kvs.len());
466 assert_eq!(
467 vec![KeyValue {
468 key: key1.clone(),
469 value: b"val1".to_vec()
470 }],
471 resp.prev_kvs
472 );
473 assert!(kv_backend.get(&key1).await.unwrap().is_none());
474
475 let key2 = [prefix.clone(), b"key2".to_vec()].concat();
476 let key3 = [prefix.clone(), b"key3".to_vec()].concat();
477 let key11 = [prefix.clone(), b"key11".to_vec()].concat();
478 assert!(kv_backend.get(&key2).await.unwrap().is_some());
479 assert!(kv_backend.get(&key3).await.unwrap().is_some());
480 assert!(kv_backend.get(&key11).await.unwrap().is_some());
481
482 let req = BatchDeleteRequest {
483 keys: vec![key2.clone(), key3.clone(), key11.clone()],
484 prev_kv: false,
485 };
486 let resp = kv_backend.batch_delete(req).await.unwrap();
487 assert!(resp.prev_kvs.is_empty());
488
489 assert!(kv_backend.get(&key2).await.unwrap().is_none());
490 assert!(kv_backend.get(&key3).await.unwrap().is_none());
491 assert!(kv_backend.get(&key11).await.unwrap().is_none());
492}
493
494pub async fn test_txn_one_compare_op(kv_backend: &impl KvBackend) {
495 let _ = kv_backend
496 .put(PutRequest {
497 key: vec![11],
498 value: vec![3],
499 ..Default::default()
500 })
501 .await
502 .unwrap();
503
504 let txn = Txn::new()
505 .when(vec![Compare::with_value(
506 vec![11],
507 CompareOp::Greater,
508 vec![1],
509 )])
510 .and_then(vec![TxnOp::Put(vec![11], vec![1])])
511 .or_else(vec![TxnOp::Put(vec![11], vec![2])]);
512
513 let txn_response = kv_backend.txn(txn).await.unwrap();
514
515 assert!(txn_response.succeeded);
516 assert_eq!(txn_response.responses.len(), 1);
517}
518
519pub async fn text_txn_multi_compare_op(kv_backend: &impl KvBackend) {
520 for i in 1..3 {
521 let _ = kv_backend
522 .put(PutRequest {
523 key: vec![i],
524 value: vec![i],
525 ..Default::default()
526 })
527 .await
528 .unwrap();
529 }
530
531 let when: Vec<_> = (1..3u8)
532 .map(|i| Compare::with_value(vec![i], CompareOp::Equal, vec![i]))
533 .collect();
534
535 let txn = Txn::new()
536 .when(when)
537 .and_then(vec![
538 TxnOp::Put(vec![1], vec![10]),
539 TxnOp::Put(vec![2], vec![20]),
540 ])
541 .or_else(vec![TxnOp::Put(vec![1], vec![11])]);
542
543 let txn_response = kv_backend.txn(txn).await.unwrap();
544
545 assert!(txn_response.succeeded);
546 assert_eq!(txn_response.responses.len(), 2);
547}
548
549pub async fn test_txn_compare_equal(kv_backend: &impl KvBackend) {
550 let key = vec![101u8];
551 kv_backend.delete(&key, false).await.unwrap();
552
553 let txn = Txn::new()
554 .when(vec![Compare::with_value_not_exists(
555 key.clone(),
556 CompareOp::Equal,
557 )])
558 .and_then(vec![TxnOp::Put(key.clone(), vec![1])])
559 .or_else(vec![TxnOp::Put(key.clone(), vec![2])]);
560 let txn_response = kv_backend.txn(txn.clone()).await.unwrap();
561 assert!(txn_response.succeeded);
562
563 let txn_response = kv_backend.txn(txn).await.unwrap();
564 assert!(!txn_response.succeeded);
565
566 let txn = Txn::new()
567 .when(vec![Compare::with_value(
568 key.clone(),
569 CompareOp::Equal,
570 vec![2],
571 )])
572 .and_then(vec![TxnOp::Put(key.clone(), vec![3])])
573 .or_else(vec![TxnOp::Put(key, vec![4])]);
574 let txn_response = kv_backend.txn(txn).await.unwrap();
575 assert!(txn_response.succeeded);
576}
577
578pub async fn test_txn_compare_greater(kv_backend: &impl KvBackend) {
579 let key = vec![102u8];
580 kv_backend.delete(&key, false).await.unwrap();
581
582 let txn = Txn::new()
583 .when(vec![Compare::with_value_not_exists(
584 key.clone(),
585 CompareOp::Greater,
586 )])
587 .and_then(vec![TxnOp::Put(key.clone(), vec![1])])
588 .or_else(vec![TxnOp::Put(key.clone(), vec![2])]);
589 let txn_response = kv_backend.txn(txn.clone()).await.unwrap();
590 assert!(!txn_response.succeeded);
591
592 let txn_response = kv_backend.txn(txn).await.unwrap();
593 assert!(txn_response.succeeded);
594
595 let txn = Txn::new()
596 .when(vec![Compare::with_value(
597 key.clone(),
598 CompareOp::Greater,
599 vec![1],
600 )])
601 .and_then(vec![TxnOp::Put(key.clone(), vec![3])])
602 .or_else(vec![TxnOp::Get(key.clone())]);
603 let mut txn_response = kv_backend.txn(txn).await.unwrap();
604 assert!(!txn_response.succeeded);
605 let res = txn_response.responses.pop().unwrap();
606 assert_eq!(
607 res,
608 TxnOpResponse::ResponseGet(RangeResponse {
609 kvs: vec![KeyValue {
610 key,
611 value: vec![1]
612 }],
613 more: false,
614 })
615 );
616}
617
618pub async fn test_txn_compare_less(kv_backend: &impl KvBackend) {
619 let key = vec![103u8];
620 kv_backend.delete(&[3], false).await.unwrap();
621
622 let txn = Txn::new()
623 .when(vec![Compare::with_value_not_exists(
624 key.clone(),
625 CompareOp::Less,
626 )])
627 .and_then(vec![TxnOp::Put(key.clone(), vec![1])])
628 .or_else(vec![TxnOp::Put(key.clone(), vec![2])]);
629 let txn_response = kv_backend.txn(txn.clone()).await.unwrap();
630 assert!(!txn_response.succeeded);
631
632 let txn_response = kv_backend.txn(txn).await.unwrap();
633 assert!(!txn_response.succeeded);
634
635 let txn = Txn::new()
636 .when(vec![Compare::with_value(
637 key.clone(),
638 CompareOp::Less,
639 vec![2],
640 )])
641 .and_then(vec![TxnOp::Put(key.clone(), vec![3])])
642 .or_else(vec![TxnOp::Get(key.clone())]);
643 let mut txn_response = kv_backend.txn(txn).await.unwrap();
644 assert!(!txn_response.succeeded);
645 let res = txn_response.responses.pop().unwrap();
646 assert_eq!(
647 res,
648 TxnOpResponse::ResponseGet(RangeResponse {
649 kvs: vec![KeyValue {
650 key,
651 value: vec![2]
652 }],
653 more: false,
654 })
655 );
656}
657
658pub async fn test_txn_compare_not_equal(kv_backend: &impl KvBackend) {
659 let key = vec![104u8];
660 kv_backend.delete(&key, false).await.unwrap();
661
662 let txn = Txn::new()
663 .when(vec![Compare::with_value_not_exists(
664 key.clone(),
665 CompareOp::NotEqual,
666 )])
667 .and_then(vec![TxnOp::Put(key.clone(), vec![1])])
668 .or_else(vec![TxnOp::Put(key.clone(), vec![2])]);
669 let txn_response = kv_backend.txn(txn.clone()).await.unwrap();
670 assert!(!txn_response.succeeded);
671
672 let txn_response = kv_backend.txn(txn).await.unwrap();
673 assert!(txn_response.succeeded);
674
675 let txn = Txn::new()
676 .when(vec![Compare::with_value(
677 key.clone(),
678 CompareOp::Equal,
679 vec![2],
680 )])
681 .and_then(vec![TxnOp::Put(key.clone(), vec![3])])
682 .or_else(vec![TxnOp::Get(key.clone())]);
683 let mut txn_response = kv_backend.txn(txn).await.unwrap();
684 assert!(!txn_response.succeeded);
685 let res = txn_response.responses.pop().unwrap();
686 assert_eq!(
687 res,
688 TxnOpResponse::ResponseGet(RangeResponse {
689 kvs: vec![KeyValue {
690 key,
691 value: vec![1]
692 }],
693 more: false,
694 })
695 );
696}