object_store/layers/
lru_cache.rs1use std::sync::Arc;
16
17use opendal::raw::oio::Reader;
18use opendal::raw::{
19 Access, Layer, LayeredAccess, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead, RpWrite,
20};
21use opendal::Result;
22mod read_cache;
23use std::time::Instant;
24
25use common_telemetry::{error, info};
26use read_cache::ReadCache;
27
28use crate::layers::lru_cache::read_cache::CacheAwareDeleter;
29
30pub struct LruCacheLayer<C: Access> {
32 read_cache: ReadCache<C>,
34}
35
36impl<C: Access> Clone for LruCacheLayer<C> {
37 fn clone(&self) -> Self {
38 Self {
39 read_cache: self.read_cache.clone(),
40 }
41 }
42}
43
44impl<C: Access> LruCacheLayer<C> {
45 pub fn new(file_cache: Arc<C>, capacity: usize) -> Result<Self> {
47 let read_cache = ReadCache::new(file_cache, capacity);
48 Ok(Self { read_cache })
49 }
50
51 pub async fn recover_cache(&self, sync: bool) {
53 let now = Instant::now();
54 let moved_read_cache = self.read_cache.clone();
55 let handle = tokio::spawn(async move {
56 match moved_read_cache.recover_cache().await {
57 Ok((entries, bytes)) => info!(
58 "Recovered {} entries and total size {} in bytes for LruCacheLayer, cost: {:?}",
59 entries,
60 bytes,
61 now.elapsed()
62 ),
63 Err(err) => error!(err; "Failed to recover file cache."),
64 }
65 });
66 if sync {
67 let _ = handle.await;
68 }
69 }
70
71 pub async fn contains_file(&self, path: &str) -> bool {
73 self.read_cache.contains_file(path).await
74 }
75
76 pub async fn read_cache_stat(&self) -> (u64, u64) {
78 self.read_cache.cache_stat().await
79 }
80}
81
82impl<I: Access, C: Access> Layer<I> for LruCacheLayer<C> {
83 type LayeredAccess = LruCacheAccess<I, C>;
84
85 fn layer(&self, inner: I) -> Self::LayeredAccess {
86 LruCacheAccess {
87 inner,
88 read_cache: self.read_cache.clone(),
89 }
90 }
91}
92
93#[derive(Debug)]
94pub struct LruCacheAccess<I, C> {
95 inner: I,
96 read_cache: ReadCache<C>,
97}
98
99impl<I: Access, C: Access> LayeredAccess for LruCacheAccess<I, C> {
100 type Inner = I;
101 type Reader = Reader;
102 type BlockingReader = I::BlockingReader;
103 type Writer = I::Writer;
104 type BlockingWriter = I::BlockingWriter;
105 type Lister = I::Lister;
106 type BlockingLister = I::BlockingLister;
107 type Deleter = CacheAwareDeleter<C, I::Deleter>;
108 type BlockingDeleter = I::BlockingDeleter;
109
110 fn inner(&self) -> &Self::Inner {
111 &self.inner
112 }
113
114 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
115 self.read_cache
116 .read_from_cache(&self.inner, path, args)
117 .await
118 }
119
120 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
121 let result = self.inner.write(path, args).await;
122
123 self.read_cache.invalidate_entries_with_prefix(path);
124
125 result
126 }
127
128 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
129 self.inner
130 .delete()
131 .await
132 .map(|(rp, deleter)| (rp, CacheAwareDeleter::new(self.read_cache.clone(), deleter)))
133 }
134
135 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
136 self.inner.list(path, args).await
137 }
138
139 fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
140 self.inner.blocking_read(path, args)
142 }
143
144 fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
145 let result = self.inner.blocking_write(path, args);
146
147 self.read_cache.invalidate_entries_with_prefix(path);
148
149 result
150 }
151
152 fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
153 self.inner.blocking_list(path, args)
154 }
155
156 fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
157 self.inner.blocking_delete()
158 }
159}