1use std::sync::atomic::{AtomicU8, Ordering};
16use std::sync::Arc;
17
18use txn::{Compare, CompareOp, TxnOp};
19
20use super::{KvBackend, *};
21use crate::error::Error;
22use crate::rpc::store::{BatchGetRequest, PutRequest};
23use crate::rpc::KeyValue;
24use crate::util;
25
26pub fn mock_kvs(prefix: Vec<u8>) -> Vec<KeyValue> {
27 vec![
28 KeyValue {
29 key: [prefix.clone(), b"key1".to_vec()].concat(),
30 value: b"val1".to_vec(),
31 },
32 KeyValue {
33 key: [prefix.clone(), b"key2".to_vec()].concat(),
34 value: b"val2".to_vec(),
35 },
36 KeyValue {
37 key: [prefix.clone(), b"key3".to_vec()].concat(),
38 value: b"val3".to_vec(),
39 },
40 KeyValue {
41 key: [prefix.clone(), b"key11".to_vec()].concat(),
42 value: b"val11".to_vec(),
43 },
44 ]
45}
46
47pub async fn prepare_kv(kv_backend: &impl KvBackend) {
48 prepare_kv_with_prefix(kv_backend, vec![]).await;
49}
50
51pub async fn prepare_kv_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
52 let kvs = mock_kvs(prefix);
53 assert!(kv_backend
54 .batch_put(BatchPutRequest {
55 kvs,
56 ..Default::default()
57 })
58 .await
59 .is_ok());
60}
61
62pub async fn unprepare_kv(kv_backend: &impl KvBackend, prefix: &[u8]) {
63 let range_end = util::get_prefix_end_key(prefix);
64 assert!(
65 kv_backend
66 .delete_range(DeleteRangeRequest {
67 key: prefix.to_vec(),
68 range_end,
69 ..Default::default()
70 })
71 .await
72 .is_ok(),
73 "prefix: {:?}",
74 std::str::from_utf8(prefix).unwrap()
75 );
76}
77
78pub async fn test_kv_put(kv_backend: &impl KvBackend) {
79 test_kv_put_with_prefix(kv_backend, vec![]).await;
80}
81
82pub async fn test_kv_put_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
83 let put_key = [prefix.clone(), b"key11".to_vec()].concat();
84 let resp = kv_backend
85 .put(PutRequest {
86 key: put_key.clone(),
87 value: b"val12".to_vec(),
88 prev_kv: false,
89 })
90 .await
91 .unwrap();
92 assert!(resp.prev_kv.is_none());
93
94 let resp = kv_backend
95 .put(PutRequest {
96 key: put_key.clone(),
97 value: b"val13".to_vec(),
98 prev_kv: true,
99 })
100 .await
101 .unwrap();
102 let prev_kv = resp.prev_kv.unwrap();
103 assert_eq!(put_key, prev_kv.key());
104 assert_eq!(b"val12", prev_kv.value());
105}
106
107pub async fn test_kv_range(kv_backend: &impl KvBackend) {
108 test_kv_range_with_prefix(kv_backend, vec![]).await;
109}
110
111pub async fn test_simple_kv_range(kvbackend: &impl KvBackend) {
112 {
113 let full_query = RangeRequest::new().with_range(vec![0], vec![0]);
114 let response = kvbackend.range(full_query).await.unwrap();
115 assert_eq!(response.kvs.len(), 4);
116 }
117 {
118 let point_query = RangeRequest::new().with_range(b"key11".to_vec(), vec![]);
119 let response = kvbackend.range(point_query).await.unwrap();
120 assert_eq!(response.kvs.len(), 1);
121 }
122 {
123 let left_bounded_query = RangeRequest::new().with_range(b"key1".to_vec(), vec![0]);
124 let response = kvbackend.range(left_bounded_query).await.unwrap();
125 assert_eq!(response.kvs.len(), 4);
126 }
127 {
128 let range_query = RangeRequest::new().with_range(b"key1".to_vec(), b"key11".to_vec());
129 let response = kvbackend.range(range_query).await.unwrap();
130 assert_eq!(response.kvs.len(), 1);
131 }
132 {
133 let prefix_query = RangeRequest::new().with_range(b"key1".to_vec(), b"key2".to_vec());
134 let response = kvbackend.range(prefix_query).await.unwrap();
135 assert_eq!(response.kvs.len(), 2);
136 }
137 {
138 let range_query = RangeRequest::new().with_range(b"key10".to_vec(), b"key100".to_vec());
139 let response = kvbackend.range(range_query).await.unwrap();
140 assert_eq!(response.kvs.len(), 0);
141 }
142 {
143 let prefix_query = RangeRequest::new().with_range(b"key10".to_vec(), b"key11".to_vec());
144 let response = kvbackend.range(prefix_query).await.unwrap();
145 assert_eq!(response.kvs.len(), 0);
146 }
147}
148
149pub async fn test_kv_range_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
150 let key = [prefix.clone(), b"key1".to_vec()].concat();
151 let key11 = [prefix.clone(), b"key11".to_vec()].concat();
152 let range_end = util::get_prefix_end_key(&key);
153
154 let resp = kv_backend
155 .range(RangeRequest {
156 key: key.clone(),
157 range_end: range_end.clone(),
158 limit: 0,
159 keys_only: false,
160 })
161 .await
162 .unwrap();
163
164 assert_eq!(2, resp.kvs.len());
165 assert_eq!(key, resp.kvs[0].key);
166 assert_eq!(b"val1", resp.kvs[0].value());
167 assert_eq!(key11, resp.kvs[1].key);
168 assert_eq!(b"val11", resp.kvs[1].value());
169
170 let resp = kv_backend
171 .range(RangeRequest {
172 key: key.clone(),
173 range_end: range_end.clone(),
174 limit: 0,
175 keys_only: true,
176 })
177 .await
178 .unwrap();
179
180 assert_eq!(2, resp.kvs.len());
181 assert_eq!(key, resp.kvs[0].key);
182 assert_eq!(b"", resp.kvs[0].value());
183 assert_eq!(key11, resp.kvs[1].key);
184 assert_eq!(b"", resp.kvs[1].value());
185
186 let resp = kv_backend
187 .range(RangeRequest {
188 key: key.clone(),
189 limit: 0,
190 keys_only: false,
191 ..Default::default()
192 })
193 .await
194 .unwrap();
195
196 assert_eq!(1, resp.kvs.len());
197 assert_eq!(key, resp.kvs[0].key);
198 assert_eq!(b"val1", resp.kvs[0].value());
199
200 let resp = kv_backend
201 .range(RangeRequest {
202 key: key.clone(),
203 range_end,
204 limit: 1,
205 keys_only: false,
206 })
207 .await
208 .unwrap();
209
210 assert_eq!(1, resp.kvs.len());
211 assert_eq!(key, resp.kvs[0].key);
212 assert_eq!(b"val1", resp.kvs[0].value());
213}
214
215pub async fn test_kv_range_2(kv_backend: &impl KvBackend) {
216 test_kv_range_2_with_prefix(kv_backend, vec![]).await;
217}
218
219pub async fn test_kv_range_2_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
220 let atest = [prefix.clone(), b"atest".to_vec()].concat();
221 let test = [prefix.clone(), b"test".to_vec()].concat();
222
223 kv_backend
224 .put(
225 PutRequest::new()
226 .with_key(atest.clone())
227 .with_value("value"),
228 )
229 .await
230 .unwrap();
231
232 kv_backend
233 .put(PutRequest::new().with_key(test.clone()).with_value("value"))
234 .await
235 .unwrap();
236
237 let all_start = [prefix.clone(), b"\0".to_vec()].concat();
239 let all_end = if prefix.is_empty() {
240 b"\0".to_vec()
241 } else {
242 util::get_prefix_end_key(&prefix)
243 };
244 let result = kv_backend
245 .range(RangeRequest::new().with_range(all_start, all_end.clone()))
246 .await
247 .unwrap();
248
249 assert_eq!(result.kvs.len(), 2);
250 assert!(!result.more);
251
252 let a_start = [prefix.clone(), b"a".to_vec()].concat();
254 let result = kv_backend
255 .range(RangeRequest::new().with_range(a_start.clone(), all_end.clone()))
256 .await
257 .unwrap();
258
259 assert_eq!(result.kvs.len(), 2);
260
261 let b_start = [prefix.clone(), b"b".to_vec()].concat();
262 let result = kv_backend
263 .range(RangeRequest::new().with_range(b_start, all_end.clone()))
264 .await
265 .unwrap();
266
267 assert_eq!(result.kvs.len(), 1);
268 assert_eq!(result.kvs[0].key, test);
269
270 let result = kv_backend
272 .range(
273 RangeRequest::new()
274 .with_range(a_start.clone(), all_end.clone())
275 .with_limit(1),
276 )
277 .await
278 .unwrap();
279 assert_eq!(result.kvs.len(), 1);
280 assert!(result.more);
281
282 let result = kv_backend
284 .range(
285 RangeRequest::new()
286 .with_range(a_start.clone(), all_end.clone())
287 .with_limit(2),
288 )
289 .await
290 .unwrap();
291 assert_eq!(result.kvs.len(), 2);
292 assert!(!result.more);
293
294 let result = kv_backend
296 .range(
297 RangeRequest::new()
298 .with_range(a_start.clone(), all_end.clone())
299 .with_limit(3),
300 )
301 .await
302 .unwrap();
303 assert_eq!(result.kvs.len(), 2);
304 assert!(!result.more);
305
306 let req = BatchDeleteRequest {
307 keys: vec![atest, test],
308 prev_kv: false,
309 };
310 let resp = kv_backend.batch_delete(req).await.unwrap();
311 assert!(resp.prev_kvs.is_empty());
312}
313
314pub async fn test_kv_batch_get(kv_backend: &impl KvBackend) {
315 test_kv_batch_get_with_prefix(kv_backend, vec![]).await;
316}
317
318pub async fn test_kv_batch_get_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
319 let keys = vec![];
320 let resp = kv_backend
321 .batch_get(BatchGetRequest { keys })
322 .await
323 .unwrap();
324
325 assert!(resp.kvs.is_empty());
326
327 let key10 = [prefix.clone(), b"key10".to_vec()].concat();
328 let keys = vec![key10];
329 let resp = kv_backend
330 .batch_get(BatchGetRequest { keys })
331 .await
332 .unwrap();
333
334 assert!(resp.kvs.is_empty());
335
336 let key1 = [prefix.clone(), b"key1".to_vec()].concat();
337 let key3 = [prefix.clone(), b"key3".to_vec()].concat();
338 let key4 = [prefix.clone(), b"key4".to_vec()].concat();
339 let keys = vec![key1.clone(), key3.clone(), key4];
340 let resp = kv_backend
341 .batch_get(BatchGetRequest { keys })
342 .await
343 .unwrap();
344
345 assert_eq!(2, resp.kvs.len());
346 assert_eq!(key1, resp.kvs[0].key);
347 assert_eq!(b"val1", resp.kvs[0].value());
348 assert_eq!(key3, resp.kvs[1].key);
349 assert_eq!(b"val3", resp.kvs[1].value());
350}
351
352pub async fn test_kv_compare_and_put(kv_backend: Arc<dyn KvBackend<Error = Error>>) {
353 test_kv_compare_and_put_with_prefix(kv_backend, vec![]).await;
354}
355
356pub async fn test_kv_compare_and_put_with_prefix(
357 kv_backend: Arc<dyn KvBackend<Error = Error>>,
358 prefix: Vec<u8>,
359) {
360 let success = Arc::new(AtomicU8::new(0));
361 let key = [prefix.clone(), b"key".to_vec()].concat();
362
363 let mut joins = vec![];
364 for _ in 0..20 {
365 let kv_backend_clone = kv_backend.clone();
366 let success_clone = success.clone();
367 let key_clone = key.clone();
368
369 let join = tokio::spawn(async move {
370 let req = CompareAndPutRequest {
371 key: key_clone,
372 expect: vec![],
373 value: b"val_new".to_vec(),
374 };
375 let resp = kv_backend_clone.compare_and_put(req).await.unwrap();
376 if resp.success {
377 success_clone.fetch_add(1, Ordering::SeqCst);
378 }
379 });
380 joins.push(join);
381 }
382
383 for join in joins {
384 join.await.unwrap();
385 }
386
387 assert_eq!(1, success.load(Ordering::SeqCst));
388
389 let resp = kv_backend.delete(&key, false).await.unwrap();
390 assert!(resp.is_none());
391}
392
393pub async fn test_kv_delete_range(kv_backend: &impl KvBackend) {
394 test_kv_delete_range_with_prefix(kv_backend, vec![]).await;
395}
396
397pub async fn test_kv_delete_range_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
398 let key3 = [prefix.clone(), b"key3".to_vec()].concat();
399 let req = DeleteRangeRequest {
400 key: key3.clone(),
401 range_end: vec![],
402 prev_kv: true,
403 };
404
405 let resp = kv_backend.delete_range(req).await.unwrap();
406 assert_eq!(1, resp.prev_kvs.len());
407 assert_eq!(1, resp.deleted);
408 assert_eq!(key3, resp.prev_kvs[0].key);
409 assert_eq!(b"val3", resp.prev_kvs[0].value());
410
411 let resp = kv_backend.get(&key3).await.unwrap();
412 assert!(resp.is_none());
413
414 let key2 = [prefix.clone(), b"key2".to_vec()].concat();
415 let req = DeleteRangeRequest {
416 key: key2.clone(),
417 range_end: vec![],
418 prev_kv: false,
419 };
420
421 let resp = kv_backend.delete_range(req).await.unwrap();
422 assert_eq!(1, resp.deleted);
423 assert!(resp.prev_kvs.is_empty());
424
425 let resp = kv_backend.get(&key2).await.unwrap();
426 assert!(resp.is_none());
427
428 let key = [prefix.clone(), b"key1".to_vec()].concat();
429 let range_end = util::get_prefix_end_key(&key);
430
431 let req = DeleteRangeRequest {
432 key: key.clone(),
433 range_end: range_end.clone(),
434 prev_kv: true,
435 };
436 let resp = kv_backend.delete_range(req).await.unwrap();
437 assert_eq!(2, resp.prev_kvs.len());
438
439 let req = RangeRequest {
440 key,
441 range_end,
442 ..Default::default()
443 };
444 let resp = kv_backend.range(req).await.unwrap();
445 assert!(resp.kvs.is_empty());
446}
447
448pub async fn test_kv_batch_delete(kv_backend: &impl KvBackend) {
449 test_kv_batch_delete_with_prefix(kv_backend, vec![]).await;
450}
451
452pub async fn test_kv_batch_delete_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
453 let key1 = [prefix.clone(), b"key1".to_vec()].concat();
454 let key100 = [prefix.clone(), b"key100".to_vec()].concat();
455 assert!(kv_backend.get(&key1).await.unwrap().is_some());
456 assert!(kv_backend.get(&key100).await.unwrap().is_none());
457
458 let req = BatchDeleteRequest {
459 keys: vec![key1.clone(), key100.clone()],
460 prev_kv: true,
461 };
462 let resp = kv_backend.batch_delete(req).await.unwrap();
463 assert_eq!(1, resp.prev_kvs.len());
464 assert_eq!(
465 vec![KeyValue {
466 key: key1.clone(),
467 value: b"val1".to_vec()
468 }],
469 resp.prev_kvs
470 );
471 assert!(kv_backend.get(&key1).await.unwrap().is_none());
472
473 let key2 = [prefix.clone(), b"key2".to_vec()].concat();
474 let key3 = [prefix.clone(), b"key3".to_vec()].concat();
475 let key11 = [prefix.clone(), b"key11".to_vec()].concat();
476 assert!(kv_backend.get(&key2).await.unwrap().is_some());
477 assert!(kv_backend.get(&key3).await.unwrap().is_some());
478 assert!(kv_backend.get(&key11).await.unwrap().is_some());
479
480 let req = BatchDeleteRequest {
481 keys: vec![key2.clone(), key3.clone(), key11.clone()],
482 prev_kv: false,
483 };
484 let resp = kv_backend.batch_delete(req).await.unwrap();
485 assert!(resp.prev_kvs.is_empty());
486
487 assert!(kv_backend.get(&key2).await.unwrap().is_none());
488 assert!(kv_backend.get(&key3).await.unwrap().is_none());
489 assert!(kv_backend.get(&key11).await.unwrap().is_none());
490}
491
492pub async fn test_txn_one_compare_op(kv_backend: &impl KvBackend) {
493 let _ = kv_backend
494 .put(PutRequest {
495 key: vec![11],
496 value: vec![3],
497 ..Default::default()
498 })
499 .await
500 .unwrap();
501
502 let txn = Txn::new()
503 .when(vec![Compare::with_value(
504 vec![11],
505 CompareOp::Greater,
506 vec![1],
507 )])
508 .and_then(vec![TxnOp::Put(vec![11], vec![1])])
509 .or_else(vec![TxnOp::Put(vec![11], vec![2])]);
510
511 let txn_response = kv_backend.txn(txn).await.unwrap();
512
513 assert!(txn_response.succeeded);
514 assert_eq!(txn_response.responses.len(), 1);
515}
516
517pub async fn text_txn_multi_compare_op(kv_backend: &impl KvBackend) {
518 for i in 1..3 {
519 let _ = kv_backend
520 .put(PutRequest {
521 key: vec![i],
522 value: vec![i],
523 ..Default::default()
524 })
525 .await
526 .unwrap();
527 }
528
529 let when: Vec<_> = (1..3u8)
530 .map(|i| Compare::with_value(vec![i], CompareOp::Equal, vec![i]))
531 .collect();
532
533 let txn = Txn::new()
534 .when(when)
535 .and_then(vec![
536 TxnOp::Put(vec![1], vec![10]),
537 TxnOp::Put(vec![2], vec![20]),
538 ])
539 .or_else(vec![TxnOp::Put(vec![1], vec![11])]);
540
541 let txn_response = kv_backend.txn(txn).await.unwrap();
542
543 assert!(txn_response.succeeded);
544 assert_eq!(txn_response.responses.len(), 2);
545}
546
547pub async fn test_txn_compare_equal(kv_backend: &impl KvBackend) {
548 let key = vec![101u8];
549 kv_backend.delete(&key, false).await.unwrap();
550
551 let txn = Txn::new()
552 .when(vec![Compare::with_value_not_exists(
553 key.clone(),
554 CompareOp::Equal,
555 )])
556 .and_then(vec![TxnOp::Put(key.clone(), vec![1])])
557 .or_else(vec![TxnOp::Put(key.clone(), vec![2])]);
558 let txn_response = kv_backend.txn(txn.clone()).await.unwrap();
559 assert!(txn_response.succeeded);
560
561 let txn_response = kv_backend.txn(txn).await.unwrap();
562 assert!(!txn_response.succeeded);
563
564 let txn = Txn::new()
565 .when(vec![Compare::with_value(
566 key.clone(),
567 CompareOp::Equal,
568 vec![2],
569 )])
570 .and_then(vec![TxnOp::Put(key.clone(), vec![3])])
571 .or_else(vec![TxnOp::Put(key, vec![4])]);
572 let txn_response = kv_backend.txn(txn).await.unwrap();
573 assert!(txn_response.succeeded);
574}
575
576pub async fn test_txn_compare_greater(kv_backend: &impl KvBackend) {
577 let key = vec![102u8];
578 kv_backend.delete(&key, false).await.unwrap();
579
580 let txn = Txn::new()
581 .when(vec![Compare::with_value_not_exists(
582 key.clone(),
583 CompareOp::Greater,
584 )])
585 .and_then(vec![TxnOp::Put(key.clone(), vec![1])])
586 .or_else(vec![TxnOp::Put(key.clone(), vec![2])]);
587 let txn_response = kv_backend.txn(txn.clone()).await.unwrap();
588 assert!(!txn_response.succeeded);
589
590 let txn_response = kv_backend.txn(txn).await.unwrap();
591 assert!(txn_response.succeeded);
592
593 let txn = Txn::new()
594 .when(vec![Compare::with_value(
595 key.clone(),
596 CompareOp::Greater,
597 vec![1],
598 )])
599 .and_then(vec![TxnOp::Put(key.clone(), vec![3])])
600 .or_else(vec![TxnOp::Get(key.clone())]);
601 let mut txn_response = kv_backend.txn(txn).await.unwrap();
602 assert!(!txn_response.succeeded);
603 let res = txn_response.responses.pop().unwrap();
604 assert_eq!(
605 res,
606 TxnOpResponse::ResponseGet(RangeResponse {
607 kvs: vec![KeyValue {
608 key,
609 value: vec![1]
610 }],
611 more: false,
612 })
613 );
614}
615
616pub async fn test_txn_compare_less(kv_backend: &impl KvBackend) {
617 let key = vec![103u8];
618 kv_backend.delete(&[3], false).await.unwrap();
619
620 let txn = Txn::new()
621 .when(vec![Compare::with_value_not_exists(
622 key.clone(),
623 CompareOp::Less,
624 )])
625 .and_then(vec![TxnOp::Put(key.clone(), vec![1])])
626 .or_else(vec![TxnOp::Put(key.clone(), vec![2])]);
627 let txn_response = kv_backend.txn(txn.clone()).await.unwrap();
628 assert!(!txn_response.succeeded);
629
630 let txn_response = kv_backend.txn(txn).await.unwrap();
631 assert!(!txn_response.succeeded);
632
633 let txn = Txn::new()
634 .when(vec![Compare::with_value(
635 key.clone(),
636 CompareOp::Less,
637 vec![2],
638 )])
639 .and_then(vec![TxnOp::Put(key.clone(), vec![3])])
640 .or_else(vec![TxnOp::Get(key.clone())]);
641 let mut txn_response = kv_backend.txn(txn).await.unwrap();
642 assert!(!txn_response.succeeded);
643 let res = txn_response.responses.pop().unwrap();
644 assert_eq!(
645 res,
646 TxnOpResponse::ResponseGet(RangeResponse {
647 kvs: vec![KeyValue {
648 key,
649 value: vec![2]
650 }],
651 more: false,
652 })
653 );
654}
655
656pub async fn test_txn_compare_not_equal(kv_backend: &impl KvBackend) {
657 let key = vec![104u8];
658 kv_backend.delete(&key, false).await.unwrap();
659
660 let txn = Txn::new()
661 .when(vec![Compare::with_value_not_exists(
662 key.clone(),
663 CompareOp::NotEqual,
664 )])
665 .and_then(vec![TxnOp::Put(key.clone(), vec![1])])
666 .or_else(vec![TxnOp::Put(key.clone(), vec![2])]);
667 let txn_response = kv_backend.txn(txn.clone()).await.unwrap();
668 assert!(!txn_response.succeeded);
669
670 let txn_response = kv_backend.txn(txn).await.unwrap();
671 assert!(txn_response.succeeded);
672
673 let txn = Txn::new()
674 .when(vec![Compare::with_value(
675 key.clone(),
676 CompareOp::Equal,
677 vec![2],
678 )])
679 .and_then(vec![TxnOp::Put(key.clone(), vec![3])])
680 .or_else(vec![TxnOp::Get(key.clone())]);
681 let mut txn_response = kv_backend.txn(txn).await.unwrap();
682 assert!(!txn_response.succeeded);
683 let res = txn_response.responses.pop().unwrap();
684 assert_eq!(
685 res,
686 TxnOpResponse::ResponseGet(RangeResponse {
687 kvs: vec![KeyValue {
688 key,
689 value: vec![1]
690 }],
691 more: false,
692 })
693 );
694}