common_meta/kv_backend/
chroot.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use crate::error::Error;
18use crate::kv_backend::txn::{Txn, TxnOp, TxnOpResponse, TxnResponse};
19use crate::kv_backend::{KvBackend, KvBackendRef, TxnService};
20use crate::rpc::store::{
21    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
22    BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
23    DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
24};
25use crate::rpc::KeyValue;
26
/// A [`KvBackend`] wrapper that confines every operation to a key prefix.
///
/// Keys in requests are prefixed with `root` before being forwarded to `inner`,
/// and the prefix is stripped again from every key found in the responses.
pub struct ChrootKvBackend {
    /// Prefix prepended to every key sent to `inner`.
    root: Vec<u8>,
    /// Backend that the prefixed requests are forwarded to.
    inner: KvBackendRef,
}
31
32impl ChrootKvBackend {
33    pub fn new(root: Vec<u8>, inner: KvBackendRef) -> ChrootKvBackend {
34        debug_assert!(!root.is_empty());
35        ChrootKvBackend { root, inner }
36    }
37}
38
39#[async_trait::async_trait]
40impl TxnService for ChrootKvBackend {
41    type Error = Error;
42
43    async fn txn(&self, txn: Txn) -> Result<TxnResponse, Self::Error> {
44        let txn = self.txn_prepend_root(txn);
45        let txn_res = self.inner.txn(txn).await?;
46        Ok(self.chroot_txn_response(txn_res))
47    }
48
49    fn max_txn_ops(&self) -> usize {
50        self.inner.max_txn_ops()
51    }
52}
53
54#[async_trait::async_trait]
55impl KvBackend for ChrootKvBackend {
56    fn name(&self) -> &str {
57        self.inner.name()
58    }
59
60    fn as_any(&self) -> &dyn Any {
61        self
62    }
63
64    async fn range(&self, mut req: RangeRequest) -> Result<RangeResponse, Self::Error> {
65        req.key = self.key_prepend_root(req.key);
66        req.range_end = self.range_end_prepend_root(req.range_end);
67        let mut res = self.inner.range(req).await?;
68        res.kvs = res
69            .kvs
70            .drain(..)
71            .map(self.chroot_key_value_with())
72            .collect();
73        Ok(res)
74    }
75
76    async fn put(&self, mut req: PutRequest) -> Result<PutResponse, Self::Error> {
77        req.key = self.key_prepend_root(req.key);
78        let mut res = self.inner.put(req).await?;
79        res.prev_kv = res.prev_kv.take().map(self.chroot_key_value_with());
80        Ok(res)
81    }
82
83    async fn batch_put(&self, mut req: BatchPutRequest) -> Result<BatchPutResponse, Self::Error> {
84        for kv in req.kvs.iter_mut() {
85            kv.key = self.key_prepend_root(kv.key.drain(..).collect());
86        }
87        let mut res = self.inner.batch_put(req).await?;
88        res.prev_kvs = res
89            .prev_kvs
90            .drain(..)
91            .map(self.chroot_key_value_with())
92            .collect();
93        Ok(res)
94    }
95
96    async fn batch_get(&self, mut req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error> {
97        req.keys = req
98            .keys
99            .drain(..)
100            .map(|key| self.key_prepend_root(key))
101            .collect();
102        let mut res = self.inner.batch_get(req).await?;
103        res.kvs = res
104            .kvs
105            .drain(..)
106            .map(self.chroot_key_value_with())
107            .collect();
108        Ok(res)
109    }
110
111    async fn compare_and_put(
112        &self,
113        mut req: CompareAndPutRequest,
114    ) -> Result<CompareAndPutResponse, Self::Error> {
115        req.key = self.key_prepend_root(req.key);
116        let mut res = self.inner.compare_and_put(req).await?;
117        res.prev_kv = res.prev_kv.take().map(self.chroot_key_value_with());
118        Ok(res)
119    }
120
121    async fn delete_range(
122        &self,
123        mut req: DeleteRangeRequest,
124    ) -> Result<DeleteRangeResponse, Self::Error> {
125        req.key = self.key_prepend_root(req.key);
126        req.range_end = self.range_end_prepend_root(req.range_end);
127        let mut res = self.inner.delete_range(req).await?;
128        res.prev_kvs = res
129            .prev_kvs
130            .drain(..)
131            .map(self.chroot_key_value_with())
132            .collect();
133        Ok(res)
134    }
135
136    async fn batch_delete(
137        &self,
138        mut req: BatchDeleteRequest,
139    ) -> Result<BatchDeleteResponse, Self::Error> {
140        req.keys = req
141            .keys
142            .drain(..)
143            .map(|key| self.key_prepend_root(key))
144            .collect();
145        let mut res = self.inner.batch_delete(req).await?;
146        res.prev_kvs = res
147            .prev_kvs
148            .drain(..)
149            .map(self.chroot_key_value_with())
150            .collect();
151        Ok(res)
152    }
153}
154
155impl ChrootKvBackend {
156    fn key_strip_root(&self, mut key: Vec<u8>) -> Vec<u8> {
157        let root = &self.root;
158        debug_assert!(
159            key.starts_with(root),
160            "key={}, root={}",
161            String::from_utf8_lossy(&key),
162            String::from_utf8_lossy(root),
163        );
164        key.split_off(root.len())
165    }
166
167    fn chroot_key_value_with(&self) -> impl FnMut(KeyValue) -> KeyValue + '_ {
168        |kv| KeyValue {
169            key: self.key_strip_root(kv.key),
170            value: kv.value,
171        }
172    }
173    fn chroot_txn_response(&self, mut txn_res: TxnResponse) -> TxnResponse {
174        for resp in txn_res.responses.iter_mut() {
175            match resp {
176                TxnOpResponse::ResponsePut(r) => {
177                    r.prev_kv = r.prev_kv.take().map(self.chroot_key_value_with());
178                }
179                TxnOpResponse::ResponseGet(r) => {
180                    r.kvs = r.kvs.drain(..).map(self.chroot_key_value_with()).collect();
181                }
182                TxnOpResponse::ResponseDelete(r) => {
183                    r.prev_kvs = r
184                        .prev_kvs
185                        .drain(..)
186                        .map(self.chroot_key_value_with())
187                        .collect();
188                }
189            }
190        }
191        txn_res
192    }
193
194    fn key_prepend_root(&self, mut key: Vec<u8>) -> Vec<u8> {
195        let mut new_key = self.root.clone();
196        new_key.append(&mut key);
197        new_key
198    }
199
200    // see namespace.prefixInterval - https://github.com/etcd-io/etcd/blob/v3.5.10/client/v3/namespace/util.go
201    fn range_end_prepend_root(&self, mut range_end: Vec<u8>) -> Vec<u8> {
202        let root = &self.root;
203        if range_end == [0] {
204            // the edge of the keyspace
205            let mut new_end = root.clone();
206            let mut ok = false;
207            for i in (0..new_end.len()).rev() {
208                new_end[i] = new_end[i].wrapping_add(1);
209                if new_end[i] != 0 {
210                    ok = true;
211                    break;
212                }
213            }
214            if !ok {
215                // 0xff..ff => 0x00
216                new_end = vec![0];
217            }
218            new_end
219        } else if !range_end.is_empty() {
220            let mut new_end = root.clone();
221            new_end.append(&mut range_end);
222            new_end
223        } else {
224            vec![]
225        }
226    }
227
228    fn txn_prepend_root(&self, mut txn: Txn) -> Txn {
229        let op_prepend_root = |op: TxnOp| match op {
230            TxnOp::Put(k, v) => TxnOp::Put(self.key_prepend_root(k), v),
231            TxnOp::Get(k) => TxnOp::Get(self.key_prepend_root(k)),
232            TxnOp::Delete(k) => TxnOp::Delete(self.key_prepend_root(k)),
233        };
234        txn.req.success = txn.req.success.drain(..).map(op_prepend_root).collect();
235        txn.req.failure = txn.req.failure.drain(..).map(op_prepend_root).collect();
236        txn.req.compare = txn
237            .req
238            .compare
239            .drain(..)
240            .map(|cmp| super::txn::Compare {
241                key: self.key_prepend_root(cmp.key),
242                cmp: cmp.cmp,
243                target: cmp.target,
244            })
245            .collect();
246        txn
247    }
248}
249
250#[cfg(test)]
251mod tests {
252    use std::sync::Arc;
253
254    use crate::kv_backend::chroot::ChrootKvBackend;
255    use crate::kv_backend::memory::MemoryKvBackend;
256
257    #[test]
258    fn test_prefix_key_and_range_end() {
259        fn run_test_case(pfx: &[u8], key: &[u8], end: &[u8], w_key: &[u8], w_end: &[u8]) {
260            let chroot = ChrootKvBackend::new(pfx.into(), Arc::new(MemoryKvBackend::new()));
261            assert_eq!(chroot.key_prepend_root(key.into()), w_key);
262            assert_eq!(chroot.range_end_prepend_root(end.into()), w_end);
263        }
264
265        // single key
266        run_test_case(b"pfx/", b"a", b"", b"pfx/a", b"");
267
268        // range
269        run_test_case(b"pfx/", b"abc", b"def", b"pfx/abc", b"pfx/def");
270
271        // one-sided range (HACK - b'/' + 1 = b'0')
272        run_test_case(b"pfx/", b"abc", b"\0", b"pfx/abc", b"pfx0");
273
274        // one-sided range, end of keyspace
275        run_test_case(b"\xFF\xFF", b"abc", b"\0", b"\xff\xffabc", b"\0");
276    }
277}