common_meta/
kv_backend.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::sync::Arc;
17
18use async_trait::async_trait;
19use common_error::ext::ErrorExt;
20pub use txn::TxnService;
21
22use crate::error::Error;
23use crate::kv_backend::txn::{Txn, TxnOpResponse};
24use crate::rpc::store::{
25    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
26    BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
27    DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
28};
29use crate::rpc::KeyValue;
30
31pub mod chroot;
32pub mod etcd;
33pub mod memory;
34#[cfg(any(feature = "mysql_kvbackend", feature = "pg_kvbackend"))]
35pub mod rds;
36pub mod test;
37pub mod txn;
38pub mod util;
/// Shared handle to a [`KvBackend`] trait object that is `Send + Sync`.
///
/// The backend's associated error type is `E`, which defaults to this crate's
/// [`Error`].
pub type KvBackendRef<E = Error> = Arc<dyn KvBackend<Error = E> + Send + Sync>;
40
/// The default meta table name: `"greptime_metakv"`.
///
/// Only compiled when the `pg_kvbackend` or `mysql_kvbackend` feature is enabled.
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
pub const DEFAULT_META_TABLE_NAME: &str = "greptime_metakv";
44
/// The default lock id used for election: `1`.
///
/// Only compiled when the `pg_kvbackend` or `mysql_kvbackend` feature is enabled.
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
pub const DEFAULT_META_ELECTION_LOCK_ID: u64 = 1;
48
49#[async_trait]
50pub trait KvBackend: TxnService
51where
52    Self::Error: ErrorExt,
53{
54    fn name(&self) -> &str;
55
56    fn as_any(&self) -> &dyn Any;
57
58    async fn range(&self, req: RangeRequest) -> Result<RangeResponse, Self::Error>;
59
60    async fn put(&self, req: PutRequest) -> Result<PutResponse, Self::Error>;
61
62    async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse, Self::Error>;
63
64    async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error>;
65
66    async fn delete_range(
67        &self,
68        req: DeleteRangeRequest,
69    ) -> Result<DeleteRangeResponse, Self::Error>;
70
71    async fn batch_delete(
72        &self,
73        req: BatchDeleteRequest,
74    ) -> Result<BatchDeleteResponse, Self::Error>;
75
76    // The following methods are implemented based on the above methods,
77    // and a higher-level interface is provided for to simplify usage.
78
79    async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>, Self::Error> {
80        let req = RangeRequest::new().with_key(key.to_vec());
81        let mut resp = self.range(req).await?;
82        Ok(if resp.kvs.is_empty() {
83            None
84        } else {
85            Some(resp.kvs.remove(0))
86        })
87    }
88
89    /// CAS: Compares the value at the key with the given value, and if they are
90    /// equal, puts the new value at the key.
91    async fn compare_and_put(
92        &self,
93        req: CompareAndPutRequest,
94    ) -> Result<CompareAndPutResponse, Self::Error> {
95        let CompareAndPutRequest { key, expect, value } = req;
96        let txn = if expect.is_empty() {
97            Txn::put_if_not_exists(key, value)
98        } else {
99            Txn::compare_and_put(key, expect, value)
100        };
101        let txn_res = self.txn(txn).await?;
102
103        let success = txn_res.succeeded;
104        // The response is guaranteed to have at most one element.
105        let op_res = txn_res.responses.into_iter().next();
106        let prev_kv = match op_res {
107            Some(TxnOpResponse::ResponsePut(res)) => res.prev_kv,
108            Some(TxnOpResponse::ResponseGet(res)) => res.kvs.into_iter().next(),
109            Some(TxnOpResponse::ResponseDelete(res)) => res.prev_kvs.into_iter().next(),
110            None => None,
111        };
112
113        Ok(CompareAndPutResponse { success, prev_kv })
114    }
115
116    /// Puts a value at a key. If `if_not_exists` is `true`, the operation
117    /// ensures the key does not exist before applying the PUT operation.
118    /// Otherwise, it simply applies the PUT operation without checking for
119    /// the key's existence.
120    async fn put_conditionally(
121        &self,
122        key: Vec<u8>,
123        value: Vec<u8>,
124        if_not_exists: bool,
125    ) -> Result<bool, Self::Error> {
126        let success = if if_not_exists {
127            let req = CompareAndPutRequest::new()
128                .with_key(key)
129                .with_expect(vec![])
130                .with_value(value);
131            let res = self.compare_and_put(req).await?;
132            res.success
133        } else {
134            let req = PutRequest::new().with_key(key).with_value(value);
135            self.put(req).await?;
136            true
137        };
138
139        Ok(success)
140    }
141
142    /// Check if the key exists, not returning the value.
143    /// If the value is large, this method is more efficient than `get`.
144    async fn exists(&self, key: &[u8]) -> Result<bool, Self::Error> {
145        let req = RangeRequest::new().with_key(key.to_vec()).with_keys_only();
146        let resp = self.range(req).await?;
147        Ok(!resp.kvs.is_empty())
148    }
149
150    /// Returns previous key-value pair if `prev_kv` is `true`.
151    async fn delete(&self, key: &[u8], prev_kv: bool) -> Result<Option<KeyValue>, Self::Error> {
152        let mut req = DeleteRangeRequest::new().with_key(key.to_vec());
153        if prev_kv {
154            req = req.with_prev_kv();
155        }
156
157        let resp = self.delete_range(req).await?;
158
159        if prev_kv {
160            Ok(resp.prev_kvs.into_iter().next())
161        } else {
162            Ok(None)
163        }
164    }
165}
166
/// A [`KvBackend`] that additionally supports being reset.
pub trait ResettableKvBackend: KvBackend
where
    Self::Error: ErrorExt,
{
    /// Resets the backend.
    ///
    /// NOTE(review): the exact effect (presumably clearing stored state) is
    /// defined by each implementation and is not visible here; confirm there.
    fn reset(&self);

    /// Upcasts this `Arc<Self>` to a [`KvBackendRef`].
    ///
    /// Provided as an explicit method because trait-object upcasting
    /// (https://github.com/rust-lang/rust/issues/65991) was not stable when
    /// this was written.
    /// NOTE(review): trait upcasting has since been stabilized in newer Rust
    /// releases; check the project's toolchain before relying on this helper.
    fn as_kv_backend_ref(self: Arc<Self>) -> KvBackendRef<Self::Error>;
}
176
/// Shared handle to a [`ResettableKvBackend`] trait object that is
/// `Send + Sync`.
///
/// The backend's associated error type is `E`, which defaults to this crate's
/// [`Error`].
pub type ResettableKvBackendRef<E = Error> = Arc<dyn ResettableKvBackend<Error = E> + Send + Sync>;