common_meta/kv_backend/
chroot.rs1use std::any::Any;
16
17use crate::error::Error;
18use crate::kv_backend::txn::{Txn, TxnOp, TxnOpResponse, TxnResponse};
19use crate::kv_backend::{KvBackend, KvBackendRef, TxnService};
20use crate::rpc::store::{
21 BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
22 BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
23 DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
24};
25use crate::rpc::KeyValue;
26
/// A key-value backend wrapper that confines every key to the `root` prefix.
///
/// Keys in outgoing requests get `root` prepended before they are forwarded to
/// `inner`; keys in the responses have `root` stripped again, so callers only
/// ever see keys relative to the chroot.
pub struct ChrootKvBackend {
    /// Byte prefix prepended to every key sent to `inner`.
    root: Vec<u8>,
    /// The wrapped backend that actually serves the requests.
    inner: KvBackendRef,
}
31
32impl ChrootKvBackend {
33 pub fn new(root: Vec<u8>, inner: KvBackendRef) -> ChrootKvBackend {
34 debug_assert!(!root.is_empty());
35 ChrootKvBackend { root, inner }
36 }
37}
38
39#[async_trait::async_trait]
40impl TxnService for ChrootKvBackend {
41 type Error = Error;
42
43 async fn txn(&self, txn: Txn) -> Result<TxnResponse, Self::Error> {
44 let txn = self.txn_prepend_root(txn);
45 let txn_res = self.inner.txn(txn).await?;
46 Ok(self.chroot_txn_response(txn_res))
47 }
48
49 fn max_txn_ops(&self) -> usize {
50 self.inner.max_txn_ops()
51 }
52}
53
54#[async_trait::async_trait]
55impl KvBackend for ChrootKvBackend {
56 fn name(&self) -> &str {
57 self.inner.name()
58 }
59
60 fn as_any(&self) -> &dyn Any {
61 self
62 }
63
64 async fn range(&self, mut req: RangeRequest) -> Result<RangeResponse, Self::Error> {
65 req.key = self.key_prepend_root(req.key);
66 req.range_end = self.range_end_prepend_root(req.range_end);
67 let mut res = self.inner.range(req).await?;
68 res.kvs = res
69 .kvs
70 .drain(..)
71 .map(self.chroot_key_value_with())
72 .collect();
73 Ok(res)
74 }
75
76 async fn put(&self, mut req: PutRequest) -> Result<PutResponse, Self::Error> {
77 req.key = self.key_prepend_root(req.key);
78 let mut res = self.inner.put(req).await?;
79 res.prev_kv = res.prev_kv.take().map(self.chroot_key_value_with());
80 Ok(res)
81 }
82
83 async fn batch_put(&self, mut req: BatchPutRequest) -> Result<BatchPutResponse, Self::Error> {
84 for kv in req.kvs.iter_mut() {
85 kv.key = self.key_prepend_root(kv.key.drain(..).collect());
86 }
87 let mut res = self.inner.batch_put(req).await?;
88 res.prev_kvs = res
89 .prev_kvs
90 .drain(..)
91 .map(self.chroot_key_value_with())
92 .collect();
93 Ok(res)
94 }
95
96 async fn batch_get(&self, mut req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error> {
97 req.keys = req
98 .keys
99 .drain(..)
100 .map(|key| self.key_prepend_root(key))
101 .collect();
102 let mut res = self.inner.batch_get(req).await?;
103 res.kvs = res
104 .kvs
105 .drain(..)
106 .map(self.chroot_key_value_with())
107 .collect();
108 Ok(res)
109 }
110
111 async fn compare_and_put(
112 &self,
113 mut req: CompareAndPutRequest,
114 ) -> Result<CompareAndPutResponse, Self::Error> {
115 req.key = self.key_prepend_root(req.key);
116 let mut res = self.inner.compare_and_put(req).await?;
117 res.prev_kv = res.prev_kv.take().map(self.chroot_key_value_with());
118 Ok(res)
119 }
120
121 async fn delete_range(
122 &self,
123 mut req: DeleteRangeRequest,
124 ) -> Result<DeleteRangeResponse, Self::Error> {
125 req.key = self.key_prepend_root(req.key);
126 req.range_end = self.range_end_prepend_root(req.range_end);
127 let mut res = self.inner.delete_range(req).await?;
128 res.prev_kvs = res
129 .prev_kvs
130 .drain(..)
131 .map(self.chroot_key_value_with())
132 .collect();
133 Ok(res)
134 }
135
136 async fn batch_delete(
137 &self,
138 mut req: BatchDeleteRequest,
139 ) -> Result<BatchDeleteResponse, Self::Error> {
140 req.keys = req
141 .keys
142 .drain(..)
143 .map(|key| self.key_prepend_root(key))
144 .collect();
145 let mut res = self.inner.batch_delete(req).await?;
146 res.prev_kvs = res
147 .prev_kvs
148 .drain(..)
149 .map(self.chroot_key_value_with())
150 .collect();
151 Ok(res)
152 }
153}
154
155impl ChrootKvBackend {
156 fn key_strip_root(&self, mut key: Vec<u8>) -> Vec<u8> {
157 let root = &self.root;
158 debug_assert!(
159 key.starts_with(root),
160 "key={}, root={}",
161 String::from_utf8_lossy(&key),
162 String::from_utf8_lossy(root),
163 );
164 key.split_off(root.len())
165 }
166
167 fn chroot_key_value_with(&self) -> impl FnMut(KeyValue) -> KeyValue + '_ {
168 |kv| KeyValue {
169 key: self.key_strip_root(kv.key),
170 value: kv.value,
171 }
172 }
173 fn chroot_txn_response(&self, mut txn_res: TxnResponse) -> TxnResponse {
174 for resp in txn_res.responses.iter_mut() {
175 match resp {
176 TxnOpResponse::ResponsePut(r) => {
177 r.prev_kv = r.prev_kv.take().map(self.chroot_key_value_with());
178 }
179 TxnOpResponse::ResponseGet(r) => {
180 r.kvs = r.kvs.drain(..).map(self.chroot_key_value_with()).collect();
181 }
182 TxnOpResponse::ResponseDelete(r) => {
183 r.prev_kvs = r
184 .prev_kvs
185 .drain(..)
186 .map(self.chroot_key_value_with())
187 .collect();
188 }
189 }
190 }
191 txn_res
192 }
193
194 fn key_prepend_root(&self, mut key: Vec<u8>) -> Vec<u8> {
195 let mut new_key = self.root.clone();
196 new_key.append(&mut key);
197 new_key
198 }
199
200 fn range_end_prepend_root(&self, mut range_end: Vec<u8>) -> Vec<u8> {
202 let root = &self.root;
203 if range_end == [0] {
204 let mut new_end = root.clone();
206 let mut ok = false;
207 for i in (0..new_end.len()).rev() {
208 new_end[i] = new_end[i].wrapping_add(1);
209 if new_end[i] != 0 {
210 ok = true;
211 break;
212 }
213 }
214 if !ok {
215 new_end = vec![0];
217 }
218 new_end
219 } else if !range_end.is_empty() {
220 let mut new_end = root.clone();
221 new_end.append(&mut range_end);
222 new_end
223 } else {
224 vec![]
225 }
226 }
227
228 fn txn_prepend_root(&self, mut txn: Txn) -> Txn {
229 let op_prepend_root = |op: TxnOp| match op {
230 TxnOp::Put(k, v) => TxnOp::Put(self.key_prepend_root(k), v),
231 TxnOp::Get(k) => TxnOp::Get(self.key_prepend_root(k)),
232 TxnOp::Delete(k) => TxnOp::Delete(self.key_prepend_root(k)),
233 };
234 txn.req.success = txn.req.success.drain(..).map(op_prepend_root).collect();
235 txn.req.failure = txn.req.failure.drain(..).map(op_prepend_root).collect();
236 txn.req.compare = txn
237 .req
238 .compare
239 .drain(..)
240 .map(|cmp| super::txn::Compare {
241 key: self.key_prepend_root(cmp.key),
242 cmp: cmp.cmp,
243 target: cmp.target,
244 })
245 .collect();
246 txn
247 }
248}
249
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use crate::kv_backend::chroot::ChrootKvBackend;
    use crate::kv_backend::memory::MemoryKvBackend;

    /// Checks how keys and range ends are translated into the chroot.
    #[test]
    fn test_prefix_key_and_range_end() {
        // (root, key, range_end, expected key, expected range_end)
        let cases: [(&[u8], &[u8], &[u8], &[u8], &[u8]); 4] = [
            // Empty range end (single key) stays empty.
            (b"pfx/", b"a", b"", b"pfx/a", b""),
            // Ordinary range end gets the root prepended.
            (b"pfx/", b"abc", b"def", b"pfx/abc", b"pfx/def"),
            // `\0` range end is clamped to the end of the root prefix.
            (b"pfx/", b"abc", b"\0", b"pfx/abc", b"pfx0"),
            // A root of only 0xff bytes has no successor: `\0` is kept.
            (b"\xFF\xFF", b"abc", b"\0", b"\xff\xffabc", b"\0"),
        ];

        for &(pfx, key, end, w_key, w_end) in cases.iter() {
            let chroot = ChrootKvBackend::new(pfx.to_vec(), Arc::new(MemoryKvBackend::new()));
            assert_eq!(chroot.key_prepend_root(key.to_vec()), w_key);
            assert_eq!(chroot.range_end_prepend_root(end.to_vec()), w_end);
        }
    }
}