use std::any::Any;
use std::fmt::Debug;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use common_error::ext::BoxedError;
use common_meta::cache_invalidator::KvCacheInvalidator;
use common_meta::error::Error::CacheNotGet;
use common_meta::error::{CacheNotGetSnafu, Error, ExternalSnafu, GetKvCacheSnafu, Result};
use common_meta::kv_backend::txn::{Txn, TxnResponse};
use common_meta::kv_backend::{KvBackend, KvBackendRef, TxnService};
use common_meta::rpc::store::{
    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
    BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
    DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
};
use common_meta::rpc::KeyValue;
use common_telemetry::debug;
use meta_client::client::MetaClient;
use moka::future::{Cache, CacheBuilder};
use snafu::{OptionExt, ResultExt};

use crate::metrics::{
    METRIC_CATALOG_KV_BATCH_GET, METRIC_CATALOG_KV_GET, METRIC_CATALOG_KV_REMOTE_GET,
};

const DEFAULT_CACHE_MAX_CAPACITY: u64 = 10000;
const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(10 * 60);
const DEFAULT_CACHE_TTI: Duration = Duration::from_secs(5 * 60);

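/// Builder for [`CachedKvBackend`]; any option left unset falls back to the
/// defaults above.
///
/// A minimal usage sketch (assumes an existing [`KvBackendRef`] named `inner`):
///
/// ```ignore
/// let cached = CachedKvBackendBuilder::new(inner)
///     .cache_max_capacity(1024)
///     .cache_ttl(Duration::from_secs(60))
///     .cache_tti(Duration::from_secs(30))
///     .build();
/// ```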
pub struct CachedKvBackendBuilder {
    cache_max_capacity: Option<u64>,
    cache_ttl: Option<Duration>,
    cache_tti: Option<Duration>,
    inner: KvBackendRef,
}

impl CachedKvBackendBuilder {
    pub fn new(inner: KvBackendRef) -> Self {
        Self {
            cache_max_capacity: None,
            cache_ttl: None,
            cache_tti: None,
            inner,
        }
    }

    pub fn cache_max_capacity(mut self, cache_max_capacity: u64) -> Self {
        self.cache_max_capacity.replace(cache_max_capacity);
        self
    }

    pub fn cache_ttl(mut self, cache_ttl: Duration) -> Self {
        self.cache_ttl.replace(cache_ttl);
        self
    }

    pub fn cache_tti(mut self, cache_tti: Duration) -> Self {
        self.cache_tti.replace(cache_tti);
        self
    }

    pub fn build(self) -> CachedKvBackend {
        let cache_max_capacity = self
            .cache_max_capacity
            .unwrap_or(DEFAULT_CACHE_MAX_CAPACITY);
        let cache_ttl = self.cache_ttl.unwrap_or(DEFAULT_CACHE_TTL);
        let cache_tti = self.cache_tti.unwrap_or(DEFAULT_CACHE_TTI);

        let cache = CacheBuilder::new(cache_max_capacity)
            .time_to_live(cache_ttl)
            .time_to_idle(cache_tti)
            .build();
        let kv_backend = self.inner;
        let name = format!("CachedKvBackend({})", kv_backend.name());
        let version = AtomicUsize::new(0);

        CachedKvBackend {
            kv_backend,
            cache,
            name,
            version,
        }
    }
}

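/// Moka async cache mapping raw key bytes to the corresponding [`KeyValue`].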
pub type CacheBackend = Cache<Vec<u8>, KeyValue>;

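/// A caching layer over another [`KvBackend`].
///
/// Point reads (`get`/`batch_get`) are served from `cache` when possible, while
/// writes go to `kv_backend` and invalidate the affected keys. `version` is a
/// monotonically increasing counter bumped on every invalidation; readers
/// snapshot it before a remote fetch and discard whatever they cached if it has
/// changed in the meantime.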
pub struct CachedKvBackend {
    kv_backend: KvBackendRef,
    cache: CacheBackend,
    name: String,
    version: AtomicUsize,
}

#[async_trait::async_trait]
impl TxnService for CachedKvBackend {
    type Error = Error;

    async fn txn(&self, txn: Txn) -> std::result::Result<TxnResponse, Self::Error> {
        // Transactions pass straight through to the inner backend and do not
        // touch the cache.
        self.kv_backend.txn(txn).await
    }

    fn max_txn_ops(&self) -> usize {
        self.kv_backend.max_txn_ops()
    }
}

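// Read path: consult the cache first and fall back to the inner backend on a
// miss. Write path: forward to the inner backend and, on success, invalidate
// the cached entries for the touched keys.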
#[async_trait::async_trait]
impl KvBackend for CachedKvBackend {
    fn name(&self) -> &str {
        &self.name
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
        // Range scans are not cached; they always hit the inner backend.
        self.kv_backend.range(req).await
    }

    async fn put(&self, req: PutRequest) -> Result<PutResponse> {
        let key = req.key.clone();

        let ret = self.kv_backend.put(req).await;

        // Only invalidate the cached entry once the write has succeeded.
        if ret.is_ok() {
            self.invalidate_key(&key).await;
        }

        ret
    }

    async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
        let keys = req
            .kvs
            .iter()
            .map(|kv| kv.key().to_vec())
            .collect::<Vec<_>>();

        let resp = self.kv_backend.batch_put(req).await;

        if resp.is_ok() {
            for key in keys {
                self.invalidate_key(&key).await;
            }
        }

        resp
    }

    async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
        let _timer = METRIC_CATALOG_KV_BATCH_GET.start_timer();

        let mut kvs = Vec::with_capacity(req.keys.len());
        let mut miss_keys = Vec::with_capacity(req.keys.len());

        // Serve whatever is already cached and collect the misses.
        for key in req.keys {
            if let Some(val) = self.cache.get(&key).await {
                kvs.push(val);
            } else {
                miss_keys.push(key);
            }
        }

        let batch_get_req = BatchGetRequest::new().with_keys(miss_keys.clone());

        // Snapshot the invalidation counter before fetching the misses remotely.
        let pre_version = self.version();

        let unhit_kvs = self.kv_backend.batch_get(batch_get_req).await?.kvs;

        for kv in unhit_kvs.iter() {
            self.cache.insert(kv.key().to_vec(), kv.clone()).await;
        }

        // If an invalidation raced with the remote fetch, the entries we just
        // inserted may already be stale, so drop them from the cache.
        if !self.validate_version(pre_version) {
            for key in miss_keys.iter() {
                self.cache.invalidate(key).await;
            }
        }

        kvs.extend(unhit_kvs);

        Ok(BatchGetResponse { kvs })
    }

    async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
        let key = req.key.clone();

        let ret = self.kv_backend.compare_and_put(req).await;

        if ret.is_ok() {
            self.invalidate_key(&key).await;
        }

        ret
    }

    async fn delete_range(&self, mut req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
        let prev_kv = req.prev_kv;

        // Always ask the inner backend for the previous key-values so we know
        // which cached entries to invalidate, then restore the caller's choice.
        req.prev_kv = true;
        let resp = self.kv_backend.delete_range(req).await;
        match resp {
            Ok(mut resp) => {
                for prev_kv in resp.prev_kvs.iter() {
                    self.invalidate_key(prev_kv.key()).await;
                }

                if !prev_kv {
                    resp.prev_kvs = vec![];
                }
                Ok(resp)
            }
            Err(e) => Err(e),
        }
    }

    async fn batch_delete(&self, mut req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
        let prev_kv = req.prev_kv;

        // Same as `delete_range`: request the previous key-values purely for
        // cache invalidation.
        req.prev_kv = true;
        let resp = self.kv_backend.batch_delete(req).await;
        match resp {
            Ok(mut resp) => {
                for prev_kv in resp.prev_kvs.iter() {
                    self.invalidate_key(prev_kv.key()).await;
                }

                if !prev_kv {
                    resp.prev_kvs = vec![];
                }
                Ok(resp)
            }
            Err(e) => Err(e),
        }
    }

    async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>> {
        let _timer = METRIC_CATALOG_KV_GET.start_timer();

        // Records the version observed right before the remote lookup, if one happens.
        let pre_version = Arc::new(Mutex::new(None));

        // Loader executed by the cache only on a miss.
        let init = async {
            let version_clone = pre_version.clone();
            let _timer = METRIC_CATALOG_KV_REMOTE_GET.start_timer();

            version_clone.lock().unwrap().replace(self.version());

            // A missing key surfaces as a `CacheNotGet` error so the cache does
            // not store the absence.
            self.kv_backend.get(key).await.map(|val| {
                val.with_context(|| CacheNotGetSnafu {
                    key: String::from_utf8_lossy(key),
                })
            })?
        };

        let ret = match self.cache.try_get_with_by_ref(key, init).await {
            Ok(val) => Ok(Some(val)),
            Err(e) => match e.as_ref() {
                // The key simply does not exist; this is not a failure.
                CacheNotGet { .. } => Ok(None),
                _ => Err(e),
            },
        }
        .map_err(|e| {
            GetKvCacheSnafu {
                err_msg: e.to_string(),
            }
            .build()
        });

        // If an invalidation raced with the remote lookup, the value we just
        // cached may be stale; evict it so the next read refetches.
        if pre_version
            .lock()
            .unwrap()
            .as_ref()
            .is_some_and(|v| !self.validate_version(*v))
        {
            self.cache.invalidate(key).await;
        }

        ret
    }
}

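// Cache invalidation entry point (also used by the write paths above): bump the
// version first so concurrent readers can detect the race, then evict the entry.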
#[async_trait::async_trait]
impl KvCacheInvalidator for CachedKvBackend {
    async fn invalidate_key(&self, key: &[u8]) {
        self.create_new_version();
        self.cache.invalidate(key).await;
        debug!("invalidated cache key: {}", String::from_utf8_lossy(key));
    }
}

impl CachedKvBackend {
    #[cfg(test)]
    fn wrap(kv_backend: KvBackendRef) -> Self {
        let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
            .time_to_live(DEFAULT_CACHE_TTL)
            .time_to_idle(DEFAULT_CACHE_TTI)
            .build();

        let name = format!("CachedKvBackend({})", kv_backend.name());
        Self {
            kv_backend,
            cache,
            name,
            version: AtomicUsize::new(0),
        }
    }

    pub fn cache(&self) -> &CacheBackend {
        &self.cache
    }

    /// Current value of the invalidation counter.
    fn version(&self) -> usize {
        self.version.load(Ordering::Relaxed)
    }

    /// Returns true if no invalidation has happened since `pre_version` was observed.
    fn validate_version(&self, pre_version: usize) -> bool {
        self.version() == pre_version
    }

    /// Bumps the invalidation counter and returns the new value.
    fn create_new_version(&self) -> usize {
        self.version.fetch_add(1, Ordering::Relaxed) + 1
    }
}

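/// A [`KvBackend`] backed by a remote [`MetaClient`]: every operation is
/// forwarded to the meta service, and failures are wrapped as external errors.
///
/// A typical composition, sketched here assuming a connected `MetaClient` value
/// named `meta_client`:
///
/// ```ignore
/// let backend = Arc::new(MetaKvBackend::new(Arc::new(meta_client)));
/// let cached = CachedKvBackendBuilder::new(backend).build();
/// ```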
#[derive(Debug)]
pub struct MetaKvBackend {
    pub client: Arc<MetaClient>,
}

impl MetaKvBackend {
    pub fn new(client: Arc<MetaClient>) -> MetaKvBackend {
        MetaKvBackend { client }
    }
}

impl TxnService for MetaKvBackend {
    type Error = Error;
}

#[async_trait::async_trait]
impl KvBackend for MetaKvBackend {
    fn name(&self) -> &str {
        "MetaKvBackend"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
        self.client
            .range(req)
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)
    }

    async fn put(&self, req: PutRequest) -> Result<PutResponse> {
        self.client
            .put(req)
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)
    }

    async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
        self.client
            .batch_put(req)
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)
    }

    async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
        self.client
            .batch_get(req)
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)
    }

    async fn compare_and_put(
        &self,
        request: CompareAndPutRequest,
    ) -> Result<CompareAndPutResponse> {
        self.client
            .compare_and_put(request)
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)
    }

    async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
        self.client
            .delete_range(req)
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)
    }

    async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
        self.client
            .batch_delete(req)
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)
    }

    async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>> {
        // Issue a range request for the exact key and take the first result, if any.
        let mut response = self
            .client
            .range(RangeRequest::new().with_key(key))
            .await
            .map_err(BoxedError::new)
            .context(ExternalSnafu)?;
        Ok(response.take_kvs().get_mut(0).map(|kv| KeyValue {
            key: kv.take_key(),
            value: kv.take_value(),
        }))
    }
}

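// The tests use an in-memory `SimpleKvBackend` that counts how many times `get`
// reaches it, so they can assert that `CachedKvBackend` only consults the inner
// backend on cache misses.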
#[cfg(test)]
mod tests {
    use std::any::Any;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;

    use async_trait::async_trait;
    use common_meta::kv_backend::{KvBackend, TxnService};
    use common_meta::rpc::store::{
        BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse,
        BatchPutRequest, BatchPutResponse, DeleteRangeRequest, DeleteRangeResponse, PutRequest,
        PutResponse, RangeRequest, RangeResponse,
    };
    use common_meta::rpc::KeyValue;
    use dashmap::DashMap;

    use super::CachedKvBackend;

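    /// Minimal in-memory `KvBackend` for tests; `get_execute_times` records how
    /// many times `get` is executed against this backend.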
    #[derive(Default)]
    pub struct SimpleKvBackend {
        inner_map: DashMap<Vec<u8>, Vec<u8>>,
        get_execute_times: Arc<AtomicU32>,
    }

    impl TxnService for SimpleKvBackend {
        type Error = common_meta::error::Error;
    }

    #[async_trait]
    impl KvBackend for SimpleKvBackend {
        fn name(&self) -> &str {
            "SimpleKvBackend"
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse, Self::Error> {
            let mut kvs = Vec::with_capacity(req.keys.len());
            for key in req.keys.iter() {
                if let Some(kv) = self.get(key).await? {
                    kvs.push(kv);
                }
            }
            Ok(BatchGetResponse { kvs })
        }

        async fn put(&self, req: PutRequest) -> Result<PutResponse, Self::Error> {
            self.inner_map.insert(req.key, req.value);
            Ok(PutResponse { prev_kv: None })
        }

        async fn get(&self, key: &[u8]) -> Result<Option<KeyValue>, Self::Error> {
            self.get_execute_times
                .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
            Ok(self.inner_map.get(key).map(|v| KeyValue {
                key: key.to_vec(),
                value: v.value().clone(),
            }))
        }

        async fn range(&self, _req: RangeRequest) -> Result<RangeResponse, Self::Error> {
            unimplemented!()
        }

        async fn batch_put(&self, _req: BatchPutRequest) -> Result<BatchPutResponse, Self::Error> {
            unimplemented!()
        }

        async fn delete_range(
            &self,
            _req: DeleteRangeRequest,
        ) -> Result<DeleteRangeResponse, Self::Error> {
            unimplemented!()
        }

        async fn batch_delete(
            &self,
            _req: BatchDeleteRequest,
        ) -> Result<BatchDeleteResponse, Self::Error> {
            unimplemented!()
        }
    }

    #[tokio::test]
    async fn test_cached_kv_backend() {
        let simple_kv = Arc::new(SimpleKvBackend::default());
        let get_execute_times = simple_kv.get_execute_times.clone();
        let cached_kv = CachedKvBackend::wrap(simple_kv);

        add_some_vals(&cached_kv).await;

        let batch_get_req = BatchGetRequest {
            keys: vec![b"k1".to_vec(), b"k2".to_vec()],
        };

        // No inner `get` has run yet; `add_some_vals` only issued puts.
        assert_eq!(get_execute_times.load(Ordering::SeqCst), 0);

        for _ in 0..10 {
            let _batch_get_resp = cached_kv.batch_get(batch_get_req.clone()).await.unwrap();

            // Only the first iteration misses: one inner `get` each for k1 and k2.
            assert_eq!(get_execute_times.load(Ordering::SeqCst), 2);
        }

        let batch_get_req = BatchGetRequest {
            keys: vec![b"k1".to_vec(), b"k2".to_vec(), b"k3".to_vec()],
        };

        let _batch_get_resp = cached_kv.batch_get(batch_get_req.clone()).await.unwrap();

        // k3 is the only new miss.
        assert_eq!(get_execute_times.load(Ordering::SeqCst), 3);

        for _ in 0..10 {
            let _batch_get_resp = cached_kv.batch_get(batch_get_req.clone()).await.unwrap();

            // All three keys are now served from the cache.
            assert_eq!(get_execute_times.load(Ordering::SeqCst), 3);
        }
    }

    async fn add_some_vals(kv_backend: &impl KvBackend) {
        kv_backend
            .put(PutRequest {
                key: b"k1".to_vec(),
                value: b"v1".to_vec(),
                prev_kv: false,
            })
            .await
            .unwrap();

        kv_backend
            .put(PutRequest {
                key: b"k2".to_vec(),
                value: b"v2".to_vec(),
                prev_kv: false,
            })
            .await
            .unwrap();

        kv_backend
            .put(PutRequest {
                key: b"k3".to_vec(),
                value: b"v3".to_vec(),
                prev_kv: false,
            })
            .await
            .unwrap();
    }
}