// catalog/kvbackend/client.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16use std::fmt::Debug;
17use std::sync::atomic::{AtomicUsize, Ordering};
18use std::sync::{Arc, Mutex};
19use std::time::Duration;
20
21use common_error::ext::BoxedError;
22use common_meta::cache_invalidator::KvCacheInvalidator;
23use common_meta::error::Error::CacheNotGet;
24use common_meta::error::{CacheNotGetSnafu, Error, ExternalSnafu, GetKvCacheSnafu, Result};
25use common_meta::kv_backend::txn::{Txn, TxnResponse};
26use common_meta::kv_backend::{KvBackend, KvBackendRef, TxnService};
27use common_meta::rpc::store::{
28    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
29    BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
30    DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
31};
32use common_meta::rpc::KeyValue;
33use common_telemetry::debug;
34use meta_client::client::MetaClient;
35use moka::future::{Cache, CacheBuilder};
36use snafu::{OptionExt, ResultExt};
37
38use crate::metrics::{
39    METRIC_CATALOG_KV_BATCH_GET, METRIC_CATALOG_KV_GET, METRIC_CATALOG_KV_REMOTE_GET,
40};
41
42const DEFAULT_CACHE_MAX_CAPACITY: u64 = 10000;
43const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(10 * 60);
44const DEFAULT_CACHE_TTI: Duration = Duration::from_secs(5 * 60);
45
/// Builder for [CachedKvBackend]. Any option left unset falls back to the
/// corresponding `DEFAULT_CACHE_*` constant when `build` is called.
pub struct CachedKvBackendBuilder {
    /// Maximum number of entries the cache may hold.
    cache_max_capacity: Option<u64>,
    /// Time-to-live for cached entries.
    cache_ttl: Option<Duration>,
    /// Time-to-idle for cached entries.
    cache_tti: Option<Duration>,
    /// The backend being wrapped; serves cache misses and all writes.
    inner: KvBackendRef,
}
52
53impl CachedKvBackendBuilder {
54    pub fn new(inner: KvBackendRef) -> Self {
55        Self {
56            cache_max_capacity: None,
57            cache_ttl: None,
58            cache_tti: None,
59            inner,
60        }
61    }
62
63    pub fn cache_max_capacity(mut self, cache_max_capacity: u64) -> Self {
64        self.cache_max_capacity.replace(cache_max_capacity);
65        self
66    }
67
68    pub fn cache_ttl(mut self, cache_ttl: Duration) -> Self {
69        self.cache_ttl.replace(cache_ttl);
70        self
71    }
72
73    pub fn cache_tti(mut self, cache_tti: Duration) -> Self {
74        self.cache_tti.replace(cache_tti);
75        self
76    }
77
78    pub fn build(self) -> CachedKvBackend {
79        let cache_max_capacity = self
80            .cache_max_capacity
81            .unwrap_or(DEFAULT_CACHE_MAX_CAPACITY);
82        let cache_ttl = self.cache_ttl.unwrap_or(DEFAULT_CACHE_TTL);
83        let cache_tti = self.cache_tti.unwrap_or(DEFAULT_CACHE_TTI);
84
85        let cache = CacheBuilder::new(cache_max_capacity)
86            .time_to_live(cache_ttl)
87            .time_to_idle(cache_tti)
88            .build();
89        let kv_backend = self.inner;
90        let name = format!("CachedKvBackend({})", kv_backend.name());
91        let version = AtomicUsize::new(0);
92
93        CachedKvBackend {
94            kv_backend,
95            cache,
96            name,
97            version,
98        }
99    }
100}
101
/// The moka cache type used by [CachedKvBackend]: raw key bytes to [`KeyValue`].
pub type CacheBackend = Cache<Vec<u8>, KeyValue>;
103
/// A caching wrapper around another [`KvBackend`].
///
/// `CachedKvBackend` is mainly used to read metadata information from the
/// wrapped backend (typically Metasrv), and provides a cache for `get` and
/// `batch_get`. One way to trigger cache invalidation: when metadata
/// information changes, Metasrv will broadcast a metadata invalidation
/// request.
///
/// Therefore, it is recommended to use this backend only to read metadata
/// related information. Note: if you read other information, you may read
/// expired data, which depends on the TTL and TTI of the cache.
pub struct CachedKvBackend {
    /// The wrapped backend; serves cache misses and all writes.
    kv_backend: KvBackendRef,
    /// Cache keyed by the raw key bytes.
    cache: CacheBackend,
    /// Display name, formatted as `CachedKvBackend(<inner name>)`.
    name: String,
    /// Monotonically increasing counter bumped on every invalidation; used to
    /// detect invalidations that race with in-flight reads (see `get`/`batch_get`).
    version: AtomicUsize,
}
119
#[async_trait::async_trait]
impl TxnService for CachedKvBackend {
    type Error = Error;

    /// Forwards the transaction to the inner backend unchanged.
    async fn txn(&self, txn: Txn) -> std::result::Result<TxnResponse, Self::Error> {
        // TODO(hl): txn of CachedKvBackend simply pass through to inner backend without invalidating caches.
        self.kv_backend.txn(txn).await
    }

    /// Forwards to the inner backend's transaction-operation limit.
    fn max_txn_ops(&self) -> usize {
        self.kv_backend.max_txn_ops()
    }
}
133
134#[async_trait::async_trait]
135impl KvBackend for CachedKvBackend {
136    fn name(&self) -> &str {
137        &self.name
138    }
139
140    fn as_any(&self) -> &dyn Any {
141        self
142    }
143
144    async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
145        self.kv_backend.range(req).await
146    }
147
148    async fn put(&self, req: PutRequest) -> Result<PutResponse> {
149        let key = &req.key.clone();
150
151        let ret = self.kv_backend.put(req).await;
152
153        if ret.is_ok() {
154            self.invalidate_key(key).await;
155        }
156
157        ret
158    }
159
160    async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
161        let keys = req
162            .kvs
163            .iter()
164            .map(|kv| kv.key().to_vec())
165            .collect::<Vec<_>>();
166
167        let resp = self.kv_backend.batch_put(req).await;
168
169        if resp.is_ok() {
170            for key in keys {
171                self.invalidate_key(&key).await;
172            }
173        }
174
175        resp
176    }
177
178    async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
179        let _timer = METRIC_CATALOG_KV_BATCH_GET.start_timer();
180
181        let mut kvs = Vec::with_capacity(req.keys.len());
182        let mut miss_keys = Vec::with_capacity(req.keys.len());
183
184        for key in req.keys {
185            if let Some(val) = self.cache.get(&key).await {
186                kvs.push(val);
187            } else {
188                miss_keys.push(key);
189            }
190        }
191
192        let batch_get_req = BatchGetRequest::new().with_keys(miss_keys.clone());
193
194        let pre_version = self.version();
195
196        let unhit_kvs = self.kv_backend.batch_get(batch_get_req).await?.kvs;
197
198        for kv in unhit_kvs.iter() {
199            self.cache.insert(kv.key().to_vec(), kv.clone()).await;
200        }
201
202        if !self.validate_version(pre_version) {
203            for key in miss_keys.iter() {
204                self.cache.invalidate(key).await;
205            }
206        }
207
208        kvs.extend(unhit_kvs);
209
210        Ok(BatchGetResponse { kvs })
211    }
212
213    async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
214        let key = &req.key.clone();
215
216        let ret = self.kv_backend.compare_and_put(req).await;
217
218        if ret.is_ok() {
219            self.invalidate_key(key).await;
220        }
221
222        ret
223    }
224
225    async fn delete_range(&self, mut req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
226        let prev_kv = req.prev_kv;
227
228        req.prev_kv = true;
229        let resp = self.kv_backend.delete_range(req).await;
230        match resp {
231            Ok(mut resp) => {
232                for prev_kv in resp.prev_kvs.iter() {
233                    self.invalidate_key(prev_kv.key()).await;
234                }
235
236                if !prev_kv {
237                    resp.prev_kvs = vec![];
238                }
239                Ok(resp)
240            }
241            Err(e) => Err(e),
242        }
243    }
244
245    async fn batch_delete(&self, mut req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
246        let prev_kv = req.prev_kv;
247
248        req.prev_kv = true;
249        let resp = self.kv_backend.batch_delete(req).await;
250        match resp {
251            Ok(mut resp) => {
252                for prev_kv in resp.prev_kvs.iter() {
253                    self.invalidate_key(prev_kv.key()).await;
254                }
255
256                if !prev_kv {
257                    resp.prev_kvs = vec![];
258                }
259                Ok(resp)
260            }
261            Err(e) => Err(e),
262        }
263    }
264
265    async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>> {
266        let _timer = METRIC_CATALOG_KV_GET.start_timer();
267
268        let pre_version = Arc::new(Mutex::new(None));
269
270        let init = async {
271            let version_clone = pre_version.clone();
272            let _timer = METRIC_CATALOG_KV_REMOTE_GET.start_timer();
273
274            version_clone.lock().unwrap().replace(self.version());
275
276            self.kv_backend.get(key).await.map(|val| {
277                val.with_context(|| CacheNotGetSnafu {
278                    key: String::from_utf8_lossy(key),
279                })
280            })?
281        };
282
283        // currently moka doesn't have `optionally_try_get_with_by_ref`
284        // TODO(fys): change to moka method when available
285        // https://github.com/moka-rs/moka/issues/254
286        let ret = match self.cache.try_get_with_by_ref(key, init).await {
287            Ok(val) => Ok(Some(val)),
288            Err(e) => match e.as_ref() {
289                CacheNotGet { .. } => Ok(None),
290                _ => Err(e),
291            },
292        }
293        .map_err(|e| {
294            GetKvCacheSnafu {
295                err_msg: e.to_string(),
296            }
297            .build()
298        });
299
300        // "cache.invalidate_key" and "cache.try_get_with_by_ref" are not mutually exclusive. So we need
301        // to use the version mechanism to prevent expired data from being put into the cache.
302        if pre_version
303            .lock()
304            .unwrap()
305            .as_ref()
306            .is_some_and(|v| !self.validate_version(*v))
307        {
308            self.cache.invalidate(key).await;
309        }
310
311        ret
312    }
313}
314
#[async_trait::async_trait]
impl KvCacheInvalidator for CachedKvBackend {
    /// Removes `key` from the cache and bumps the version counter.
    async fn invalidate_key(&self, key: &[u8]) {
        // Bump the version BEFORE invalidating: readers in `get`/`batch_get`
        // snapshot the version before their remote read and re-check it after
        // inserting into the cache, so this ordering lets them detect a racing
        // invalidation and drop the possibly-stale entry they just cached.
        self.create_new_version();
        self.cache.invalidate(key).await;
        debug!("invalidated cache key: {}", String::from_utf8_lossy(key));
    }
}
323
324impl CachedKvBackend {
325    // only for test
326    #[cfg(test)]
327    fn wrap(kv_backend: KvBackendRef) -> Self {
328        let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
329            .time_to_live(DEFAULT_CACHE_TTL)
330            .time_to_idle(DEFAULT_CACHE_TTI)
331            .build();
332
333        let name = format!("CachedKvBackend({})", kv_backend.name());
334        Self {
335            kv_backend,
336            cache,
337            name,
338            version: AtomicUsize::new(0),
339        }
340    }
341
342    pub fn cache(&self) -> &CacheBackend {
343        &self.cache
344    }
345
346    fn version(&self) -> usize {
347        self.version.load(Ordering::Relaxed)
348    }
349
350    fn validate_version(&self, pre_version: usize) -> bool {
351        self.version() == pre_version
352    }
353
354    fn create_new_version(&self) -> usize {
355        self.version.fetch_add(1, Ordering::Relaxed) + 1
356    }
357}
358
/// A [KvBackend] that forwards every operation to Metasrv via a [MetaClient].
#[derive(Debug)]
pub struct MetaKvBackend {
    /// The client used to talk to Metasrv.
    pub client: Arc<MetaClient>,
}
363
364impl MetaKvBackend {
365    /// Constructs a [MetaKvBackend].
366    pub fn new(client: Arc<MetaClient>) -> MetaKvBackend {
367        MetaKvBackend { client }
368    }
369}
370
impl TxnService for MetaKvBackend {
    type Error = Error;
    // NOTE(review): no `txn`/`max_txn_ops` overrides here, so the trait's
    // default method implementations apply — confirm that is intended for
    // a Metasrv-backed store.
}
374
375/// Implement `KvBackend` trait for `MetaKvBackend` instead of opendal's `Accessor` since
376/// `MetaClient`'s range method can return both keys and values, which can reduce IO overhead
377/// comparing to `Accessor`'s list and get method.
378#[async_trait::async_trait]
379impl KvBackend for MetaKvBackend {
380    fn name(&self) -> &str {
381        "MetaKvBackend"
382    }
383
384    fn as_any(&self) -> &dyn Any {
385        self
386    }
387
388    async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
389        self.client
390            .range(req)
391            .await
392            .map_err(BoxedError::new)
393            .context(ExternalSnafu)
394    }
395
396    async fn put(&self, req: PutRequest) -> Result<PutResponse> {
397        self.client
398            .put(req)
399            .await
400            .map_err(BoxedError::new)
401            .context(ExternalSnafu)
402    }
403
404    async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
405        self.client
406            .batch_put(req)
407            .await
408            .map_err(BoxedError::new)
409            .context(ExternalSnafu)
410    }
411
412    async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
413        self.client
414            .batch_get(req)
415            .await
416            .map_err(BoxedError::new)
417            .context(ExternalSnafu)
418    }
419
420    async fn compare_and_put(
421        &self,
422        request: CompareAndPutRequest,
423    ) -> Result<CompareAndPutResponse> {
424        self.client
425            .compare_and_put(request)
426            .await
427            .map_err(BoxedError::new)
428            .context(ExternalSnafu)
429    }
430
431    async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
432        self.client
433            .delete_range(req)
434            .await
435            .map_err(BoxedError::new)
436            .context(ExternalSnafu)
437    }
438
439    async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
440        self.client
441            .batch_delete(req)
442            .await
443            .map_err(BoxedError::new)
444            .context(ExternalSnafu)
445    }
446
447    async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>> {
448        let mut response = self
449            .client
450            .range(RangeRequest::new().with_key(key))
451            .await
452            .map_err(BoxedError::new)
453            .context(ExternalSnafu)?;
454        Ok(response.take_kvs().get_mut(0).map(|kv| KeyValue {
455            key: kv.take_key(),
456            value: kv.take_value(),
457        }))
458    }
459}
460
#[cfg(test)]
mod tests {
    use std::any::Any;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;

    use async_trait::async_trait;
    use common_meta::kv_backend::{KvBackend, TxnService};
    use common_meta::rpc::store::{
        BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse,
        BatchPutRequest, BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest,
        PutResponse, RangeRequest, RangeResponse,
    };
    use common_meta::rpc::KeyValue;
    use dashmap::DashMap;

    use super::CachedKvBackend;

    /// In-memory backend that counts how many times `get` is executed, so the
    /// tests can assert which reads were served by the cache vs. the backend.
    #[derive(Default)]
    pub struct SimpleKvBackend {
        // Backing store for key-value pairs.
        inner_map: DashMap<Vec<u8>, Vec<u8>>,
        // Shared counter incremented on every `get` call; cloned by the test
        // so it can be inspected after the backend is moved into the wrapper.
        get_execute_times: Arc<AtomicU32>,
    }

    impl TxnService for SimpleKvBackend {
        type Error = common_meta::error::Error;
    }

    #[async_trait]
    impl KvBackend for SimpleKvBackend {
        fn name(&self) -> &str {
            "SimpleKvBackend"
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        /// Resolves each key via `get`, so every key in the request bumps
        /// `get_execute_times` exactly once. Missing keys are skipped.
        async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error> {
            let mut kvs = Vec::with_capacity(req.keys.len());
            for key in req.keys.iter() {
                if let Some(kv) = self.get(key).await? {
                    kvs.push(kv);
                }
            }
            Ok(BatchGetResponse { kvs })
        }

        async fn put(&self, req: PutRequest) -> Result<PutResponse, Self::Error> {
            self.inner_map.insert(req.key, req.value);
            // always return None as prev_kv, since we don't use it in this test.
            Ok(PutResponse { prev_kv: None })
        }

        async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>, Self::Error> {
            // Count every backend read; the cache should keep this number low.
            self.get_execute_times
                .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
            Ok(self.inner_map.get(key).map(|v| KeyValue {
                key: key.to_vec(),
                value: v.value().clone(),
            }))
        }

        // The remaining operations are unused by these tests.
        async fn range(&self, _req: RangeRequest) -> Result<RangeResponse, Self::Error> {
            unimplemented!()
        }

        async fn batch_put(&self, _req: BatchPutRequest) -> Result<BatchPutResponse, Self::Error> {
            unimplemented!()
        }

        async fn delete_range(
            &self,
            _req: DeleteRangeRequest,
        ) -> Result<DeleteRangeResponse, Self::Error> {
            unimplemented!()
        }

        async fn batch_delete(
            &self,
            _req: BatchDeleteRequest,
        ) -> Result<BatchDeleteResponse, Self::Error> {
            unimplemented!()
        }
    }

    /// Verifies that `CachedKvBackend::batch_get` only hits the inner backend
    /// for cache misses: repeated batch gets of the same keys must not
    /// increase the backend's `get` call count.
    #[tokio::test]
    async fn test_cached_kv_backend() {
        let simple_kv = Arc::new(SimpleKvBackend::default());
        let get_execute_times = simple_kv.get_execute_times.clone();
        let cached_kv = CachedKvBackend::wrap(simple_kv);

        add_some_vals(&cached_kv).await;

        let batch_get_req = BatchGetRequest {
            keys: vec![b"k1".to_vec(), b"k2".to_vec()],
        };

        // Puts go through `put`, not `get`, so no backend reads yet.
        assert_eq!(get_execute_times.load(Ordering::SeqCst), 0);

        for _ in 0..10 {
            let _batch_get_resp = cached_kv.batch_get(batch_get_req.clone()).await.unwrap();

            // First iteration misses k1 and k2 (2 backend reads); every later
            // iteration is fully served from the cache, so the count stays at 2.
            assert_eq!(get_execute_times.load(Ordering::SeqCst), 2);
        }

        let batch_get_req = BatchGetRequest {
            keys: vec![b"k1".to_vec(), b"k2".to_vec(), b"k3".to_vec()],
        };

        let _batch_get_resp = cached_kv.batch_get(batch_get_req.clone()).await.unwrap();

        // Only the new key k3 misses the cache: exactly one additional read.
        assert_eq!(get_execute_times.load(Ordering::SeqCst), 3);

        for _ in 0..10 {
            let _batch_get_resp = cached_kv.batch_get(batch_get_req.clone()).await.unwrap();

            // All three keys are now cached; no further backend reads.
            assert_eq!(get_execute_times.load(Ordering::SeqCst), 3);
        }
    }

    /// Seeds k1..k3 through the given backend (going through the cached
    /// wrapper so its write-path invalidation is exercised too).
    async fn add_some_vals(kv_backend: &impl KvBackend) {
        kv_backend
            .put(PutRequest {
                key: b"k1".to_vec(),
                value: b"v1".to_vec(),
                prev_kv: false,
            })
            .await
            .unwrap();

        kv_backend
            .put(PutRequest {
                key: b"k2".to_vec(),
                value: b"v2".to_vec(),
                prev_kv: false,
            })
            .await
            .unwrap();

        kv_backend
            .put(PutRequest {
                key: b"k3".to_vec(),
                value: b"v3".to_vec(),
                prev_kv: false,
            })
            .await
            .unwrap();
    }
}