1use std::sync::atomic::{AtomicU8, Ordering};
16use std::sync::Arc;
17
18use txn::{Compare, CompareOp, TxnOp};
19
20use super::{KvBackend, *};
21use crate::error::Error;
22use crate::rpc::store::{BatchGetRequest, PutRequest};
23use crate::rpc::KeyValue;
24use crate::util;
25
26pub fn mock_kvs(prefix: Vec<u8>) -> Vec<KeyValue> {
27 vec![
28 KeyValue {
29 key: [prefix.clone(), b"key1".to_vec()].concat(),
30 value: b"val1".to_vec(),
31 },
32 KeyValue {
33 key: [prefix.clone(), b"key2".to_vec()].concat(),
34 value: b"val2".to_vec(),
35 },
36 KeyValue {
37 key: [prefix.clone(), b"key3".to_vec()].concat(),
38 value: b"val3".to_vec(),
39 },
40 KeyValue {
41 key: [prefix.clone(), b"key11".to_vec()].concat(),
42 value: b"val11".to_vec(),
43 },
44 ]
45}
46
47pub async fn prepare_kv(kv_backend: &impl KvBackend) {
48 prepare_kv_with_prefix(kv_backend, vec![]).await;
49}
50
51pub async fn prepare_kv_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
52 let kvs = mock_kvs(prefix);
53 assert!(kv_backend
54 .batch_put(BatchPutRequest {
55 kvs,
56 ..Default::default()
57 })
58 .await
59 .is_ok());
60}
61
62pub async fn unprepare_kv(kv_backend: &impl KvBackend, prefix: &[u8]) {
63 let range_end = util::get_prefix_end_key(prefix);
64 assert!(
65 kv_backend
66 .delete_range(DeleteRangeRequest {
67 key: prefix.to_vec(),
68 range_end,
69 ..Default::default()
70 })
71 .await
72 .is_ok(),
73 "prefix: {:?}",
74 std::str::from_utf8(prefix).unwrap()
75 );
76}
77
78pub async fn test_kv_put(kv_backend: &impl KvBackend) {
79 test_kv_put_with_prefix(kv_backend, vec![]).await;
80}
81
82pub async fn test_kv_put_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
83 let put_key = [prefix.clone(), b"key11".to_vec()].concat();
84 let resp = kv_backend
85 .put(PutRequest {
86 key: put_key.clone(),
87 value: b"val12".to_vec(),
88 prev_kv: false,
89 })
90 .await
91 .unwrap();
92 assert!(resp.prev_kv.is_none());
93
94 let resp = kv_backend
95 .put(PutRequest {
96 key: put_key.clone(),
97 value: b"val13".to_vec(),
98 prev_kv: true,
99 })
100 .await
101 .unwrap();
102 let prev_kv = resp.prev_kv.unwrap();
103 assert_eq!(put_key, prev_kv.key());
104 assert_eq!(b"val12", prev_kv.value());
105}
106
107pub async fn test_kv_range(kv_backend: &impl KvBackend) {
108 test_kv_range_with_prefix(kv_backend, vec![]).await;
109}
110
111pub async fn test_kv_range_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
112 let key = [prefix.clone(), b"key1".to_vec()].concat();
113 let key11 = [prefix.clone(), b"key11".to_vec()].concat();
114 let range_end = util::get_prefix_end_key(&key);
115
116 let resp = kv_backend
117 .range(RangeRequest {
118 key: key.clone(),
119 range_end: range_end.clone(),
120 limit: 0,
121 keys_only: false,
122 })
123 .await
124 .unwrap();
125
126 assert_eq!(2, resp.kvs.len());
127 assert_eq!(key, resp.kvs[0].key);
128 assert_eq!(b"val1", resp.kvs[0].value());
129 assert_eq!(key11, resp.kvs[1].key);
130 assert_eq!(b"val11", resp.kvs[1].value());
131
132 let resp = kv_backend
133 .range(RangeRequest {
134 key: key.clone(),
135 range_end: range_end.clone(),
136 limit: 0,
137 keys_only: true,
138 })
139 .await
140 .unwrap();
141
142 assert_eq!(2, resp.kvs.len());
143 assert_eq!(key, resp.kvs[0].key);
144 assert_eq!(b"", resp.kvs[0].value());
145 assert_eq!(key11, resp.kvs[1].key);
146 assert_eq!(b"", resp.kvs[1].value());
147
148 let resp = kv_backend
149 .range(RangeRequest {
150 key: key.clone(),
151 limit: 0,
152 keys_only: false,
153 ..Default::default()
154 })
155 .await
156 .unwrap();
157
158 assert_eq!(1, resp.kvs.len());
159 assert_eq!(key, resp.kvs[0].key);
160 assert_eq!(b"val1", resp.kvs[0].value());
161
162 let resp = kv_backend
163 .range(RangeRequest {
164 key: key.clone(),
165 range_end,
166 limit: 1,
167 keys_only: false,
168 })
169 .await
170 .unwrap();
171
172 assert_eq!(1, resp.kvs.len());
173 assert_eq!(key, resp.kvs[0].key);
174 assert_eq!(b"val1", resp.kvs[0].value());
175}
176
177pub async fn test_kv_range_2(kv_backend: &impl KvBackend) {
178 test_kv_range_2_with_prefix(kv_backend, vec![]).await;
179}
180
181pub async fn test_kv_range_2_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
182 let atest = [prefix.clone(), b"atest".to_vec()].concat();
183 let test = [prefix.clone(), b"test".to_vec()].concat();
184
185 kv_backend
186 .put(
187 PutRequest::new()
188 .with_key(atest.clone())
189 .with_value("value"),
190 )
191 .await
192 .unwrap();
193
194 kv_backend
195 .put(PutRequest::new().with_key(test.clone()).with_value("value"))
196 .await
197 .unwrap();
198
199 let all_start = [prefix.clone(), b"\0".to_vec()].concat();
201 let all_end = if prefix.is_empty() {
202 b"\0".to_vec()
203 } else {
204 util::get_prefix_end_key(&prefix)
205 };
206 let result = kv_backend
207 .range(RangeRequest::new().with_range(all_start, all_end.clone()))
208 .await
209 .unwrap();
210
211 assert_eq!(result.kvs.len(), 2);
212 assert!(!result.more);
213
214 let a_start = [prefix.clone(), b"a".to_vec()].concat();
216 let result = kv_backend
217 .range(RangeRequest::new().with_range(a_start.clone(), all_end.clone()))
218 .await
219 .unwrap();
220
221 assert_eq!(result.kvs.len(), 2);
222
223 let b_start = [prefix.clone(), b"b".to_vec()].concat();
224 let result = kv_backend
225 .range(RangeRequest::new().with_range(b_start, all_end.clone()))
226 .await
227 .unwrap();
228
229 assert_eq!(result.kvs.len(), 1);
230 assert_eq!(result.kvs[0].key, test);
231
232 let result = kv_backend
234 .range(
235 RangeRequest::new()
236 .with_range(a_start.clone(), all_end.clone())
237 .with_limit(1),
238 )
239 .await
240 .unwrap();
241 assert_eq!(result.kvs.len(), 1);
242 assert!(result.more);
243
244 let result = kv_backend
246 .range(
247 RangeRequest::new()
248 .with_range(a_start.clone(), all_end.clone())
249 .with_limit(2),
250 )
251 .await
252 .unwrap();
253 assert_eq!(result.kvs.len(), 2);
254 assert!(!result.more);
255
256 let result = kv_backend
258 .range(
259 RangeRequest::new()
260 .with_range(a_start.clone(), all_end.clone())
261 .with_limit(3),
262 )
263 .await
264 .unwrap();
265 assert_eq!(result.kvs.len(), 2);
266 assert!(!result.more);
267
268 let req = BatchDeleteRequest {
269 keys: vec![atest, test],
270 prev_kv: false,
271 };
272 let resp = kv_backend.batch_delete(req).await.unwrap();
273 assert!(resp.prev_kvs.is_empty());
274}
275
276pub async fn test_kv_batch_get(kv_backend: &impl KvBackend) {
277 test_kv_batch_get_with_prefix(kv_backend, vec![]).await;
278}
279
280pub async fn test_kv_batch_get_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
281 let keys = vec![];
282 let resp = kv_backend
283 .batch_get(BatchGetRequest { keys })
284 .await
285 .unwrap();
286
287 assert!(resp.kvs.is_empty());
288
289 let key10 = [prefix.clone(), b"key10".to_vec()].concat();
290 let keys = vec![key10];
291 let resp = kv_backend
292 .batch_get(BatchGetRequest { keys })
293 .await
294 .unwrap();
295
296 assert!(resp.kvs.is_empty());
297
298 let key1 = [prefix.clone(), b"key1".to_vec()].concat();
299 let key3 = [prefix.clone(), b"key3".to_vec()].concat();
300 let key4 = [prefix.clone(), b"key4".to_vec()].concat();
301 let keys = vec![key1.clone(), key3.clone(), key4];
302 let resp = kv_backend
303 .batch_get(BatchGetRequest { keys })
304 .await
305 .unwrap();
306
307 assert_eq!(2, resp.kvs.len());
308 assert_eq!(key1, resp.kvs[0].key);
309 assert_eq!(b"val1", resp.kvs[0].value());
310 assert_eq!(key3, resp.kvs[1].key);
311 assert_eq!(b"val3", resp.kvs[1].value());
312}
313
314pub async fn test_kv_compare_and_put(kv_backend: Arc<dyn KvBackend<Error = Error>>) {
315 test_kv_compare_and_put_with_prefix(kv_backend, vec![]).await;
316}
317
318pub async fn test_kv_compare_and_put_with_prefix(
319 kv_backend: Arc<dyn KvBackend<Error = Error>>,
320 prefix: Vec<u8>,
321) {
322 let success = Arc::new(AtomicU8::new(0));
323 let key = [prefix.clone(), b"key".to_vec()].concat();
324
325 let mut joins = vec![];
326 for _ in 0..20 {
327 let kv_backend_clone = kv_backend.clone();
328 let success_clone = success.clone();
329 let key_clone = key.clone();
330
331 let join = tokio::spawn(async move {
332 let req = CompareAndPutRequest {
333 key: key_clone,
334 expect: vec![],
335 value: b"val_new".to_vec(),
336 };
337 let resp = kv_backend_clone.compare_and_put(req).await.unwrap();
338 if resp.success {
339 success_clone.fetch_add(1, Ordering::SeqCst);
340 }
341 });
342 joins.push(join);
343 }
344
345 for join in joins {
346 join.await.unwrap();
347 }
348
349 assert_eq!(1, success.load(Ordering::SeqCst));
350
351 let resp = kv_backend.delete(&key, false).await.unwrap();
352 assert!(resp.is_none());
353}
354
355pub async fn test_kv_delete_range(kv_backend: &impl KvBackend) {
356 test_kv_delete_range_with_prefix(kv_backend, vec![]).await;
357}
358
359pub async fn test_kv_delete_range_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
360 let key3 = [prefix.clone(), b"key3".to_vec()].concat();
361 let req = DeleteRangeRequest {
362 key: key3.clone(),
363 range_end: vec![],
364 prev_kv: true,
365 };
366
367 let resp = kv_backend.delete_range(req).await.unwrap();
368 assert_eq!(1, resp.prev_kvs.len());
369 assert_eq!(1, resp.deleted);
370 assert_eq!(key3, resp.prev_kvs[0].key);
371 assert_eq!(b"val3", resp.prev_kvs[0].value());
372
373 let resp = kv_backend.get(&key3).await.unwrap();
374 assert!(resp.is_none());
375
376 let key2 = [prefix.clone(), b"key2".to_vec()].concat();
377 let req = DeleteRangeRequest {
378 key: key2.clone(),
379 range_end: vec![],
380 prev_kv: false,
381 };
382
383 let resp = kv_backend.delete_range(req).await.unwrap();
384 assert_eq!(1, resp.deleted);
385 assert!(resp.prev_kvs.is_empty());
386
387 let resp = kv_backend.get(&key2).await.unwrap();
388 assert!(resp.is_none());
389
390 let key = [prefix.clone(), b"key1".to_vec()].concat();
391 let range_end = util::get_prefix_end_key(&key);
392
393 let req = DeleteRangeRequest {
394 key: key.clone(),
395 range_end: range_end.clone(),
396 prev_kv: true,
397 };
398 let resp = kv_backend.delete_range(req).await.unwrap();
399 assert_eq!(2, resp.prev_kvs.len());
400
401 let req = RangeRequest {
402 key,
403 range_end,
404 ..Default::default()
405 };
406 let resp = kv_backend.range(req).await.unwrap();
407 assert!(resp.kvs.is_empty());
408}
409
410pub async fn test_kv_batch_delete(kv_backend: &impl KvBackend) {
411 test_kv_batch_delete_with_prefix(kv_backend, vec![]).await;
412}
413
414pub async fn test_kv_batch_delete_with_prefix(kv_backend: &impl KvBackend, prefix: Vec<u8>) {
415 let key1 = [prefix.clone(), b"key1".to_vec()].concat();
416 let key100 = [prefix.clone(), b"key100".to_vec()].concat();
417 assert!(kv_backend.get(&key1).await.unwrap().is_some());
418 assert!(kv_backend.get(&key100).await.unwrap().is_none());
419
420 let req = BatchDeleteRequest {
421 keys: vec![key1.clone(), key100.clone()],
422 prev_kv: true,
423 };
424 let resp = kv_backend.batch_delete(req).await.unwrap();
425 assert_eq!(1, resp.prev_kvs.len());
426 assert_eq!(
427 vec![KeyValue {
428 key: key1.clone(),
429 value: b"val1".to_vec()
430 }],
431 resp.prev_kvs
432 );
433 assert!(kv_backend.get(&key1).await.unwrap().is_none());
434
435 let key2 = [prefix.clone(), b"key2".to_vec()].concat();
436 let key3 = [prefix.clone(), b"key3".to_vec()].concat();
437 let key11 = [prefix.clone(), b"key11".to_vec()].concat();
438 assert!(kv_backend.get(&key2).await.unwrap().is_some());
439 assert!(kv_backend.get(&key3).await.unwrap().is_some());
440 assert!(kv_backend.get(&key11).await.unwrap().is_some());
441
442 let req = BatchDeleteRequest {
443 keys: vec![key2.clone(), key3.clone(), key11.clone()],
444 prev_kv: false,
445 };
446 let resp = kv_backend.batch_delete(req).await.unwrap();
447 assert!(resp.prev_kvs.is_empty());
448
449 assert!(kv_backend.get(&key2).await.unwrap().is_none());
450 assert!(kv_backend.get(&key3).await.unwrap().is_none());
451 assert!(kv_backend.get(&key11).await.unwrap().is_none());
452}
453
454pub async fn test_txn_one_compare_op(kv_backend: &impl KvBackend) {
455 let _ = kv_backend
456 .put(PutRequest {
457 key: vec![11],
458 value: vec![3],
459 ..Default::default()
460 })
461 .await
462 .unwrap();
463
464 let txn = Txn::new()
465 .when(vec![Compare::with_value(
466 vec![11],
467 CompareOp::Greater,
468 vec![1],
469 )])
470 .and_then(vec![TxnOp::Put(vec![11], vec![1])])
471 .or_else(vec![TxnOp::Put(vec![11], vec![2])]);
472
473 let txn_response = kv_backend.txn(txn).await.unwrap();
474
475 assert!(txn_response.succeeded);
476 assert_eq!(txn_response.responses.len(), 1);
477}
478
479pub async fn text_txn_multi_compare_op(kv_backend: &impl KvBackend) {
480 for i in 1..3 {
481 let _ = kv_backend
482 .put(PutRequest {
483 key: vec![i],
484 value: vec![i],
485 ..Default::default()
486 })
487 .await
488 .unwrap();
489 }
490
491 let when: Vec<_> = (1..3u8)
492 .map(|i| Compare::with_value(vec![i], CompareOp::Equal, vec![i]))
493 .collect();
494
495 let txn = Txn::new()
496 .when(when)
497 .and_then(vec![
498 TxnOp::Put(vec![1], vec![10]),
499 TxnOp::Put(vec![2], vec![20]),
500 ])
501 .or_else(vec![TxnOp::Put(vec![1], vec![11])]);
502
503 let txn_response = kv_backend.txn(txn).await.unwrap();
504
505 assert!(txn_response.succeeded);
506 assert_eq!(txn_response.responses.len(), 2);
507}
508
509pub async fn test_txn_compare_equal(kv_backend: &impl KvBackend) {
510 let key = vec![101u8];
511 kv_backend.delete(&key, false).await.unwrap();
512
513 let txn = Txn::new()
514 .when(vec![Compare::with_value_not_exists(
515 key.clone(),
516 CompareOp::Equal,
517 )])
518 .and_then(vec![TxnOp::Put(key.clone(), vec![1])])
519 .or_else(vec![TxnOp::Put(key.clone(), vec![2])]);
520 let txn_response = kv_backend.txn(txn.clone()).await.unwrap();
521 assert!(txn_response.succeeded);
522
523 let txn_response = kv_backend.txn(txn).await.unwrap();
524 assert!(!txn_response.succeeded);
525
526 let txn = Txn::new()
527 .when(vec![Compare::with_value(
528 key.clone(),
529 CompareOp::Equal,
530 vec![2],
531 )])
532 .and_then(vec![TxnOp::Put(key.clone(), vec![3])])
533 .or_else(vec![TxnOp::Put(key, vec![4])]);
534 let txn_response = kv_backend.txn(txn).await.unwrap();
535 assert!(txn_response.succeeded);
536}
537
538pub async fn test_txn_compare_greater(kv_backend: &impl KvBackend) {
539 let key = vec![102u8];
540 kv_backend.delete(&key, false).await.unwrap();
541
542 let txn = Txn::new()
543 .when(vec![Compare::with_value_not_exists(
544 key.clone(),
545 CompareOp::Greater,
546 )])
547 .and_then(vec![TxnOp::Put(key.clone(), vec![1])])
548 .or_else(vec![TxnOp::Put(key.clone(), vec![2])]);
549 let txn_response = kv_backend.txn(txn.clone()).await.unwrap();
550 assert!(!txn_response.succeeded);
551
552 let txn_response = kv_backend.txn(txn).await.unwrap();
553 assert!(txn_response.succeeded);
554
555 let txn = Txn::new()
556 .when(vec![Compare::with_value(
557 key.clone(),
558 CompareOp::Greater,
559 vec![1],
560 )])
561 .and_then(vec![TxnOp::Put(key.clone(), vec![3])])
562 .or_else(vec![TxnOp::Get(key.clone())]);
563 let mut txn_response = kv_backend.txn(txn).await.unwrap();
564 assert!(!txn_response.succeeded);
565 let res = txn_response.responses.pop().unwrap();
566 assert_eq!(
567 res,
568 TxnOpResponse::ResponseGet(RangeResponse {
569 kvs: vec![KeyValue {
570 key,
571 value: vec![1]
572 }],
573 more: false,
574 })
575 );
576}
577
578pub async fn test_txn_compare_less(kv_backend: &impl KvBackend) {
579 let key = vec![103u8];
580 kv_backend.delete(&[3], false).await.unwrap();
581
582 let txn = Txn::new()
583 .when(vec![Compare::with_value_not_exists(
584 key.clone(),
585 CompareOp::Less,
586 )])
587 .and_then(vec![TxnOp::Put(key.clone(), vec![1])])
588 .or_else(vec![TxnOp::Put(key.clone(), vec![2])]);
589 let txn_response = kv_backend.txn(txn.clone()).await.unwrap();
590 assert!(!txn_response.succeeded);
591
592 let txn_response = kv_backend.txn(txn).await.unwrap();
593 assert!(!txn_response.succeeded);
594
595 let txn = Txn::new()
596 .when(vec![Compare::with_value(
597 key.clone(),
598 CompareOp::Less,
599 vec![2],
600 )])
601 .and_then(vec![TxnOp::Put(key.clone(), vec![3])])
602 .or_else(vec![TxnOp::Get(key.clone())]);
603 let mut txn_response = kv_backend.txn(txn).await.unwrap();
604 assert!(!txn_response.succeeded);
605 let res = txn_response.responses.pop().unwrap();
606 assert_eq!(
607 res,
608 TxnOpResponse::ResponseGet(RangeResponse {
609 kvs: vec![KeyValue {
610 key,
611 value: vec![2]
612 }],
613 more: false,
614 })
615 );
616}
617
618pub async fn test_txn_compare_not_equal(kv_backend: &impl KvBackend) {
619 let key = vec![104u8];
620 kv_backend.delete(&key, false).await.unwrap();
621
622 let txn = Txn::new()
623 .when(vec![Compare::with_value_not_exists(
624 key.clone(),
625 CompareOp::NotEqual,
626 )])
627 .and_then(vec![TxnOp::Put(key.clone(), vec![1])])
628 .or_else(vec![TxnOp::Put(key.clone(), vec![2])]);
629 let txn_response = kv_backend.txn(txn.clone()).await.unwrap();
630 assert!(!txn_response.succeeded);
631
632 let txn_response = kv_backend.txn(txn).await.unwrap();
633 assert!(txn_response.succeeded);
634
635 let txn = Txn::new()
636 .when(vec![Compare::with_value(
637 key.clone(),
638 CompareOp::Equal,
639 vec![2],
640 )])
641 .and_then(vec![TxnOp::Put(key.clone(), vec![3])])
642 .or_else(vec![TxnOp::Get(key.clone())]);
643 let mut txn_response = kv_backend.txn(txn).await.unwrap();
644 assert!(!txn_response.succeeded);
645 let res = txn_response.responses.pop().unwrap();
646 assert_eq!(
647 res,
648 TxnOpResponse::ResponseGet(RangeResponse {
649 kvs: vec![KeyValue {
650 key,
651 value: vec![1]
652 }],
653 more: false,
654 })
655 );
656}