object_store/layers/
lru_cache.rs1use std::sync::Arc;
16
17use opendal::raw::oio::Reader;
18use opendal::raw::{
19 Access, Layer, LayeredAccess, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead, RpWrite,
20};
21use opendal::Result;
22mod read_cache;
23use std::time::Instant;
24
25use common_telemetry::{error, info};
26use read_cache::ReadCache;
27
28use crate::layers::lru_cache::read_cache::CacheAwareDeleter;
29
30pub struct LruCacheLayer<C: Access> {
32 read_cache: ReadCache<C>,
34}
35
36impl<C: Access> Clone for LruCacheLayer<C> {
37 fn clone(&self) -> Self {
38 Self {
39 read_cache: self.read_cache.clone(),
40 }
41 }
42}
43
44impl<C: Access> LruCacheLayer<C> {
45 pub fn new(file_cache: Arc<C>, capacity: usize) -> Result<Self> {
47 let read_cache = ReadCache::new(file_cache, capacity);
48 Ok(Self { read_cache })
49 }
50
51 pub async fn recover_cache(&self, sync: bool) {
53 let now = Instant::now();
54 let moved_read_cache = self.read_cache.clone();
55 let handle = tokio::spawn(async move {
56 match moved_read_cache.recover_cache().await {
57 Ok((entries, bytes)) => info!(
58 "Recovered {} entries and total size {} in bytes for LruCacheLayer, cost: {:?}",
59 entries,
60 bytes,
61 now.elapsed()
62 ),
63 Err(err) => error!(err; "Failed to recover file cache."),
64 }
65 });
66 if sync {
67 let _ = handle.await;
68 }
69 }
70
71 pub async fn contains_file(&self, path: &str) -> bool {
73 self.read_cache.contains_file(path).await
74 }
75
76 pub async fn read_cache_stat(&self) -> (u64, u64) {
78 self.read_cache.cache_stat().await
79 }
80}
81
82impl<I: Access, C: Access> Layer<I> for LruCacheLayer<C> {
83 type LayeredAccess = LruCacheAccess<I, C>;
84
85 fn layer(&self, inner: I) -> Self::LayeredAccess {
86 LruCacheAccess {
87 inner,
88 read_cache: self.read_cache.clone(),
89 }
90 }
91}
92
93#[derive(Debug)]
94pub struct LruCacheAccess<I, C> {
95 inner: I,
96 read_cache: ReadCache<C>,
97}
98
99impl<I: Access, C: Access> LayeredAccess for LruCacheAccess<I, C> {
100 type Inner = I;
101 type Reader = Reader;
102 type Writer = I::Writer;
103 type Lister = I::Lister;
104 type Deleter = CacheAwareDeleter<C, I::Deleter>;
105
106 fn inner(&self) -> &Self::Inner {
107 &self.inner
108 }
109
110 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
111 self.read_cache
112 .read_from_cache(&self.inner, path, args)
113 .await
114 }
115
116 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
117 let result = self.inner.write(path, args).await;
118
119 self.read_cache.invalidate_entries_with_prefix(path);
120
121 result
122 }
123
124 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
125 self.inner
126 .delete()
127 .await
128 .map(|(rp, deleter)| (rp, CacheAwareDeleter::new(self.read_cache.clone(), deleter)))
129 }
130
131 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
132 self.inner.list(path, args).await
133 }
134}