log_store/raft_engine/backend.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! [KvBackend] implementation based on [raft_engine::Engine].
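//!
//! A minimal usage sketch (not taken from the surrounding sources): open a backend with the
//! default [KvBackendConfig], write a key, and read it back. The import path of
//! [RaftEngineBackend] is an assumption here; adjust it to however the crate actually
//! exposes this module.
//!
//! ```ignore
//! use common_config::KvBackendConfig;
//! use common_meta::kv_backend::KvBackend;
//! use common_meta::rpc::store::PutRequest;
//! // Assumed path; the type is defined in this module.
//! use log_store::raft_engine::backend::RaftEngineBackend;
//!
//! #[tokio::main]
//! async fn main() {
//!     // Open (or create) the backend under a data directory with default settings.
//!     let backend = RaftEngineBackend::try_open_with_cfg(
//!         "/tmp/greptimedb-kv".to_string(),
//!         &KvBackendConfig::default(),
//!     )
//!     .unwrap();
//!
//!     // Write a key-value pair, then read it back.
//!     backend
//!         .put(PutRequest {
//!             key: b"hello".to_vec(),
//!             value: b"world".to_vec(),
//!             prev_kv: false,
//!         })
//!         .await
//!         .unwrap();
//!     let kv = backend.get(b"hello").await.unwrap().unwrap();
//!     assert_eq!(b"world", kv.value.as_slice());
//! }
//! ```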

use std::any::Any;
use std::ops::Bound::{Excluded, Included, Unbounded};
use std::path::Path;
use std::sync::{Arc, RwLock};

use common_config::KvBackendConfig;
use common_error::ext::BoxedError;
use common_meta::error as meta_error;
use common_meta::kv_backend::txn::{Txn, TxnOp, TxnOpResponse, TxnRequest, TxnResponse};
use common_meta::kv_backend::{KvBackend, TxnService};
use common_meta::rpc::store::{
    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
    BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest, PutResponse,
    RangeRequest, RangeResponse,
};
use common_meta::rpc::KeyValue;
use common_meta::util::get_next_prefix_key;
use common_runtime::RepeatedTask;
use raft_engine::{Config, Engine, LogBatch, ReadableSize, RecoveryMode};
use snafu::{IntoError, ResultExt};

use crate::error::{self, Error, IoSnafu, RaftEngineSnafu, StartWalTaskSnafu};
use crate::raft_engine::log_store::PurgeExpiredFilesFunction;

/// All key-value pairs of this backend are stored under this single raft-engine namespace.
pub(crate) const SYSTEM_NAMESPACE: u64 = 0;

/// RaftEngine based [KvBackend] implementation.
pub struct RaftEngineBackend {
    engine: RwLock<Arc<Engine>>,
    _gc_task: RepeatedTask<Error>,
}

/// Ensures that `dir` exists and is a directory, creating it if it does not exist.
fn ensure_dir(dir: &str) -> error::Result<()> {
    let io_context = |err| {
        IoSnafu {
            path: dir.to_string(),
        }
        .into_error(err)
    };

    let path = Path::new(dir);
    if !path.exists() {
        // Create the directory (and any missing parents) to make sure we have permission to use it.
        return std::fs::create_dir_all(path).map_err(io_context);
    }

    let metadata = std::fs::metadata(path).map_err(io_context)?;
    if !metadata.is_dir() {
        return Err(io_context(std::io::ErrorKind::NotADirectory.into()));
    }

    Ok(())
}

impl RaftEngineBackend {
    pub fn try_open_with_cfg(dir: String, config: &KvBackendConfig) -> error::Result<Self> {
        let cfg = Config {
            dir: dir.to_string(),
            purge_threshold: ReadableSize(config.purge_threshold.0),
            recovery_mode: RecoveryMode::TolerateTailCorruption,
            batch_compression_threshold: ReadableSize::kb(8),
            target_file_size: ReadableSize(config.file_size.0),
            ..Default::default()
        };

        ensure_dir(&dir)?;
        if let Some(spill_dir) = &cfg.spill_dir {
            ensure_dir(spill_dir)?;
        }

        let engine = Arc::new(Engine::open(cfg).context(RaftEngineSnafu)?);
        let gc_task = RepeatedTask::new(
            config.purge_interval,
            Box::new(PurgeExpiredFilesFunction {
                engine: engine.clone(),
            }),
        );
        gc_task
            .start(common_runtime::global_runtime())
            .context(StartWalTaskSnafu { name: "gc_task" })?;

        Ok(Self {
            engine: RwLock::new(engine),
            _gc_task: gc_task,
        })
    }
}

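/// A sketch of how a caller might drive the transaction support below (not from the original
/// sources). It assumes the etcd-style builder that `common_meta::kv_backend::txn` exposes
/// (`Txn::new().when(..).and_then(..).or_else(..)`, `Compare::with_value`, `CompareOp::Equal`);
/// those builder names are assumptions, as this file only imports `Txn` and `TxnOp`.
/// `backend` stands for an opened `RaftEngineBackend`.
///
/// ```ignore
/// use common_meta::kv_backend::txn::{Compare, CompareOp, Txn, TxnOp};
/// use common_meta::kv_backend::TxnService;
///
/// // If "k" still holds "v0", atomically overwrite it with "v1"; otherwise read it back.
/// let txn = Txn::new()
///     .when(vec![Compare::with_value(
///         b"k".to_vec(),
///         CompareOp::Equal,
///         b"v0".to_vec(),
///     )])
///     .and_then(vec![TxnOp::Put(b"k".to_vec(), b"v1".to_vec())])
///     .or_else(vec![TxnOp::Get(b"k".to_vec())]);
///
/// let resp = backend.txn(txn).await.unwrap();
/// if !resp.succeeded {
///     // The compare failed; the `or_else` branch ran and returned the current value.
///     println!("txn responses: {:?}", resp.responses);
/// }
/// ```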
#[async_trait::async_trait]
impl TxnService for RaftEngineBackend {
    type Error = meta_error::Error;

    async fn txn(&self, txn: Txn) -> meta_error::Result<TxnResponse> {
        let TxnRequest {
            compare,
            success,
            failure,
        } = txn.into();

        let mut succeeded = true;

        // The write lock guards against parallel access from inside "txn" as well as from
        // outside "get", "put", etc. It is not used to mutate any Rust data, so the binding
        // is not "mut"; suppress the clippy warning accordingly.
        #[allow(clippy::readonly_write_lock)]
        let engine = self.engine.write().unwrap();

        for cmp in compare {
            let existing_value = engine_get(&engine, &cmp.key)?.map(|kv| kv.value);
            if !cmp.compare_value(existing_value.as_ref()) {
                succeeded = false;
                break;
            }
        }

        let mut batch = LogBatch::default();
        let do_txn = |txn_op| match txn_op {
            TxnOp::Put(key, value) => {
                batch
                    .put(SYSTEM_NAMESPACE, key, value)
                    .context(RaftEngineSnafu)
                    .map_err(BoxedError::new)
                    .context(meta_error::ExternalSnafu)?;
                Ok(TxnOpResponse::ResponsePut(PutResponse { prev_kv: None }))
            }

            TxnOp::Get(key) => {
                let value = engine_get(&engine, &key)?.map(|kv| kv.value);
                let kvs = value
                    .map(|value| KeyValue { key, value })
                    .into_iter()
                    .collect();
                Ok(TxnOpResponse::ResponseGet(RangeResponse {
                    kvs,
                    more: false,
                }))
            }

            TxnOp::Delete(key) => {
                let prev = engine_get(&engine, &key)?;
                batch.delete(SYSTEM_NAMESPACE, key);
                let deleted = if prev.is_some() { 1 } else { 0 };
                Ok(TxnOpResponse::ResponseDelete(DeleteRangeResponse {
                    deleted,
                    prev_kvs: vec![],
                }))
            }
        };

        let responses = if succeeded { success } else { failure }
            .into_iter()
            .map(do_txn)
            .collect::<meta_error::Result<_>>()?;

        engine
            .write(&mut batch, false)
            .context(RaftEngineSnafu)
            .map_err(BoxedError::new)
            .context(meta_error::ExternalSnafu)?;

        Ok(TxnResponse {
            succeeded,
            responses,
        })
    }

    fn max_txn_ops(&self) -> usize {
        usize::MAX
    }
}

#[async_trait::async_trait]
impl KvBackend for RaftEngineBackend {
    fn name(&self) -> &str {
        "RaftEngineBackend"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    async fn range(&self, req: RangeRequest) -> Result<RangeResponse, Self::Error> {
        let mut res = vec![];
        let (start, end) = req.range();
        let RangeRequest {
            keys_only, limit, ..
        } = req;

        // The scan end key is exclusive, so an inclusive end bound is converted to an
        // exclusive one by taking the next prefix key.
        let (start_key, end_key) = match (start, end) {
            (Included(start), Included(end)) => (Some(start), Some(get_next_prefix_key(&end))),
            (Unbounded, Unbounded) => (None, None),
            (Included(start), Excluded(end)) => (Some(start), Some(end)),
            (Included(start), Unbounded) => (Some(start), None),
            _ => unreachable!(),
        };
        let mut more = false;
        let mut iter = 0;

        self.engine
            .read()
            .unwrap()
            .scan_raw_messages(
                SYSTEM_NAMESPACE,
                start_key.as_deref(),
                end_key.as_deref(),
                false,
                |key, value| {
                    let take = limit == 0 || iter != limit;
                    iter += 1;
                    more = limit > 0 && iter > limit;

                    if take {
                        res.push(KeyValue {
                            key: key.to_vec(),
                            value: if keys_only { vec![] } else { value.to_vec() },
                        });
                    }

                    take
                },
            )
            .context(RaftEngineSnafu)
            .map_err(BoxedError::new)
            .context(meta_error::ExternalSnafu)?;
        Ok(RangeResponse { kvs: res, more })
    }

    async fn put(&self, req: PutRequest) -> Result<PutResponse, Self::Error> {
        let PutRequest {
            key,
            value,
            prev_kv,
        } = req;

        let mut prev = None;
        // Engine::write ensures that one batch is written atomically. The read/write lock
        // is just to prevent race conditions between `put` and `txn`.
        let engine = self.engine.read().unwrap();
        if prev_kv {
            prev = engine_get(&engine, &key)?;
        }
        engine_put(&engine, key, value)?;
        Ok(PutResponse { prev_kv: prev })
    }

    async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse, Self::Error> {
        let BatchPutRequest { kvs, prev_kv } = req;
        let mut batch = LogBatch::with_capacity(kvs.len());

        let mut prev_kvs = if prev_kv {
            Vec::with_capacity(kvs.len())
        } else {
            vec![]
        };

        let engine = self.engine.read().unwrap();
        for kv in kvs {
            if prev_kv && let Some(kv) = engine_get(&engine, &kv.key)? {
                prev_kvs.push(kv);
            }
            batch
                .put(SYSTEM_NAMESPACE, kv.key, kv.value)
                .context(RaftEngineSnafu)
                .map_err(BoxedError::new)
                .context(meta_error::ExternalSnafu)?;
        }

        engine
            .write(&mut batch, false)
            .context(RaftEngineSnafu)
            .map_err(BoxedError::new)
            .context(meta_error::ExternalSnafu)?;

        Ok(BatchPutResponse { prev_kvs })
    }

    async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error> {
        let mut response = BatchGetResponse {
            kvs: Vec::with_capacity(req.keys.len()),
        };
        for key in req.keys {
            let Some(value) = self.engine.read().unwrap().get(SYSTEM_NAMESPACE, &key) else {
                continue;
            };
            response.kvs.push(KeyValue { key, value });
        }
        Ok(response)
    }

    async fn delete_range(
        &self,
        req: DeleteRangeRequest,
    ) -> Result<DeleteRangeResponse, Self::Error> {
        let DeleteRangeRequest {
            key,
            range_end,
            prev_kv,
        } = req;

        let range = RangeRequest {
            key,
            range_end,
            limit: 0,
            keys_only: false,
        };
        let range_resp = self.range(range).await?;

        let mut prev_kvs = vec![];
        let mut deleted = 0;

        let engine = self.engine.read().unwrap();
        for kv in range_resp.kvs {
            engine_delete(&engine, &kv.key)?;
            if prev_kv {
                prev_kvs.push(kv);
            }
            deleted += 1;
        }

        Ok(DeleteRangeResponse { deleted, prev_kvs })
    }

    async fn batch_delete(
        &self,
        req: BatchDeleteRequest,
    ) -> Result<BatchDeleteResponse, Self::Error> {
        let BatchDeleteRequest { keys, prev_kv } = req;

        let mut prev_kvs = if prev_kv {
            Vec::with_capacity(keys.len())
        } else {
            vec![]
        };
        let mut batch = LogBatch::with_capacity(keys.len());
        let engine = self.engine.read().unwrap();
        for key in keys {
            if prev_kv && let Some(prev) = engine_get(&engine, &key)? {
                prev_kvs.push(prev);
            }
            batch.delete(SYSTEM_NAMESPACE, key);
        }
        engine
            .write(&mut batch, false)
            .context(RaftEngineSnafu)
            .map_err(BoxedError::new)
            .context(meta_error::ExternalSnafu)?;
        Ok(BatchDeleteResponse { prev_kvs })
    }

    async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>, Self::Error> {
        engine_get(&self.engine.read().unwrap(), key)
    }

    async fn exists(&self, key: &[u8]) -> Result<bool, Self::Error> {
        Ok(engine_get(&self.engine.read().unwrap(), key)?.is_some())
    }

    async fn delete(&self, key: &[u8], prev_kv: bool) -> Result<Option<KeyValue>, Self::Error> {
        let engine = self.engine.read().unwrap();
        let prev = if prev_kv {
            engine_get(&engine, key)?
        } else {
            None
        };
        engine_delete(&engine, key)?;
        Ok(prev)
    }
}

/// Gets `key` from the engine's system namespace, returning it as a [KeyValue].
fn engine_get(engine: &Engine, key: &[u8]) -> meta_error::Result<Option<KeyValue>> {
    let res = engine.get(SYSTEM_NAMESPACE, key);
    Ok(res.map(|value| KeyValue {
        key: key.to_vec(),
        value,
    }))
}

/// Puts a single key-value pair into the engine as a one-entry batch.
fn engine_put(engine: &Engine, key: Vec<u8>, value: Vec<u8>) -> meta_error::Result<()> {
    let mut batch = LogBatch::with_capacity(1);
    batch
        .put(SYSTEM_NAMESPACE, key, value)
        .context(RaftEngineSnafu)
        .map_err(BoxedError::new)
        .context(meta_error::ExternalSnafu)?;
    engine
        .write(&mut batch, false)
        .context(RaftEngineSnafu)
        .map_err(BoxedError::new)
        .context(meta_error::ExternalSnafu)?;
    Ok(())
}

/// Deletes `key` from the engine as a one-entry batch.
fn engine_delete(engine: &Engine, key: &[u8]) -> meta_error::Result<()> {
    let mut batch = LogBatch::with_capacity(1);
    batch.delete(SYSTEM_NAMESPACE, key.to_vec());
    engine
        .write(&mut batch, false)
        .context(RaftEngineSnafu)
        .map_err(BoxedError::new)
        .context(meta_error::ExternalSnafu)?;
    Ok(())
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use common_meta::kv_backend::test::{
        prepare_kv, test_kv_batch_delete, test_kv_batch_get, test_kv_compare_and_put,
        test_kv_delete_range, test_kv_put, test_kv_range, test_kv_range_2,
    };
    use common_meta::rpc::store::{CompareAndPutRequest, CompareAndPutResponse};
    use common_test_util::temp_dir::create_temp_dir;

    use super::*;

    fn build_kv_backend(dir: String) -> RaftEngineBackend {
        RaftEngineBackend::try_open_with_cfg(dir, &KvBackendConfig::default()).unwrap()
    }

    #[tokio::test]
    async fn test_raft_engine_kv() {
        let dir = create_temp_dir("raft-engine-kv");
        let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
        assert!(backend.get(b"hello").await.unwrap().is_none());

        let response = backend
            .put(PutRequest {
                key: b"hello".to_vec(),
                value: b"world".to_vec(),
                prev_kv: false,
            })
            .await
            .unwrap();
        assert!(response.prev_kv.is_none());
        assert_eq!(
            b"world".as_slice(),
            &backend.get(b"hello").await.unwrap().unwrap().value
        );
    }

    #[tokio::test]
    async fn test_compare_and_put() {
        let dir = create_temp_dir("compare_and_put");
        let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());

        let key = b"hello".to_vec();
        backend
            .put(PutRequest {
                key: key.clone(),
                value: b"word".to_vec(),
                prev_kv: false,
            })
            .await
            .unwrap();

        let CompareAndPutResponse { success, prev_kv } = backend
            .compare_and_put(CompareAndPutRequest {
                key: key.clone(),
                expect: b"world".to_vec(),
                value: b"whatever".to_vec(),
            })
            .await
            .unwrap();
        assert!(!success);
        assert_eq!(b"word".as_slice(), &prev_kv.unwrap().value);

        let CompareAndPutResponse { success, prev_kv } = backend
            .compare_and_put(CompareAndPutRequest {
                key: key.clone(),
                expect: b"word".to_vec(),
                value: b"world".to_vec(),
            })
            .await
            .unwrap();
        assert!(success);
        // Do not return prev_kv on success
        assert!(prev_kv.is_none());

        assert_eq!(
            b"world".as_slice(),
            &backend.get(b"hello").await.unwrap().unwrap().value
        );
    }

    fn build_batch_key_values(start: usize, end: usize) -> Vec<KeyValue> {
        (start..end)
            .map(|idx| {
                let bytes = idx.to_ne_bytes().to_vec();
                KeyValue {
                    key: bytes.clone(),
                    value: bytes,
                }
            })
            .collect()
    }

    #[tokio::test]
    async fn test_compare_and_put_empty() {
        let dir = create_temp_dir("compare_and_put_empty");
        let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
        let CompareAndPutResponse { success, prev_kv } = backend
            .compare_and_put(CompareAndPutRequest {
                key: b"hello".to_vec(),
                expect: vec![],
                value: b"world".to_vec(),
            })
            .await
            .unwrap();
        assert!(success);
        assert!(prev_kv.is_none());

        assert_eq!(
            b"world".as_slice(),
            &backend.get(b"hello").await.unwrap().unwrap().value
        );
    }

    #[tokio::test]
    async fn test_batch_put_and_scan_delete() {
        let dir = create_temp_dir("compare_and_put");
        let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());

        // put 0..10
        let BatchPutResponse { prev_kvs } = backend
            .batch_put(BatchPutRequest {
                kvs: build_batch_key_values(0, 10),
                prev_kv: false,
            })
            .await
            .unwrap();
        assert_eq!(0, prev_kvs.len());

        let BatchPutResponse { prev_kvs } = backend
            .batch_put(BatchPutRequest {
                kvs: build_batch_key_values(5, 15),
                prev_kv: true,
            })
            .await
            .unwrap();
        let prev_kvs = prev_kvs
            .into_iter()
            .map(|kv| kv.key)
            .collect::<HashSet<_>>();
        assert_eq!(
            build_batch_key_values(5, 10)
                .into_iter()
                .map(|kv| kv.key)
                .collect::<HashSet<_>>(),
            prev_kvs
        );

        // range 2..10
        let RangeResponse { kvs, more } = backend
            .range(RangeRequest {
                key: 2usize.to_ne_bytes().to_vec(),
                range_end: 10usize.to_ne_bytes().to_vec(),
                limit: 0,
                keys_only: false,
            })
            .await
            .unwrap();
        assert!(!more);
        assert_eq!(
            build_batch_key_values(2, 10)
                .into_iter()
                .map(|kv| kv.key)
                .collect::<HashSet<_>>(),
            kvs.into_iter().map(|kv| kv.key).collect::<HashSet<_>>()
        );

        // range 0..1000
        let RangeResponse { kvs, more } = backend
            .range(RangeRequest {
                key: 0usize.to_ne_bytes().to_vec(),
                range_end: 1000usize.to_ne_bytes().to_vec(),
                limit: 0,
                keys_only: false,
            })
            .await
            .unwrap();
        assert!(!more);
        assert_eq!(
            build_batch_key_values(0, 15)
                .into_iter()
                .map(|kv| kv.key)
                .collect::<HashSet<_>>(),
            kvs.into_iter().map(|kv| kv.key).collect::<HashSet<_>>()
        );

        // then delete 3..7
        let BatchDeleteResponse { prev_kvs } = backend
            .batch_delete(BatchDeleteRequest {
                keys: build_batch_key_values(3, 7)
                    .into_iter()
                    .map(|kv| kv.key)
                    .collect(),
                prev_kv: true,
            })
            .await
            .unwrap();
        assert_eq!(
            build_batch_key_values(3, 7)
                .into_iter()
                .map(|kv| kv.key)
                .collect::<HashSet<_>>(),
            prev_kvs
                .into_iter()
                .map(|kv| kv.key)
                .collect::<HashSet<_>>()
        );

        // finally assert existing keys to be 0..3 ∪ 7..15
        let RangeResponse { kvs, more } = backend
            .range(RangeRequest {
                key: 0usize.to_ne_bytes().to_vec(),
                range_end: 1000usize.to_ne_bytes().to_vec(),
                limit: 0,
                keys_only: false,
            })
            .await
            .unwrap();
        assert!(!more);

        let keys = kvs.into_iter().map(|kv| kv.key).collect::<HashSet<_>>();
        assert_eq!(
            build_batch_key_values(0, 3)
                .into_iter()
                .chain(build_batch_key_values(7, 15))
                .map(|kv| kv.key)
                .collect::<HashSet<_>>(),
            keys
        );
    }

    #[tokio::test]
    async fn test_range() {
        let dir = create_temp_dir("range");
        let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
        prepare_kv(&backend).await;

        test_kv_range(&backend).await;
    }

    #[tokio::test]
    async fn test_range_2() {
        let dir = create_temp_dir("range2");
        let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());

        test_kv_range_2(&backend).await;
    }

    #[tokio::test]
    async fn test_put() {
        let dir = create_temp_dir("put");
        let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
        prepare_kv(&backend).await;

        test_kv_put(&backend).await;
    }

    #[tokio::test]
    async fn test_batch_get() {
        let dir = create_temp_dir("batch_get");
        let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
        prepare_kv(&backend).await;

        test_kv_batch_get(&backend).await;
    }

    #[tokio::test]
    async fn test_batch_delete() {
        let dir = create_temp_dir("batch_delete");
        let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
        prepare_kv(&backend).await;

        test_kv_batch_delete(&backend).await;
    }

    #[tokio::test]
    async fn test_delete_range() {
        let dir = create_temp_dir("delete_range");
        let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
        prepare_kv(&backend).await;

        test_kv_delete_range(&backend).await;
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_compare_and_put_2() {
        let dir = create_temp_dir("compare_and_put");
        let backend = build_kv_backend(dir.path().to_str().unwrap().to_string());
        prepare_kv(&backend).await;

        test_kv_compare_and_put(Arc::new(backend)).await;
    }
}