1use std::any::Any;
16use std::collections::HashSet;
17use std::sync::atomic::{AtomicUsize, Ordering};
18use std::sync::{Arc, RwLock};
19
20use common_meta::error::{Error, Result};
21use common_meta::key::CACHE_KEY_PREFIXES;
22use common_meta::kv_backend::memory::MemoryKvBackend;
23use common_meta::kv_backend::txn::{Txn, TxnOp, TxnRequest, TxnResponse};
24use common_meta::kv_backend::{
25 KvBackend, KvBackendRef, ResettableKvBackend, ResettableKvBackendRef, TxnService,
26};
27use common_meta::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE};
28use common_meta::rpc::store::{
29 BatchDeleteRequest, BatchDeleteResponse, BatchGetRequest, BatchGetResponse, BatchPutRequest,
30 BatchPutResponse, CompareAndPutRequest, CompareAndPutResponse, DeleteRangeRequest,
31 DeleteRangeResponse, PutRequest, PutResponse, RangeRequest, RangeResponse,
32};
33use common_meta::rpc::KeyValue;
34use futures::TryStreamExt;
35
36use crate::metrics;
37use crate::state::State;
38
/// Shared, thread-safe handle to a [`CheckLeader`] implementation.
pub type CheckLeaderRef = Arc<dyn CheckLeader>;

/// Decides, per call, whether the leader-side KV cache may be used.
pub trait CheckLeader: Send + Sync {
    /// Returns `true` when the cache should be consulted and maintained, and
    /// `false` when calls should go straight to the backing store.
    fn check(&self) -> bool;
}
44
45struct AlwaysLeader;
46
47impl CheckLeader for AlwaysLeader {
48 fn check(&self) -> bool {
49 true
50 }
51}
52
53impl CheckLeader for RwLock<State> {
54 fn check(&self) -> bool {
55 self.read().unwrap().enable_leader_cache()
56 }
57}
58
/// A [`KvBackend`] that puts an in-memory cache in front of `store`.
///
/// While `check_leader` reports `true`, exact-key reads are served from the
/// cache (falling back to `store` on a miss), and writes go to `store` first and
/// then update or evict the cache. While it reports `false`, every KV call is
/// forwarded to `store` unchanged.
pub struct LeaderCachedKvBackend {
    /// Consulted on every call to decide whether the cache is used.
    check_leader: CheckLeaderRef,
    /// The backing store; all writes go here.
    store: KvBackendRef,
    /// The cache (a `MemoryKvBackend` when built via `new`).
    cache: ResettableKvBackendRef,
    /// Counter bumped by the write paths. Cache-filling reads compare against it
    /// to detect concurrent writes and evict possibly stale entries.
    version: AtomicUsize,
    /// `LeaderCached(<store name>)`, returned by `KvBackend::name`.
    name: String,
}
75
76impl LeaderCachedKvBackend {
77 pub fn new(check_leader: CheckLeaderRef, store: KvBackendRef) -> Self {
78 let name = format!("LeaderCached({})", store.name());
79 Self {
80 check_leader,
81 store,
82 cache: Arc::new(MemoryKvBackend::new()),
83 version: AtomicUsize::new(0),
84 name,
85 }
86 }
87
88 pub fn with_always_leader(store: KvBackendRef) -> Self {
91 Self::new(Arc::new(AlwaysLeader), store)
92 }
93
94 pub async fn load(&self) -> Result<()> {
96 for prefix in &CACHE_KEY_PREFIXES[..] {
97 let _timer =
98 metrics::METRIC_META_LEADER_CACHED_KV_LOAD_ELAPSED.with_label_values(&[prefix]);
99
100 let stream = PaginationStream::new(
102 self.store.clone(),
103 RangeRequest::new().with_prefix(prefix.as_bytes()),
104 DEFAULT_PAGE_SIZE,
105 Ok,
106 )
107 .into_stream();
108
109 let kvs = stream.try_collect::<Vec<_>>().await?;
110
111 self.cache
112 .batch_put(BatchPutRequest {
113 kvs,
114 prev_kv: false,
115 })
116 .await?;
117 }
118
119 Ok(())
120 }
121
122 #[inline]
123 fn is_leader(&self) -> bool {
124 self.check_leader.check()
125 }
126
127 #[inline]
128 async fn invalid_key(&self, key: Vec<u8>) -> Result<()> {
129 let _ = self.cache.delete(&key, false).await?;
130 Ok(())
131 }
132
133 #[inline]
134 async fn invalid_keys(&self, keys: Vec<Vec<u8>>) -> Result<()> {
135 let txn = Txn::new().and_then(keys.into_iter().map(TxnOp::Delete).collect::<Vec<_>>());
136 let _ = self.cache.txn(txn).await?;
137 Ok(())
138 }
139
140 #[inline]
141 fn get_version(&self) -> usize {
142 self.version.load(Ordering::Relaxed)
143 }
144
145 #[inline]
146 fn create_new_version(&self) -> usize {
147 self.version.fetch_add(1, Ordering::Relaxed) + 1
148 }
149
150 #[inline]
151 fn validate_version(&self, version: usize) -> bool {
152 version == self.version.load(Ordering::Relaxed)
153 }
154}
155
156#[async_trait::async_trait]
157impl KvBackend for LeaderCachedKvBackend {
158 fn name(&self) -> &str {
159 &self.name
160 }
161
162 fn as_any(&self) -> &dyn Any {
163 self
164 }
165
166 async fn range(&self, req: RangeRequest) -> Result<RangeResponse> {
167 if !self.is_leader() {
168 return self.store.range(req).await;
169 }
170
171 if !req.range_end.is_empty() {
174 return self.store.range(req).await;
175 }
176
177 let res = self.cache.range(req.clone()).await?;
178 if !res.kvs.is_empty() {
179 return Ok(res);
180 }
181
182 let ver = self.get_version();
183
184 let res = self
185 .store
186 .range(RangeRequest {
187 keys_only: false,
189 ..req.clone()
190 })
191 .await?;
192 if !res.kvs.is_empty() {
193 let KeyValue { key, value } = res.kvs[0].clone();
194 let put_req = PutRequest {
195 key: key.clone(),
196 value,
197 ..Default::default()
198 };
199 let _ = self.cache.put(put_req).await?;
200
201 if !self.validate_version(ver) {
202 self.invalid_key(key).await?;
203 }
204 }
205
206 return Ok(res);
207 }
208
209 async fn put(&self, req: PutRequest) -> Result<PutResponse> {
210 if !self.is_leader() {
211 return self.store.put(req).await;
212 }
213
214 let ver = self.create_new_version();
215
216 let res = self.store.put(req.clone()).await?;
217 let _ = self.cache.put(req.clone()).await?;
218
219 if !self.validate_version(ver) {
220 self.invalid_key(req.key).await?;
221 }
222
223 Ok(res)
224 }
225
226 async fn batch_put(&self, req: BatchPutRequest) -> Result<BatchPutResponse> {
227 if !self.is_leader() {
228 return self.store.batch_put(req).await;
229 }
230
231 let ver = self.create_new_version();
232
233 let res = self.store.batch_put(req.clone()).await?;
234 let _ = self.cache.batch_put(req.clone()).await?;
235
236 if !self.validate_version(ver) {
237 let keys = req.kvs.into_iter().map(|kv| kv.key).collect::<Vec<_>>();
238 self.invalid_keys(keys).await?;
239 }
240
241 Ok(res)
242 }
243
244 async fn batch_get(&self, req: BatchGetRequest) -> Result<BatchGetResponse> {
245 if !self.is_leader() {
246 return self.store.batch_get(req).await;
247 }
248
249 let cached_res = self.cache.batch_get(req.clone()).await?;
250 if cached_res.kvs.len() == req.keys.len() {
252 return Ok(cached_res);
253 }
254
255 let hit_keys = cached_res
256 .kvs
257 .iter()
258 .map(|kv| kv.key.clone())
259 .collect::<HashSet<_>>();
260
261 metrics::METRIC_META_KV_CACHE_HIT
262 .with_label_values(&["batch_get"])
263 .inc_by(hit_keys.len() as u64);
264
265 let missed_keys = req
266 .keys
267 .iter()
268 .filter(|key| !hit_keys.contains(*key))
269 .cloned()
270 .collect::<Vec<_>>();
271 metrics::METRIC_META_KV_CACHE_MISS
272 .with_label_values(&["batch_get"])
273 .inc_by(missed_keys.len() as u64);
274
275 let remote_req = BatchGetRequest { keys: missed_keys };
276
277 let ver = self.get_version();
278
279 let remote_res = self.store.batch_get(remote_req).await?;
280 let put_req = BatchPutRequest {
281 kvs: remote_res.kvs.clone().into_iter().collect(),
282 ..Default::default()
283 };
284 let _ = self.cache.batch_put(put_req).await?;
285
286 if !self.validate_version(ver) {
287 let keys = remote_res
288 .kvs
289 .iter()
290 .map(|kv| kv.key().to_vec())
291 .collect::<Vec<_>>();
292 self.invalid_keys(keys).await?;
293 }
294
295 let mut merged_res = cached_res;
296 merged_res.kvs.extend(remote_res.kvs);
297 Ok(merged_res)
298 }
299
300 async fn compare_and_put(&self, req: CompareAndPutRequest) -> Result<CompareAndPutResponse> {
301 if !self.is_leader() {
302 return self.store.compare_and_put(req).await;
303 }
304
305 let _ = self.create_new_version();
306
307 let key = req.key.clone();
308 let res = self.store.compare_and_put(req).await?;
309 self.invalid_key(key).await?;
314 Ok(res)
315 }
316
317 async fn delete_range(&self, req: DeleteRangeRequest) -> Result<DeleteRangeResponse> {
318 if !self.is_leader() {
319 return self.store.delete_range(req).await;
320 }
321
322 let _ = self.create_new_version();
323
324 let res = self.store.delete_range(req.clone()).await?;
325 let _ = self.cache.delete_range(req).await?;
326 Ok(res)
327 }
328
329 async fn batch_delete(&self, req: BatchDeleteRequest) -> Result<BatchDeleteResponse> {
330 if !self.is_leader() {
331 return self.store.batch_delete(req).await;
332 }
333
334 let _ = self.create_new_version();
335
336 let res = self.store.batch_delete(req.clone()).await?;
337 let _ = self.cache.batch_delete(req).await?;
338 Ok(res)
339 }
340}
341
342#[async_trait::async_trait]
343impl TxnService for LeaderCachedKvBackend {
344 type Error = Error;
345
346 async fn txn(&self, txn: Txn) -> Result<TxnResponse> {
347 if !self.is_leader() {
348 return self.store.txn(txn).await;
349 }
350
351 let _ = self.create_new_version();
352
353 let res = self.store.txn(txn.clone()).await?;
354 let TxnRequest {
355 success, failure, ..
356 } = txn.into();
357 let mut all = success;
358 all.extend(failure);
359 let mut keys = Vec::with_capacity(all.len());
364 for txn_op in all {
365 match txn_op {
366 TxnOp::Put(key, _) => {
367 keys.push(key);
368 }
369 TxnOp::Delete(key) => {
370 keys.push(key);
371 }
372 TxnOp::Get(_) => {}
373 }
374 }
375 self.invalid_keys(keys).await?;
376
377 Ok(res)
378 }
379
380 fn max_txn_ops(&self) -> usize {
381 self.store.max_txn_ops()
382 }
383}
384
385impl ResettableKvBackend for LeaderCachedKvBackend {
386 fn reset(&self) {
387 self.cache.reset()
388 }
389
390 fn as_kv_backend_ref(self: Arc<Self>) -> KvBackendRef<Self::Error> {
391 self
392 }
393}
394
#[cfg(test)]
mod tests {
    use common_meta::rpc::KeyValue;

    use super::*;

    /// Wraps a fresh in-memory store in a cached backend that is always enabled.
    fn create_leader_cached_kv_backend() -> LeaderCachedKvBackend {
        let backing = Arc::new(MemoryKvBackend::new());
        LeaderCachedKvBackend::with_always_leader(backing)
    }

    /// `test_key_<i>` as bytes.
    fn numbered_key(i: i32) -> Vec<u8> {
        format!("test_key_{}", i).into_bytes()
    }

    /// `value_<i>` as bytes.
    fn numbered_value(i: i32) -> Vec<u8> {
        format!("value_{}", i).into_bytes()
    }

    /// One key/value pair per id in `ids`.
    fn numbered_kvs(ids: std::ops::Range<i32>) -> Vec<KeyValue> {
        ids.map(|i| KeyValue {
            key: numbered_key(i),
            value: numbered_value(i),
        })
        .collect()
    }

    #[tokio::test]
    async fn test_get_put_delete() {
        let cached_store = create_leader_cached_kv_backend();
        let backing = cached_store.store.clone();
        let cache = cached_store.cache.clone();

        let key = b"test_key".to_vec();
        let value = b"value".to_vec();

        // Write straight to the backing store, bypassing the cache.
        let _ = backing
            .put(PutRequest {
                key: key.clone(),
                value: value.clone(),
                ..Default::default()
            })
            .await
            .unwrap();
        assert!(cache.get(&key).await.unwrap().is_none());

        // Reading through the cached backend fills the cache.
        let fetched = cached_store.get(&key).await.unwrap().unwrap();
        assert_eq!(fetched.value(), value);
        let cached = cache.get(&key).await.unwrap().unwrap();
        assert_eq!(cached.value(), value);

        // Deleting through the cached backend evicts the key.
        let deleted = cached_store.delete(&key, true).await.unwrap().unwrap();
        assert_eq!(deleted.value(), value);
        assert!(cache.get(&key).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn test_batch_get_put_delete() {
        let cached_store = create_leader_cached_kv_backend();
        let backing = cached_store.store.clone();
        let cache = cached_store.cache.clone();

        // Only keys 1 and 2 exist in the backing store.
        let _ = backing
            .batch_put(BatchPutRequest {
                kvs: numbered_kvs(1..3),
                ..Default::default()
            })
            .await
            .unwrap();

        // Ask for keys 1..=4.
        let lookup = BatchGetRequest {
            keys: (1..5).map(numbered_key).collect::<Vec<_>>(),
        };

        let before = cache.batch_get(lookup.clone()).await.unwrap();
        assert!(before.kvs.is_empty());

        let through_cache = cached_store.batch_get(lookup.clone()).await.unwrap();
        assert_eq!(through_cache.kvs.len(), 2);

        let after = cache.batch_get(lookup.clone()).await.unwrap();
        assert_eq!(after.kvs.len(), 2);

        // Resetting the cached backend empties its cache.
        cached_store.reset();

        let after_reset = cache.batch_get(lookup).await.unwrap();
        assert!(after_reset.kvs.is_empty());
    }

    #[tokio::test]
    async fn test_txn() {
        let cached_store = create_leader_cached_kv_backend();
        let cache = cached_store.cache.clone();

        // A write-through batch put populates the cache.
        let _ = cached_store
            .batch_put(BatchPutRequest {
                kvs: numbered_kvs(1..5),
                ..Default::default()
            })
            .await
            .unwrap();

        let lookup = BatchGetRequest {
            keys: (1..5).map(numbered_key).collect::<Vec<_>>(),
        };
        let populated = cache.batch_get(lookup.clone()).await.unwrap();
        assert_eq!(populated.kvs.len(), 4);

        // A txn writing the same keys evicts them from the cache.
        let put_ops = (1..5)
            .map(|i| TxnOp::Put(numbered_key(i), numbered_value(i)))
            .collect::<Vec<_>>();
        let _ = cached_store.txn(Txn::new().and_then(put_ops)).await.unwrap();

        let evicted = cache.batch_get(lookup).await.unwrap();
        assert!(evicted.kvs.is_empty());
    }
}