1use std::any::Any;
18use std::ops::Bound::{Excluded, Included, Unbounded};
19use std::path::Path;
20use std::sync::{Arc, RwLock};
21
22use common_config::KvBackendConfig;
23use common_error::ext::BoxedError;
24use common_meta::error as meta_error;
25use common_meta::kv_backend::txn::{Txn, TxnOp, TxnOpResponse, TxnRequest, TxnResponse};
26use common_meta::kv_backend::{KvBackend, TxnService};
27use common_meta::rpc::store::{
28 BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
29 BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse,
30 RangeRequest, RangeResponse,
31};
32use common_meta::rpc::KeyValue;
33use common_meta::util::get_next_prefix_key;
34use common_runtime::RepeatedTask;
35use raft_engine::{Config, Engine, LogBatch, ReadableSize, RecoveryMode};
36use snafu::{IntoError, ResultExt};
37
38use crate::error::{self, Error, IoSnafu, RaftEngineSnafu, StartWalTaskSnafu};
39use crate::raft_engine::log_store::PurgeExpiredFilesFunction;
40
/// The raft-engine namespace passed to every read, write and scan performed in this file;
/// all key-value pairs of [`RaftEngineBackend`] live under this single id.
pub(crate) const SYSTEM_NAMESPACE: u64 = 0;
42
/// A [`KvBackend`] that persists key-value pairs in a raft-engine instance, storing every
/// entry under [`SYSTEM_NAMESPACE`].
pub struct RaftEngineBackend {
    /// The shared engine handle. `txn` takes the write guard so it runs exclusively; every
    /// other operation in this file takes the read guard.
    engine: RwLock<Arc<Engine>>,
    /// Periodic purge task started in `try_open_with_cfg`. It is never read in the visible
    /// code; presumably it is held so the task lives as long as the backend (confirm
    /// `RepeatedTask`'s drop behavior).
    _gc_task: RepeatedTask<Error>,
}
48
49fn ensure_dir(dir: &str) -> error::Result<()> {
50 let io_context = |err| {
51 IoSnafu {
52 path: dir.to_string(),
53 }
54 .into_error(err)
55 };
56
57 let path = Path::new(dir);
58 if !path.exists() {
59 return std::fs::create_dir_all(path).map_err(io_context);
61 }
62
63 let metadata = std::fs::metadata(path).map_err(io_context)?;
64 if !metadata.is_dir() {
65 return Err(io_context(std::io::ErrorKind::NotADirectory.into()));
66 }
67
68 Ok(())
69}
70
71impl RaftEngineBackend {
72 pub fn try_open_with_cfg(dir: String, config: &KvBackendConfig) -> error::Result<Self> {
73 let cfg = Config {
74 dir: dir.to_string(),
75 purge_threshold: ReadableSize(config.purge_threshold.0),
76 recovery_mode: RecoveryMode::TolerateTailCorruption,
77 batch_compression_threshold: ReadableSize::kb(8),
78 target_file_size: ReadableSize(config.file_size.0),
79 ..Default::default()
80 };
81
82 ensure_dir(&dir)?;
83 if let Some(spill_dir) = &cfg.spill_dir {
84 ensure_dir(spill_dir)?;
85 }
86
87 let engine = Arc::new(Engine::open(cfg).context(RaftEngineSnafu)?);
88 let gc_task = RepeatedTask::new(
89 config.purge_interval,
90 Box::new(PurgeExpiredFilesFunction {
91 engine: engine.clone(),
92 }),
93 );
94 gc_task
95 .start(common_runtime::global_runtime())
96 .context(StartWalTaskSnafu { name: "gc_task" })?;
97
98 Ok(Self {
99 engine: RwLock::new(engine),
100 _gc_task: gc_task,
101 })
102 }
103}
104
105#[async_trait::async_trait]
106impl TxnService for RaftEngineBackend {
107 type Error = meta_error::Error;
108
109 async fn txn(&self, txn: Txn) -> meta_error::Result<TxnResponse> {
110 let TxnRequest {
111 compare,
112 success,
113 failure,
114 } = txn.into();
115
116 let mut succeeded = true;
117
118 #[allow(clippy::readonly_write_lock)]
122 let engine = self.engine.write().unwrap();
123
124 for cmp in compare {
125 let existing_value = engine_get(&engine, &cmp.key)?.map(|kv| kv.value);
126 if !cmp.compare_value(existing_value.as_ref()) {
127 succeeded = false;
128 break;
129 }
130 }
131
132 let mut batch = LogBatch::default();
133 let do_txn = |txn_op| match txn_op {
134 TxnOp::Put(key, value) => {
135 batch
136 .put(SYSTEM_NAMESPACE, key, value)
137 .context(RaftEngineSnafu)
138 .map_err(BoxedError::new)
139 .context(meta_error::ExternalSnafu)?;
140 Ok(TxnOpResponse::ResponsePut(PutResponse { prev_kv: None }))
141 }
142
143 TxnOp::Get(key) => {
144 let value = engine_get(&engine, &key)?.map(|kv| kv.value);
145 let kvs = value
146 .map(|value| KeyValue { key, value })
147 .into_iter()
148 .collect();
149 Ok(TxnOpResponse::ResponseGet(RangeResponse {
150 kvs,
151 more: false,
152 }))
153 }
154
155 TxnOp::Delete(key) => {
156 let prev = engine_get(&engine, &key)?;
157 batch.delete(SYSTEM_NAMESPACE, key);
158 let deleted = if prev.is_some() { 1 } else { 0 };
159 Ok(TxnOpResponse::ResponseDelete(DeleteRangeResponse {
160 deleted,
161 prev_kvs: vec![],
162 }))
163 }
164 };
165
166 let responses = if succeeded { success } else { failure }
167 .into_iter()
168 .map(do_txn)
169 .collect::<meta_error::Result<_>>()?;
170
171 engine
172 .write(&mut batch, false)
173 .context(RaftEngineSnafu)
174 .map_err(BoxedError::new)
175 .context(meta_error::ExternalSnafu)?;
176
177 Ok(TxnResponse {
178 succeeded,
179 responses,
180 })
181 }
182
183 fn max_txn_ops(&self) -> usize {
184 usize::MAX
185 }
186}
187
188#[async_trait::async_trait]
189impl KvBackend for RaftEngineBackend {
190 fn name(&self) -> &str {
191 "RaftEngineBackend"
192 }
193
194 fn as_any(&self) -> &dyn Any {
195 self
196 }
197
198 async fn range(&self, req: RangeRequest) -> Result<RangeResponse, Self::Error> {
199 let mut res = vec![];
200 let (start, end) = req.range();
201 let RangeRequest {
202 keys_only, limit, ..
203 } = req;
204
205 let (start_key, end_key) = match (start, end) {
206 (Included(start), Included(end)) => (Some(start), Some(get_next_prefix_key(&end))),
207 (Unbounded, Unbounded) => (None, None),
208 (Included(start), Excluded(end)) => (Some(start), Some(end)),
209 (Included(start), Unbounded) => (Some(start), None),
210 _ => unreachable!(),
211 };
212 let mut more = false;
213 let mut iter = 0;
214
215 self.engine
216 .read()
217 .unwrap()
218 .scan_raw_messages(
219 SYSTEM_NAMESPACE,
220 start_key.as_deref(),
221 end_key.as_deref(),
222 false,
223 |key, value| {
224 let take = limit == 0 || iter != limit;
225 iter += 1;
226 more = limit > 0 && iter > limit;
227
228 if take {
229 res.push(KeyValue {
230 key: key.to_vec(),
231 value: if keys_only { vec![] } else { value.to_vec() },
232 });
233 }
234
235 take
236 },
237 )
238 .context(RaftEngineSnafu)
239 .map_err(BoxedError::new)
240 .context(meta_error::ExternalSnafu)?;
241 Ok(RangeResponse { kvs: res, more })
242 }
243
244 async fn put(&self, req: PutRequest) -> Result<PutResponse, Self::Error> {
245 let PutRequest {
246 key,
247 value,
248 prev_kv,
249 } = req;
250
251 let mut prev = None;
252 let engine = self.engine.read().unwrap();
255 if prev_kv {
256 prev = engine_get(&engine, &key)?;
257 }
258 engine_put(&engine, key, value)?;
259 Ok(PutResponse { prev_kv: prev })
260 }
261
262 async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse, Self::Error> {
263 let BatchPutRequest { kvs, prev_kv } = req;
264 let mut batch = LogBatch::with_capacity(kvs.len());
265
266 let mut prev_kvs = if prev_kv {
267 Vec::with_capacity(kvs.len())
268 } else {
269 vec![]
270 };
271
272 let engine = self.engine.read().unwrap();
273 for kv in kvs {
274 if prev_kv && let Some(kv) = engine_get(&engine, &kv.key)? {
275 prev_kvs.push(kv);
276 }
277 batch
278 .put(SYSTEM_NAMESPACE, kv.key, kv.value)
279 .context(RaftEngineSnafu)
280 .map_err(BoxedError::new)
281 .context(meta_error::ExternalSnafu)?;
282 }
283
284 engine
285 .write(&mut batch, false)
286 .context(RaftEngineSnafu)
287 .map_err(BoxedError::new)
288 .context(meta_error::ExternalSnafu)?;
289
290 Ok(BatchPutResponse { prev_kvs })
291 }
292
293 async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error> {
294 let mut response = BatchGetResponse {
295 kvs: Vec::with_capacity(req.keys.len()),
296 };
297 for key in req.keys {
298 let Some(value) = self.engine.read().unwrap().get(SYSTEM_NAMESPACE, &key) else {
299 continue;
300 };
301 response.kvs.push(KeyValue { key, value });
302 }
303 Ok(response)
304 }
305
306 async fn delete_range(
307 &self,
308 req: DeleteRangeRequest,
309 ) -> Result<DeleteRangeResponse, Self::Error> {
310 let DeleteRangeRequest {
311 key,
312 range_end,
313 prev_kv,
314 } = req;
315
316 let range = RangeRequest {
317 key,
318 range_end,
319 limit: 0,
320 keys_only: false,
321 };
322 let range_resp = self.range(range).await?;
323
324 let mut prev_kvs = vec![];
325 let mut deleted = 0;
326
327 let engine = self.engine.read().unwrap();
328 for kv in range_resp.kvs {
329 engine_delete(&engine, &kv.key)?;
330 if prev_kv {
331 prev_kvs.push(kv);
332 }
333 deleted += 1;
334 }
335
336 Ok(DeleteRangeResponse { deleted, prev_kvs })
337 }
338
339 async fn batch_delete(
340 &self,
341 req: BatchDeleteRequest,
342 ) -> Result<BatchDeleteResponse, Self::Error> {
343 let BatchDeleteRequest { keys, prev_kv } = req;
344
345 let mut prev_kvs = if prev_kv {
346 Vec::with_capacity(keys.len())
347 } else {
348 vec![]
349 };
350 let mut batch = LogBatch::with_capacity(keys.len());
351 let engine = self.engine.read().unwrap();
352 for key in keys {
353 if prev_kv && let Some(prev) = engine_get(&engine, &key)? {
354 prev_kvs.push(prev);
355 }
356 batch.delete(SYSTEM_NAMESPACE, key);
357 }
358 engine
359 .write(&mut batch, false)
360 .context(RaftEngineSnafu)
361 .map_err(BoxedError::new)
362 .context(meta_error::ExternalSnafu)?;
363 Ok(BatchDeleteResponse { prev_kvs })
364 }
365
366 async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>, Self::Error> {
367 engine_get(&self.engine.read().unwrap(), key)
368 }
369
370 async fn exists(&self, key: &[u8]) -> Result<bool, Self::Error> {
371 Ok(engine_get(&self.engine.read().unwrap(), key)?.is_some())
372 }
373
374 async fn delete(&self, key: &[u8], prev_kv: bool) -> Result<Option<KeyValue>, Self::Error> {
375 let engine = self.engine.read().unwrap();
376 let prev = if prev_kv {
377 engine_get(&engine, key)?
378 } else {
379 None
380 };
381 engine_delete(&engine, key)?;
382 Ok(prev)
383 }
384}
385
386fn engine_get(engine: &Engine, key: &[u8]) -> meta_error::Result<Option<KeyValue>> {
387 let res = engine.get(SYSTEM_NAMESPACE, key);
388 Ok(res.map(|value| KeyValue {
389 key: key.to_vec(),
390 value,
391 }))
392}
393
394fn engine_put(engine: &Engine, key: Vec<u8>, value: Vec<u8>) -> meta_error::Result<()> {
395 let mut batch = LogBatch::with_capacity(1);
396 batch
397 .put(SYSTEM_NAMESPACE, key, value)
398 .context(RaftEngineSnafu)
399 .map_err(BoxedError::new)
400 .context(meta_error::ExternalSnafu)?;
401 engine
402 .write(&mut batch, false)
403 .context(RaftEngineSnafu)
404 .map_err(BoxedError::new)
405 .context(meta_error::ExternalSnafu)?;
406 Ok(())
407}
408
409fn engine_delete(engine: &Engine, key: &[u8]) -> meta_error::Result<()> {
410 let mut batch = LogBatch::with_capacity(1);
411 batch.delete(SYSTEM_NAMESPACE, key.to_vec());
412 engine
413 .write(&mut batch, false)
414 .context(RaftEngineSnafu)
415 .map_err(BoxedError::new)
416 .context(meta_error::ExternalSnafu)?;
417 Ok(())
418}
419
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use common_meta::kv_backend::test::{
        prepare_kv, test_kv_batch_delete, test_kv_batch_get, test_kv_compare_and_put,
        test_kv_delete_range, test_kv_put, test_kv_range, test_kv_range_2,
    };
    use common_meta::rpc::store::{CompareAndPutRequest, CompareAndPutResponse};
    use common_test_util::temp_dir::create_temp_dir;

    use super::*;

    /// Opens a backend rooted at `dir` with the default [`KvBackendConfig`].
    fn build_kv_backend(dir: String) -> RaftEngineBackend {
        RaftEngineBackend::try_open_with_cfg(dir, &KvBackendConfig::default()).unwrap()
    }

    /// Builds pairs whose key and value are both the native-endian bytes of each index in
    /// `start..end`.
    fn build_batch_key_values(start: usize, end: usize) -> Vec<KeyValue> {
        (start..end)
            .map(|idx| {
                let bytes = idx.to_ne_bytes().to_vec();
                KeyValue {
                    key: bytes.clone(),
                    value: bytes,
                }
            })
            .collect()
    }

    /// Collects only the keys, so assertions do not depend on response order.
    fn key_set(kvs: impl IntoIterator<Item = KeyValue>) -> HashSet<Vec<u8>> {
        kvs.into_iter().map(|kv| kv.key).collect()
    }

    /// A full-value range request from the bytes of index `start` up to (excluding) the
    /// bytes of index `end`.
    fn index_range(start: usize, end: usize) -> RangeRequest {
        RangeRequest {
            key: start.to_ne_bytes().to_vec(),
            range_end: end.to_ne_bytes().to_vec(),
            limit: 0,
            keys_only: false,
        }
    }

    #[tokio::test]
    async fn test_raft_engine_kv() {
        let dir = create_temp_dir("raft-engine-kv");
        let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
        assert!(backend.get(b"hello").await.unwrap().is_none());

        let response = backend
            .put(PutRequest {
                key: b"hello".to_vec(),
                value: b"world".to_vec(),
                prev_kv: false,
            })
            .await
            .unwrap();
        assert!(response.prev_kv.is_none());
        assert_eq!(
            b"world".as_slice(),
            &backend.get(b"hello").await.unwrap().unwrap().value
        );
    }

    #[tokio::test]
    async fn test_compare_and_put() {
        let dir = create_temp_dir("compare_and_put");
        let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());

        let key = b"hello".to_vec();
        // Deliberately stores "word" so that the first CAS (expecting "world") fails.
        backend
            .put(PutRequest {
                key: key.clone(),
                value: b"word".to_vec(),
                prev_kv: false,
            })
            .await
            .unwrap();

        let CompareAndPutResponse { success, prev_kv } = backend
            .compare_and_put(CompareAndPutRequest {
                key: key.clone(),
                expect: b"world".to_vec(),
                value: b"whatever".to_vec(),
            })
            .await
            .unwrap();
        assert!(!success);
        assert_eq!(b"word".as_slice(), &prev_kv.unwrap().value);

        let CompareAndPutResponse { success, prev_kv } = backend
            .compare_and_put(CompareAndPutRequest {
                key: key.clone(),
                expect: b"word".to_vec(),
                value: b"world".to_vec(),
            })
            .await
            .unwrap();
        assert!(success);
        assert!(prev_kv.is_none());

        assert_eq!(
            b"world".as_slice(),
            &backend.get(b"hello").await.unwrap().unwrap().value
        );
    }

    #[tokio::test]
    async fn test_compare_and_put_empty() {
        let dir = create_temp_dir("compare_and_put_empty");
        let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
        let CompareAndPutResponse { success, prev_kv } = backend
            .compare_and_put(CompareAndPutRequest {
                key: b"hello".to_vec(),
                expect: vec![],
                value: b"world".to_vec(),
            })
            .await
            .unwrap();
        assert!(success);
        assert!(prev_kv.is_none());

        assert_eq!(
            b"world".as_slice(),
            &backend.get(b"hello").await.unwrap().unwrap().value
        );
    }

    #[tokio::test]
    async fn test_batch_put_and_scan_delete() {
        let dir = create_temp_dir("batch_put_and_scan_delete");
        let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());

        // Fresh keys have no previous values.
        let BatchPutResponse { prev_kvs } = backend
            .batch_put(BatchPutRequest {
                kvs: build_batch_key_values(0, 10),
                prev_kv: false,
            })
            .await
            .unwrap();
        assert_eq!(0, prev_kvs.len());

        // Overlapping put: only 5..10 existed before.
        let BatchPutResponse { prev_kvs } = backend
            .batch_put(BatchPutRequest {
                kvs: build_batch_key_values(5, 15),
                prev_kv: true,
            })
            .await
            .unwrap();
        assert_eq!(key_set(build_batch_key_values(5, 10)), key_set(prev_kvs));

        let RangeResponse { kvs, more } = backend.range(index_range(2, 10)).await.unwrap();
        assert!(!more);
        assert_eq!(key_set(build_batch_key_values(2, 10)), key_set(kvs));

        let RangeResponse { kvs, more } = backend.range(index_range(0, 1000)).await.unwrap();
        assert!(!more);
        assert_eq!(key_set(build_batch_key_values(0, 15)), key_set(kvs));

        let BatchDeleteResponse { prev_kvs } = backend
            .batch_delete(BatchDeleteRequest {
                keys: build_batch_key_values(3, 7)
                    .into_iter()
                    .map(|kv| kv.key)
                    .collect(),
                prev_kv: true,
            })
            .await
            .unwrap();
        assert_eq!(key_set(build_batch_key_values(3, 7)), key_set(prev_kvs));

        let RangeResponse { kvs, more } = backend.range(index_range(0, 1000)).await.unwrap();
        assert!(!more);
        assert_eq!(
            key_set(
                build_batch_key_values(0, 3)
                    .into_iter()
                    .chain(build_batch_key_values(7, 15))
            ),
            key_set(kvs)
        );
    }

    #[tokio::test]
    async fn test_delete_range_without_prev_kv() {
        let dir = create_temp_dir("delete_range_without_prev_kv");
        let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
        backend
            .batch_put(BatchPutRequest {
                kvs: build_batch_key_values(0, 10),
                prev_kv: false,
            })
            .await
            .unwrap();

        let DeleteRangeResponse { deleted, prev_kvs } = backend
            .delete_range(DeleteRangeRequest {
                key: 2usize.to_ne_bytes().to_vec(),
                range_end: 5usize.to_ne_bytes().to_vec(),
                prev_kv: false,
            })
            .await
            .unwrap();
        assert_eq!(3, deleted);
        assert!(prev_kvs.is_empty());

        let RangeResponse { kvs, more } = backend.range(index_range(0, 1000)).await.unwrap();
        assert!(!more);
        assert_eq!(
            key_set(
                build_batch_key_values(0, 2)
                    .into_iter()
                    .chain(build_batch_key_values(5, 10))
            ),
            key_set(kvs)
        );
    }

    #[tokio::test]
    async fn test_range() {
        let dir = create_temp_dir("range");
        let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
        prepare_kv(&backend).await;

        test_kv_range(&backend).await;
    }

    #[tokio::test]
    async fn test_range_2() {
        let dir = create_temp_dir("range2");
        let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());

        test_kv_range_2(&backend).await;
    }

    #[tokio::test]
    async fn test_put() {
        let dir = create_temp_dir("put");
        let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
        prepare_kv(&backend).await;

        test_kv_put(&backend).await;
    }

    #[tokio::test]
    async fn test_batch_get() {
        let dir = create_temp_dir("batch_get");
        let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
        prepare_kv(&backend).await;

        test_kv_batch_get(&backend).await;
    }

    #[tokio::test]
    async fn test_batch_delete() {
        let dir = create_temp_dir("batch_delete");
        let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
        prepare_kv(&backend).await;

        test_kv_batch_delete(&backend).await;
    }

    #[tokio::test]
    async fn test_delete_range() {
        let dir = create_temp_dir("delete_range");
        let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
        prepare_kv(&backend).await;

        test_kv_delete_range(&backend).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_compare_and_put_2() {
        let dir = create_temp_dir("compare_and_put_2");
        let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
        prepare_kv(&backend).await;

        test_kv_compare_and_put(Arc::new(backend)).await;
    }
}