Skip to main content

common_meta/
kv_backend.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use common_error::ext::ErrorExt;
20pub use txn::TxnService;
21
22use crate::error::Error;
23use crate::kv_backend::txn::{Txn, TxnOpResponse};
24use crate::rpc::KeyValue;
25use crate::rpc::store::{
26    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
27    BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
28    DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
29};
30
31pub mod chroot;
32pub mod etcd;
33pub mod memory;
34#[cfg(any(feature = "mysql_kvbackend", feature = "pg_kvbackend"))]
35pub mod rds;
36pub mod read_only;
37pub mod test;
38#[cfg(any(test, feature = "testing"))]
39pub mod test_util;
40pub mod txn;
41pub mod util;
42pub type KvBackendRef<E = Error> = Arc<dyn KvBackend<Error = E> + Send + Sync>;
43
44#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
45// The default meta table name, default is "greptime_metakv".
46pub const DEFAULT_META_TABLE_NAME: &str = "greptime_metakv";
47
48#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
49// The default lock id for election, default is 1.
50pub const DEFAULT_META_ELECTION_LOCK_ID: u64 = 1;
51
52#[async_trait]
53pub trait KvBackend: TxnService
54where
55    Self::Error: ErrorExt,
56{
57    fn name(&self) -> &str;
58
59    fn as_any(&self) -> &dyn Any;
60
61    async fn range(&self, req: RangeRequest) -> Result<RangeResponse, Self::Error>;
62
63    async fn put(&self, req: PutRequest) -> Result<PutResponse, Self::Error>;
64
65    async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse, Self::Error>;
66
67    async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error>;
68
69    async fn delete_range(
70        &self,
71        req: DeleteRangeRequest,
72    ) -> Result<DeleteRangeResponse, Self::Error>;
73
74    async fn batch_delete(
75        &self,
76        req: BatchDeleteRequest,
77    ) -> Result<BatchDeleteResponse, Self::Error>;
78
79    // The following methods are implemented based on the above methods,
80    // and a higher-level interface is provided for to simplify usage.
81
82    async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>, Self::Error> {
83        let req = RangeRequest::new().with_key(key.to_vec());
84        let mut resp = self.range(req).await?;
85        Ok(if resp.kvs.is_empty() {
86            None
87        } else {
88            Some(resp.kvs.remove(0))
89        })
90    }
91
92    /// CAS: Compares the value at the key with the given value, and if they are
93    /// equal, puts the new value at the key.
94    async fn compare_and_put(
95        &self,
96        req: CompareAndPutRequest,
97    ) -> Result<CompareAndPutResponse, Self::Error> {
98        let CompareAndPutRequest { key, expect, value } = req;
99        let txn = if expect.is_empty() {
100            Txn::put_if_not_exists(key, value)
101        } else {
102            Txn::compare_and_put(key, expect, value)
103        };
104        let txn_res = self.txn(txn).await?;
105
106        let success = txn_res.succeeded;
107        // The response is guaranteed to have at most one element.
108        let op_res = txn_res.responses.into_iter().next();
109        let prev_kv = match op_res {
110            Some(TxnOpResponse::ResponsePut(res)) => res.prev_kv,
111            Some(TxnOpResponse::ResponseGet(res)) => res.kvs.into_iter().next(),
112            Some(TxnOpResponse::ResponseDelete(res)) => res.prev_kvs.into_iter().next(),
113            None => None,
114        };
115
116        Ok(CompareAndPutResponse { success, prev_kv })
117    }
118
119    /// Puts a value at a key. If `if_not_exists` is `true`, the operation
120    /// ensures the key does not exist before applying the PUT operation.
121    /// Otherwise, it simply applies the PUT operation without checking for
122    /// the key's existence.
123    async fn put_conditionally(
124        &self,
125        key: Vec<u8>,
126        value: Vec<u8>,
127        if_not_exists: bool,
128    ) -> Result<bool, Self::Error> {
129        let success = if if_not_exists {
130            let req = CompareAndPutRequest::new()
131                .with_key(key)
132                .with_expect(vec![])
133                .with_value(value);
134            let res = self.compare_and_put(req).await?;
135            res.success
136        } else {
137            let req = PutRequest::new().with_key(key).with_value(value);
138            self.put(req).await?;
139            true
140        };
141
142        Ok(success)
143    }
144
145    /// Check if the key exists, not returning the value.
146    /// If the value is large, this method is more efficient than `get`.
147    async fn exists(&self, key: &[u8]) -> Result<bool, Self::Error> {
148        let req = RangeRequest::new().with_key(key.to_vec()).with_keys_only();
149        let resp = self.range(req).await?;
150        Ok(!resp.kvs.is_empty())
151    }
152
153    /// Returns previous key-value pair if `prev_kv` is `true`.
154    async fn delete(&self, key: &[u8], prev_kv: bool) -> Result<Option<KeyValue>, Self::Error> {
155        let mut req = DeleteRangeRequest::new().with_key(key.to_vec());
156        if prev_kv {
157            req = req.with_prev_kv();
158        }
159
160        let resp = self.delete_range(req).await?;
161
162        if prev_kv {
163            Ok(resp.prev_kvs.into_iter().next())
164        } else {
165            Ok(None)
166        }
167    }
168}
169
170pub trait ResettableKvBackend: KvBackend
171where
172    Self::Error: ErrorExt,
173{
174    fn reset(&self);
175
176    /// Upcast as `KvBackendRef`. Since https://github.com/rust-lang/rust/issues/65991 is not yet stable.
177    fn as_kv_backend_ref(self: Arc<Self>) -> KvBackendRef<Self::Error>;
178}
179
180pub type ResettableKvBackendRef<E = Error> = Arc<dyn ResettableKvBackend<Error = E> + Send + Sync>;