common_meta/
kv_backend.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use common_error::ext::ErrorExt;
20pub use txn::TxnService;
21
22use crate::error::Error;
23use crate::kv_backend::txn::{Txn, TxnOpResponse};
24use crate::rpc::KeyValue;
25use crate::rpc::store::{
26    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
27    BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
28    DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
29};
30
31pub mod chroot;
32pub mod etcd;
33pub mod memory;
34#[cfg(any(feature = "mysql_kvbackend", feature = "pg_kvbackend"))]
35pub mod rds;
36pub mod test;
37#[cfg(any(test, feature = "testing"))]
38pub mod test_util;
39pub mod txn;
40pub mod util;
/// Shared, reference-counted handle to a [`KvBackend`] trait object that is
/// `Send + Sync`. The error type parameter `E` defaults to this crate's
/// [`Error`].
pub type KvBackendRef<E = Error> = Arc<dyn KvBackend<Error = E> + Send + Sync>;
42
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
/// Default name of the meta table: `greptime_metakv`.
///
/// Only compiled when the `pg_kvbackend` or `mysql_kvbackend` feature is enabled.
pub const DEFAULT_META_TABLE_NAME: &str = "greptime_metakv";
46
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
/// Default lock id used for election: `1`.
///
/// Only compiled when the `pg_kvbackend` or `mysql_kvbackend` feature is enabled.
pub const DEFAULT_META_ELECTION_LOCK_ID: u64 = 1;
50
51#[async_trait]
52pub trait KvBackend: TxnService
53where
54    Self::Error: ErrorExt,
55{
56    fn name(&self) -> &str;
57
58    fn as_any(&self) -> &dyn Any;
59
60    async fn range(&self, req: RangeRequest) -> Result<RangeResponse, Self::Error>;
61
62    async fn put(&self, req: PutRequest) -> Result<PutResponse, Self::Error>;
63
64    async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse, Self::Error>;
65
66    async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error>;
67
68    async fn delete_range(
69        &self,
70        req: DeleteRangeRequest,
71    ) -> Result<DeleteRangeResponse, Self::Error>;
72
73    async fn batch_delete(
74        &self,
75        req: BatchDeleteRequest,
76    ) -> Result<BatchDeleteResponse, Self::Error>;
77
78    // The following methods are implemented based on the above methods,
79    // and a higher-level interface is provided for to simplify usage.
80
81    async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>, Self::Error> {
82        let req = RangeRequest::new().with_key(key.to_vec());
83        let mut resp = self.range(req).await?;
84        Ok(if resp.kvs.is_empty() {
85            None
86        } else {
87            Some(resp.kvs.remove(0))
88        })
89    }
90
91    /// CAS: Compares the value at the key with the given value, and if they are
92    /// equal, puts the new value at the key.
93    async fn compare_and_put(
94        &self,
95        req: CompareAndPutRequest,
96    ) -> Result<CompareAndPutResponse, Self::Error> {
97        let CompareAndPutRequest { key, expect, value } = req;
98        let txn = if expect.is_empty() {
99            Txn::put_if_not_exists(key, value)
100        } else {
101            Txn::compare_and_put(key, expect, value)
102        };
103        let txn_res = self.txn(txn).await?;
104
105        let success = txn_res.succeeded;
106        // The response is guaranteed to have at most one element.
107        let op_res = txn_res.responses.into_iter().next();
108        let prev_kv = match op_res {
109            Some(TxnOpResponse::ResponsePut(res)) => res.prev_kv,
110            Some(TxnOpResponse::ResponseGet(res)) => res.kvs.into_iter().next(),
111            Some(TxnOpResponse::ResponseDelete(res)) => res.prev_kvs.into_iter().next(),
112            None => None,
113        };
114
115        Ok(CompareAndPutResponse { success, prev_kv })
116    }
117
118    /// Puts a value at a key. If `if_not_exists` is `true`, the operation
119    /// ensures the key does not exist before applying the PUT operation.
120    /// Otherwise, it simply applies the PUT operation without checking for
121    /// the key's existence.
122    async fn put_conditionally(
123        &self,
124        key: Vec<u8>,
125        value: Vec<u8>,
126        if_not_exists: bool,
127    ) -> Result<bool, Self::Error> {
128        let success = if if_not_exists {
129            let req = CompareAndPutRequest::new()
130                .with_key(key)
131                .with_expect(vec![])
132                .with_value(value);
133            let res = self.compare_and_put(req).await?;
134            res.success
135        } else {
136            let req = PutRequest::new().with_key(key).with_value(value);
137            self.put(req).await?;
138            true
139        };
140
141        Ok(success)
142    }
143
144    /// Check if the key exists, not returning the value.
145    /// If the value is large, this method is more efficient than `get`.
146    async fn exists(&self, key: &[u8]) -> Result<bool, Self::Error> {
147        let req = RangeRequest::new().with_key(key.to_vec()).with_keys_only();
148        let resp = self.range(req).await?;
149        Ok(!resp.kvs.is_empty())
150    }
151
152    /// Returns previous key-value pair if `prev_kv` is `true`.
153    async fn delete(&self, key: &[u8], prev_kv: bool) -> Result<Option<KeyValue>, Self::Error> {
154        let mut req = DeleteRangeRequest::new().with_key(key.to_vec());
155        if prev_kv {
156            req = req.with_prev_kv();
157        }
158
159        let resp = self.delete_range(req).await?;
160
161        if prev_kv {
162            Ok(resp.prev_kvs.into_iter().next())
163        } else {
164            Ok(None)
165        }
166    }
167}
168
/// A [`KvBackend`] that can additionally be reset.
pub trait ResettableKvBackend: KvBackend
where
    Self::Error: ErrorExt,
{
    /// Resets the backend; the exact effect is defined by the implementor.
    fn reset(&self);

    /// Upcasts to a `KvBackendRef`, consuming this `Arc`.
    ///
    /// Provided as an explicit method because trait upcasting
    /// (<https://github.com/rust-lang/rust/issues/65991>) was not yet stable
    /// when this was written.
    fn as_kv_backend_ref(self: Arc<Self>) -> KvBackendRef<Self::Error>;
}
178
/// Shared, reference-counted handle to a [`ResettableKvBackend`] trait object
/// that is `Send + Sync`. The error type parameter `E` defaults to this
/// crate's [`Error`].
pub type ResettableKvBackendRef<E = Error> = Arc<dyn ResettableKvBackend<Error = E> + Send + Sync>;