object_store/layers/lru_cache/
read_cache.rs1use std::sync::Arc;
16
17use common_telemetry::debug;
18use futures::{FutureExt, TryStreamExt};
19use moka::future::Cache;
20use moka::notification::ListenerFuture;
21use moka::policy::EvictionPolicy;
22use opendal::raw::oio::{Read, Reader, Write};
23use opendal::raw::{oio, Access, OpDelete, OpRead, OpStat, OpWrite, RpRead};
24use opendal::{Error as OpendalError, ErrorKind, OperatorBuilder, Result};
25
26use crate::metrics::{
27 OBJECT_STORE_LRU_CACHE_BYTES, OBJECT_STORE_LRU_CACHE_ENTRIES, OBJECT_STORE_LRU_CACHE_HIT,
28 OBJECT_STORE_LRU_CACHE_MISS, OBJECT_STORE_READ_ERROR,
29};
30
/// Number of listed entries whose metadata is resolved concurrently while
/// `recover_cache` rebuilds the in-memory index.
const RECOVER_CACHE_LIST_CONCURRENT: usize = 8;
/// Path prefix under which every read-cache file key is placed.
const READ_CACHE_DIR: &str = "cache/object/read";
36
/// Value stored in the in-memory cache index for one read key.
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
enum ReadResult {
    /// The read succeeded and was cached; carries the cached size in bytes.
    Success(u32),
    /// The read reported that the object does not exist.
    NotFound,
}

impl ReadResult {
    /// Returns the cached size in bytes, or `0` for [`ReadResult::NotFound`].
    fn size_bytes(&self) -> u32 {
        match *self {
            Self::Success(size) => size,
            Self::NotFound => 0,
        }
    }
}
54
/// Returns `true` when reads of `path` may go through the read cache.
///
/// Any path ending in `_last_checkpoint` is excluded from caching.
fn can_cache(path: &str) -> bool {
    !path.ends_with("_last_checkpoint")
}
60
61fn read_cache_key(path: &str, args: &OpRead) -> String {
63 format!(
64 "{READ_CACHE_DIR}/{:x}.cache-{}",
65 md5::compute(path),
66 args.range().to_header()
67 )
68}
69
70fn read_cache_root() -> String {
71 format!("/{READ_CACHE_DIR}/")
72}
73
74fn read_cache_key_prefix(path: &str) -> String {
75 format!("{READ_CACHE_DIR}/{:x}", md5::compute(path))
76}
77
78#[derive(Debug)]
80pub(crate) struct ReadCache<C> {
81 file_cache: Arc<C>,
83 mem_cache: Cache<String, ReadResult>,
85}
86
87impl<C> Clone for ReadCache<C> {
88 fn clone(&self) -> Self {
89 Self {
90 file_cache: self.file_cache.clone(),
91 mem_cache: self.mem_cache.clone(),
92 }
93 }
94}
95
96impl<C: Access> ReadCache<C> {
97 pub(crate) fn new(file_cache: Arc<C>, capacity: usize) -> Self {
99 let file_cache_cloned = OperatorBuilder::new(file_cache.clone()).finish();
100 let eviction_listener =
101 move |read_key: Arc<String>, read_result: ReadResult, cause| -> ListenerFuture {
102 OBJECT_STORE_LRU_CACHE_ENTRIES.dec();
104 let file_cache_cloned = file_cache_cloned.clone();
105
106 async move {
107 if let ReadResult::Success(size) = read_result {
108 OBJECT_STORE_LRU_CACHE_BYTES.sub(size as i64);
109
110 let result = file_cache_cloned.delete(&read_key).await;
111 debug!(
112 "Deleted local cache file `{}`, result: {:?}, cause: {:?}.",
113 read_key, result, cause
114 );
115 }
116 }
117 .boxed()
118 };
119
120 Self {
121 file_cache,
122 mem_cache: Cache::builder()
123 .max_capacity(capacity as u64)
124 .eviction_policy(EvictionPolicy::lru())
125 .weigher(|_key, value: &ReadResult| -> u32 {
126 value.size_bytes()
128 })
129 .async_eviction_listener(eviction_listener)
130 .support_invalidation_closures()
131 .build(),
132 }
133 }
134
135 pub(crate) async fn cache_stat(&self) -> (u64, u64) {
137 self.mem_cache.run_pending_tasks().await;
138
139 (self.mem_cache.entry_count(), self.mem_cache.weighted_size())
140 }
141
142 pub(crate) fn invalidate_entries_with_prefix(&self, path: &str) {
144 let prefix = read_cache_key_prefix(path);
145 self.mem_cache
147 .invalidate_entries_if(move |k: &String, &_v| k.starts_with(&prefix))
148 .ok();
149 }
150
151 pub(crate) async fn recover_cache(&self) -> Result<(u64, u64)> {
154 let op = OperatorBuilder::new(self.file_cache.clone()).finish();
155 let cloned_op = op.clone();
156 let root = read_cache_root();
157 let mut entries = op
158 .lister_with(&root)
159 .await?
160 .map_ok(|entry| async {
161 let (path, mut meta) = entry.into_parts();
162
163 if meta.content_length() == 0 {
165 meta = cloned_op.stat(&path).await?;
166 }
167
168 Ok((path, meta))
169 })
170 .try_buffer_unordered(RECOVER_CACHE_LIST_CONCURRENT)
171 .try_collect::<Vec<_>>()
172 .await?;
173
174 while let Some((read_key, metadata)) = entries.pop() {
175 if !metadata.is_file() {
176 continue;
177 }
178
179 let size = metadata.content_length();
180 OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
181 OBJECT_STORE_LRU_CACHE_BYTES.add(size as i64);
182
183 self.mem_cache
184 .insert(read_key.to_string(), ReadResult::Success(size as u32))
185 .await;
186 }
187
188 Ok(self.cache_stat().await)
189 }
190
191 pub(crate) async fn contains_file(&self, path: &str) -> bool {
193 self.mem_cache.run_pending_tasks().await;
194 self.mem_cache.contains_key(path)
195 && self.file_cache.stat(path, OpStat::default()).await.is_ok()
196 }
197
198 pub(crate) async fn read_from_cache<I>(
204 &self,
205 inner: &I,
206 path: &str,
207 args: OpRead,
208 ) -> Result<(RpRead, Reader)>
209 where
210 I: Access,
211 {
212 if !can_cache(path) {
213 return inner.read(path, args).await.map(to_output_reader);
214 }
215
216 let read_key = read_cache_key(path, &args);
217
218 let read_result = self
219 .mem_cache
220 .try_get_with(
221 read_key.clone(),
222 self.read_remote(inner, &read_key, path, args.clone()),
223 )
224 .await
225 .map_err(|e| OpendalError::new(e.kind(), e.to_string()))?;
226
227 match read_result {
228 ReadResult::Success(_) => {
229 match self.file_cache.read(&read_key, OpRead::default()).await {
232 Ok(ret) => {
233 OBJECT_STORE_LRU_CACHE_HIT
234 .with_label_values(&["success"])
235 .inc();
236 Ok(to_output_reader(ret))
237 }
238 Err(_) => {
239 OBJECT_STORE_LRU_CACHE_MISS.inc();
240 inner.read(path, args).await.map(to_output_reader)
241 }
242 }
243 }
244 ReadResult::NotFound => {
245 OBJECT_STORE_LRU_CACHE_HIT
246 .with_label_values(&["not_found"])
247 .inc();
248
249 Err(OpendalError::new(
250 ErrorKind::NotFound,
251 format!("File not found: {path}"),
252 ))
253 }
254 }
255 }
256
257 async fn try_write_cache<I>(&self, mut reader: I::Reader, read_key: &str) -> Result<usize>
258 where
259 I: Access,
260 {
261 let (_, mut writer) = self.file_cache.write(read_key, OpWrite::new()).await?;
262 let mut total = 0;
263 loop {
264 let bytes = reader.read().await?;
265 if bytes.is_empty() {
266 break;
267 }
268
269 total += bytes.len();
270 writer.write(bytes).await?;
271 }
272 writer.close().await?;
274 Ok(total)
275 }
276
277 async fn read_remote<I>(
279 &self,
280 inner: &I,
281 read_key: &str,
282 path: &str,
283 args: OpRead,
284 ) -> Result<ReadResult>
285 where
286 I: Access,
287 {
288 OBJECT_STORE_LRU_CACHE_MISS.inc();
289
290 let (_, reader) = inner.read(path, args).await?;
291 let result = self.try_write_cache::<I>(reader, read_key).await;
292
293 match result {
294 Ok(read_bytes) => {
295 OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
296 OBJECT_STORE_LRU_CACHE_BYTES.add(read_bytes as i64);
297
298 Ok(ReadResult::Success(read_bytes as u32))
299 }
300
301 Err(e) if e.kind() == ErrorKind::NotFound => {
302 OBJECT_STORE_READ_ERROR
303 .with_label_values(&[e.kind().to_string().as_str()])
304 .inc();
305 OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
306
307 Ok(ReadResult::NotFound)
308 }
309
310 Err(e) => {
311 OBJECT_STORE_READ_ERROR
312 .with_label_values(&[e.kind().to_string().as_str()])
313 .inc();
314 Err(e)
315 }
316 }
317 }
318}
319
320pub struct CacheAwareDeleter<C, D> {
321 cache: ReadCache<C>,
322 deleter: D,
323}
324
325impl<C: Access, D: oio::Delete> CacheAwareDeleter<C, D> {
326 pub(crate) fn new(cache: ReadCache<C>, deleter: D) -> Self {
327 Self { cache, deleter }
328 }
329}
330
331impl<C: Access, D: oio::Delete> oio::Delete for CacheAwareDeleter<C, D> {
332 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
333 self.cache.invalidate_entries_with_prefix(path);
334 self.deleter.delete(path, args)?;
335 Ok(())
336 }
337
338 async fn flush(&mut self) -> Result<usize> {
339 self.deleter.flush().await
340 }
341}
342
343fn to_output_reader<R: Read + 'static>(input: (RpRead, R)) -> (RpRead, Reader) {
344 (input.0, Box::new(input.1))
345}
346
#[cfg(test)]
mod tests {
    use super::*;

    /// Only paths ending in `_last_checkpoint` are excluded from caching.
    #[test]
    fn test_can_cache() {
        assert!(can_cache("test"));
        assert!(can_cache("a/b/c.parquet"));
        assert!(can_cache("1.json"));
        assert!(can_cache("100.checkpoint"));
        assert!(can_cache("test/last_checkpoint"));
        assert!(!can_cache("test/__last_checkpoint"));
        assert!(!can_cache("a/b/c/__last_checkpoint"));
        assert!(!can_cache("_last_checkpoint"));
    }
}