meta_srv/service/store/cached_kv.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::any::Any;
use std::collections::HashSet;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};

use common_meta::error::{Error, Result};
use common_meta::key::CACHE_KEY_PREFIXES;
use common_meta::kv_backend::memory::MemoryKvBackend;
use common_meta::kv_backend::txn::{Txn, TxnOp, TxnRequest, TxnResponse};
use common_meta::kv_backend::{
    KvBackend, KvBackendRef, ResettableKvBackend, ResettableKvBackendRef, TxnService,
};
use common_meta::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
use common_meta::rpc::store::{
    BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
    BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
    DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
};
use common_meta::rpc::KeyValue;
use futures::TryStreamExt;

use crate::metrics;
use crate::state::State;

pub type CheckLeaderRef = Arc<dyn CheckLeader>;

/// Checks whether the current node is the leader.
pub trait CheckLeader: Sync + Send {
    fn check(&self) -> bool;
}

/// A [`CheckLeader`] that always reports leadership; mainly used in tests.
struct AlwaysLeader;

impl CheckLeader for AlwaysLeader {
    fn check(&self) -> bool {
        true
    }
}

impl CheckLeader for RwLock<State> {
    fn check(&self) -> bool {
        self.read().unwrap().enable_leader_cache()
    }
}

/// A cache dedicated to the leader node, for caching some metadata.
///
/// To use this cache, the following constraints must be followed:
///   1. The leader node can create this metadata.
///   2. A follower node can create this metadata; the leader node can lazily retrieve
///      the corresponding data through the cache-loading mechanism.
///   3. Only the leader node can update this metadata, as the cache cannot detect
///      modifications made to the data on a follower node.
///   4. Only the leader node can delete this metadata, for the same reason as above.
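///
/// A minimal usage sketch (hypothetical wiring; any [`KvBackendRef`] can serve
/// as the backing store):
///
/// ```ignore
/// let store: KvBackendRef = Arc::new(MemoryKvBackend::new());
/// let cached = LeaderCachedKvBackend::with_always_leader(store);
/// // Warm the cache before serving reads; no mutations may run concurrently.
/// cached.load().await?;
/// let value = cached.get(b"some_key").await?;
/// ```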
pub struct LeaderCachedKvBackend {
    check_leader: CheckLeaderRef,
    store: KvBackendRef,
    cache: ResettableKvBackendRef,
    version: AtomicUsize,
    name: String,
}

impl LeaderCachedKvBackend {
    pub fn new(check_leader: CheckLeaderRef, store: KvBackendRef) -> Self {
        let name = format!("LeaderCached({})", store.name());
        Self {
            check_leader,
            store,
            cache: Arc::new(MemoryKvBackend::new()),
            version: AtomicUsize::new(0),
            name,
        }
    }

    /// Creates a cached backend with a leader checker that always returns `true`;
    /// mainly used in test scenarios.
    pub fn with_always_leader(store: KvBackendRef) -> Self {
        Self::new(Arc::new(AlwaysLeader), store)
    }

    /// Loads all metadata under [`CACHE_KEY_PREFIXES`] from the remote store into the cache.
    ///
    /// The caller MUST ensure that no mutation requests reach this `LeaderCachedKvBackend`
    /// while the loading is in progress.
    pub async fn load(&self) -> Result<()> {
        for prefix in &CACHE_KEY_PREFIXES[..] {
            let _timer =
                metrics::METRIC_META_LEADER_CACHED_KV_LOAD_ELAPSED.with_label_values(&[prefix]);

            // TODO(weny): Refactor `PaginationStream`'s output into a unary output.
            let stream = PaginationStream::new(
                self.store.clone(),
                RangeRequest::new().with_prefix(prefix.as_bytes()),
                DEFAULT_PAGE_SIZE,
                Ok,
            )
            .into_stream();

            let kvs = stream.try_collect::<Vec<_>>().await?;

            self.cache
                .batch_put(BatchPutRequest {
                    kvs,
                    prev_kv: false,
                })
                .await?;
        }

        Ok(())
    }

    #[inline]
    fn is_leader(&self) -> bool {
        self.check_leader.check()
    }

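    /// Removes the key from the local cache; the remote store is left untouched.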
    #[inline]
    async fn invalid_key(&self, key: Vec<u8>) -> Result<()> {
        let _ = self.cache.delete(&key, false).await?;
        Ok(())
    }

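    /// Removes a batch of keys from the local cache in a single txn; the remote
    /// store is left untouched.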
    #[inline]
    async fn invalid_keys(&self, keys: Vec<Vec<u8>>) -> Result<()> {
        let txn = Txn::new().and_then(keys.into_iter().map(TxnOp::Delete).collect::<Vec<_>>());
        let _ = self.cache.txn(txn).await?;
        Ok(())
    }

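    // The version counter implements optimistic invalidation: every mutation
    // bumps it via `create_new_version`, while a read-through snapshots it via
    // `get_version` before fetching from the remote store and re-checks it with
    // `validate_version` after filling the cache. A mismatch means a mutation
    // raced the cache fill, so the freshly cached keys are invalidated instead
    // of being trusted.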
    #[inline]
    fn get_version(&self) -> usize {
        self.version.load(Ordering::Relaxed)
    }

    #[inline]
    fn create_new_version(&self) -> usize {
        self.version.fetch_add(1, Ordering::Relaxed) + 1
    }

    #[inline]
    fn validate_version(&self, version: usize) -> bool {
        version == self.version.load(Ordering::Relaxed)
    }
}

#[async_trait::async_trait]
impl KvBackend for LeaderCachedKvBackend {
    fn name(&self) -> &str {
        &self.name
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
        if !self.is_leader() {
            return self.store.range(req).await;
        }

        // We can only cache exact-key queries (i.e. get requests), because we
        // cannot tell whether a cached range response would be complete.
        if !req.range_end.is_empty() {
            return self.store.range(req).await;
        }

        let res = self.cache.range(req.clone()).await?;
        if !res.kvs.is_empty() {
            return Ok(res);
        }

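        // Snapshot the version before the remote read so that a concurrent
        // mutation can be detected after the cache is filled (see the note
        // above the version helpers).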
        let ver = self.get_version();

        let res = self
            .store
            .range(RangeRequest {
                // Ignore `keys_only` so the response carries full values that
                // can be cached.
                keys_only: false,
                ..req.clone()
            })
            .await?;
        if !res.kvs.is_empty() {
            let KeyValue { key, value } = res.kvs[0].clone();
            let put_req = PutRequest {
                key: key.clone(),
                value,
                ..Default::default()
            };
            let _ = self.cache.put(put_req).await?;

            if !self.validate_version(ver) {
                self.invalid_key(key).await?;
            }
        }

        Ok(res)
    }

    async fn put(&self, req: PutRequest) -> Result<PutResponse> {
        if !self.is_leader() {
            return self.store.put(req).await;
        }

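        // Bump the version before writing, so that a read-through that started
        // earlier and races this write will fail validation and invalidate its
        // freshly cached value instead of keeping a possibly stale one.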
        let ver = self.create_new_version();

        let res = self.store.put(req.clone()).await?;
        let _ = self.cache.put(req.clone()).await?;

        if !self.validate_version(ver) {
            self.invalid_key(req.key).await?;
        }

        Ok(res)
    }

    async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
        if !self.is_leader() {
            return self.store.batch_put(req).await;
        }

        let ver = self.create_new_version();

        let res = self.store.batch_put(req.clone()).await?;
        let _ = self.cache.batch_put(req.clone()).await?;

        if !self.validate_version(ver) {
            let keys = req.kvs.into_iter().map(|kv| kv.key).collect::<Vec<_>>();
            self.invalid_keys(keys).await?;
        }

        Ok(res)
    }

    async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
        if !self.is_leader() {
            return self.store.batch_get(req).await;
        }

        let cached_res = self.cache.batch_get(req.clone()).await?;
        // All requested keys hit the cache.
        if cached_res.kvs.len() == req.keys.len() {
            return Ok(cached_res);
        }

        let hit_keys = cached_res
            .kvs
            .iter()
            .map(|kv| kv.key.clone())
            .collect::<HashSet<_>>();

        metrics::METRIC_META_KV_CACHE_HIT
            .with_label_values(&["batch_get"])
            .inc_by(hit_keys.len() as u64);

        let missed_keys = req
            .keys
            .iter()
            .filter(|key| !hit_keys.contains(*key))
            .cloned()
            .collect::<Vec<_>>();
        metrics::METRIC_META_KV_CACHE_MISS
            .with_label_values(&["batch_get"])
            .inc_by(missed_keys.len() as u64);

        let remote_req = BatchGetRequest { keys: missed_keys };

        let ver = self.get_version();

        let remote_res = self.store.batch_get(remote_req).await?;
        let put_req = BatchPutRequest {
            kvs: remote_res.kvs.clone(),
            ..Default::default()
        };
        let _ = self.cache.batch_put(put_req).await?;

        if !self.validate_version(ver) {
            let keys = remote_res
                .kvs
                .iter()
                .map(|kv| kv.key().to_vec())
                .collect::<Vec<_>>();
            self.invalid_keys(keys).await?;
        }

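        // Merge cache hits with the remote results; note that the order of the
        // returned kvs may differ from the order of the requested keys.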
        let mut merged_res = cached_res;
        merged_res.kvs.extend(remote_res.kvs);
        Ok(merged_res)
    }

    async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
        if !self.is_leader() {
            return self.store.compare_and_put(req).await;
        }

        let _ = self.create_new_version();

        let key = req.key.clone();
        let res = self.store.compare_and_put(req).await?;
        // Delete the key from the cache.
        //
        // The cache cannot handle the CAS operation, because it does not hold
        // the full data set, so we simply invalidate the key.
        self.invalid_key(key).await?;
        Ok(res)
    }

    async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
        if !self.is_leader() {
            return self.store.delete_range(req).await;
        }

        let _ = self.create_new_version();

        let res = self.store.delete_range(req.clone()).await?;
        let _ = self.cache.delete_range(req).await?;
        Ok(res)
    }

    async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
        if !self.is_leader() {
            return self.store.batch_delete(req).await;
        }

        let _ = self.create_new_version();

        let res = self.store.batch_delete(req.clone()).await?;
        let _ = self.cache.batch_delete(req).await?;
        Ok(res)
    }
}

#[async_trait::async_trait]
impl TxnService for LeaderCachedKvBackend {
    type Error = Error;

    async fn txn(&self, txn: Txn) -> Result<TxnResponse> {
        if !self.is_leader() {
            return self.store.txn(txn).await;
        }

        let _ = self.create_new_version();

        let res = self.store.txn(txn.clone()).await?;
        let TxnRequest {
            success, failure, ..
        } = txn.into();
        let mut all = success;
        all.extend(failure);
        // Delete all touched keys from the cache.
        //
        // The cache cannot handle the txn operation, because it does not hold
        // the full data set, so we invalidate the keys of both branches.
        let mut keys = Vec::with_capacity(all.len());
        for txn_op in all {
            match txn_op {
                TxnOp::Put(key, _) | TxnOp::Delete(key) => keys.push(key),
                TxnOp::Get(_) => {}
            }
        }
        self.invalid_keys(keys).await?;

        Ok(res)
    }

    fn max_txn_ops(&self) -> usize {
        self.store.max_txn_ops()
    }
}

impl ResettableKvBackend for LeaderCachedKvBackend {
    fn reset(&self) {
        self.cache.reset()
    }

    fn as_kv_backend_ref(self: Arc<Self>) -> KvBackendRef<Self::Error> {
        self
    }
}

#[cfg(test)]
mod tests {
    use common_meta::rpc::KeyValue;

    use super::*;

    fn create_leader_cached_kv_backend() -> LeaderCachedKvBackend {
        let store = Arc::new(MemoryKvBackend::new());
        LeaderCachedKvBackend::with_always_leader(store)
    }

    #[tokio::test]
    async fn test_get_put_delete() {
        let cached_store = create_leader_cached_kv_backend();
        let inner_store = cached_store.store.clone();
        let inner_cache = cached_store.cache.clone();

        let key = "test_key".to_owned().into_bytes();
        let value = "value".to_owned().into_bytes();

        // Put directly into the inner store; the cache must not see it yet.
        let put_req = PutRequest {
            key: key.clone(),
            value: value.clone(),
            ..Default::default()
        };
        let _ = inner_store.put(put_req).await.unwrap();

        let cached_value = inner_cache.get(&key).await.unwrap();
        assert!(cached_value.is_none());

        // Reading through the cached store fills the cache.
        let cached_value = cached_store.get(&key).await.unwrap().unwrap();
        assert_eq!(cached_value.value(), value);

        let cached_value = inner_cache.get(&key).await.unwrap().unwrap();
        assert_eq!(cached_value.value(), value);

        // Deleting through the cached store also evicts the cached entry.
        let res = cached_store.delete(&key, true).await.unwrap().unwrap();
        assert_eq!(res.value(), value);

        let cached_value = inner_cache.get(&key).await.unwrap();
        assert!(cached_value.is_none());
    }

    #[tokio::test]
    async fn test_batch_get_put_delete() {
        let cached_store = create_leader_cached_kv_backend();
        let inner_store = cached_store.store.clone();
        let inner_cache = cached_store.cache.clone();

        let kvs = (1..3)
            .map(|i| {
                let key = format!("test_key_{}", i).into_bytes();
                let value = format!("value_{}", i).into_bytes();
                KeyValue { key, value }
            })
            .collect::<Vec<_>>();

        let batch_put_req = BatchPutRequest {
            kvs: kvs.clone(),
            ..Default::default()
        };

        let _ = inner_store.batch_put(batch_put_req).await.unwrap();

        // Request four keys; only two of them exist in the inner store.
        let keys = (1..5)
            .map(|i| format!("test_key_{}", i).into_bytes())
            .collect::<Vec<_>>();

        let batch_get_req = BatchGetRequest { keys };

        let cached_values = inner_cache.batch_get(batch_get_req.clone()).await.unwrap();
        assert!(cached_values.kvs.is_empty());

        let cached_values = cached_store.batch_get(batch_get_req.clone()).await.unwrap();
        assert_eq!(cached_values.kvs.len(), 2);

        let cached_values = inner_cache.batch_get(batch_get_req.clone()).await.unwrap();
        assert_eq!(cached_values.kvs.len(), 2);

        // Resetting the cached store clears the cache, not the inner store.
        cached_store.reset();

        let cached_values = inner_cache.batch_get(batch_get_req).await.unwrap();
        assert!(cached_values.kvs.is_empty());
    }

    #[tokio::test]
    async fn test_txn() {
        let cached_store = create_leader_cached_kv_backend();
        let inner_cache = cached_store.cache.clone();

        let kvs = (1..5)
            .map(|i| {
                let key = format!("test_key_{}", i).into_bytes();
                let value = format!("value_{}", i).into_bytes();
                KeyValue { key, value }
            })
            .collect::<Vec<_>>();

        let batch_put_req = BatchPutRequest {
            kvs: kvs.clone(),
            ..Default::default()
        };
        let _ = cached_store.batch_put(batch_put_req).await.unwrap();

        let keys = (1..5)
            .map(|i| format!("test_key_{}", i).into_bytes())
            .collect::<Vec<_>>();
        let batch_get_req = BatchGetRequest { keys };
        let cached_values = inner_cache.batch_get(batch_get_req.clone()).await.unwrap();
        assert_eq!(cached_values.kvs.len(), 4);

        let put_ops = (1..5)
            .map(|i| {
                let key = format!("test_key_{}", i).into_bytes();
                let value = format!("value_{}", i).into_bytes();
                TxnOp::Put(key, value)
            })
            .collect::<Vec<_>>();
        let txn = Txn::new().and_then(put_ops);
        let _ = cached_store.txn(txn).await.unwrap();

        // A txn through the cached store invalidates every touched key.
        let cached_values = inner_cache.batch_get(batch_get_req).await.unwrap();
        assert!(cached_values.kvs.is_empty());
    }
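
    // A sketch exercising the CAS invalidation path of `compare_and_put` above:
    // after a CAS through the cached store, the key must no longer be present
    // in the inner cache, regardless of whether the CAS itself succeeded. The
    // `CompareAndPutRequest` field layout (`key`/`expect`/`value`) and the CAS
    // semantics of the backing store are assumptions, not asserted by this file.
    #[tokio::test]
    async fn test_compare_and_put_invalidates_cache() {
        let cached_store = create_leader_cached_kv_backend();
        let inner_cache = cached_store.cache.clone();

        let key = "test_key".to_owned().into_bytes();
        let value = "value".to_owned().into_bytes();

        // Warm the cache through the normal put path.
        let put_req = PutRequest {
            key: key.clone(),
            value: value.clone(),
            ..Default::default()
        };
        let _ = cached_store.put(put_req).await.unwrap();
        assert!(inner_cache.get(&key).await.unwrap().is_some());

        // CAS through the cached store; the cached entry must be invalidated.
        let cas_req = CompareAndPutRequest {
            key: key.clone(),
            expect: value.clone(),
            value: "new_value".to_owned().into_bytes(),
        };
        let _ = cached_store.compare_and_put(cas_req).await.unwrap();
        assert!(inner_cache.get(&key).await.unwrap().is_none());
    }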
}