object_store/layers/lru_cache/
read_cache.rs1use std::sync::Arc;
16
17use common_telemetry::debug;
18use futures::{FutureExt, TryStreamExt};
19use moka::future::Cache;
20use moka::notification::ListenerFuture;
21use moka::policy::EvictionPolicy;
22use opendal::raw::oio::{Read, Reader, Write};
23use opendal::raw::{oio, Access, OpDelete, OpRead, OpStat, OpWrite, RpRead};
24use opendal::{Error as OpendalError, ErrorKind, OperatorBuilder, Result};
25
26use crate::metrics::{
27 OBJECT_STORE_LRU_CACHE_BYTES, OBJECT_STORE_LRU_CACHE_ENTRIES, OBJECT_STORE_LRU_CACHE_HIT,
28 OBJECT_STORE_LRU_CACHE_MISS, OBJECT_STORE_READ_ERROR,
29};
30
/// Directory, relative to the local file-cache backend, that holds every
/// read-cache file.
const READ_CACHE_DIR: &str = "cache/object/read";
/// How many listed cache entries are processed concurrently (possibly each
/// with a `stat` call) while rebuilding the in-memory index on recovery.
const RECOVER_CACHE_LIST_CONCURRENT: usize = 8;
36
/// Outcome of a remote fetch, as remembered by the in-memory cache index.
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
enum ReadResult {
    /// The object (range) is stored in the local file cache; carries its size
    /// in bytes.
    Success(u32),
    /// A `NotFound` error was observed while fetching the object; nothing is
    /// stored locally for this entry.
    NotFound,
}

impl ReadResult {
    /// Weight of this entry in bytes: the cached size for `Success`, zero for
    /// `NotFound`.
    fn size_bytes(&self) -> u32 {
        match *self {
            Self::Success(bytes) => bytes,
            Self::NotFound => 0,
        }
    }
}
54
/// Returns whether reads of `path` may be served from the read cache.
///
/// Any path ending in `_last_checkpoint` is excluded.
/// NOTE(review): presumably because such files are rewritten in place and a
/// cached copy would go stale — confirm with the writers of these files.
fn can_cache(path: &str) -> bool {
    path.strip_suffix("_last_checkpoint").is_none()
}
60
61fn read_cache_key(path: &str, args: &OpRead) -> String {
63 format!(
64 "{READ_CACHE_DIR}/{:x}.cache-{}",
65 md5::compute(path),
66 args.range().to_header()
67 )
68}
69
70fn read_cache_root() -> String {
71 format!("/{READ_CACHE_DIR}/")
72}
73
74fn read_cache_key_prefix(path: &str) -> String {
75 format!("{READ_CACHE_DIR}/{:x}", md5::compute(path))
76}
77
78#[derive(Debug)]
80pub(crate) struct ReadCache<C> {
81 file_cache: Arc<C>,
83 mem_cache: Cache<String, ReadResult>,
85}
86
87impl<C> Clone for ReadCache<C> {
88 fn clone(&self) -> Self {
89 Self {
90 file_cache: self.file_cache.clone(),
91 mem_cache: self.mem_cache.clone(),
92 }
93 }
94}
95
96impl<C: Access> ReadCache<C> {
97 pub(crate) fn new(file_cache: Arc<C>, capacity: usize) -> Self {
99 let file_cache_cloned = OperatorBuilder::new(file_cache.clone()).finish();
100 let eviction_listener =
101 move |read_key: Arc<String>, read_result: ReadResult, cause| -> ListenerFuture {
102 OBJECT_STORE_LRU_CACHE_ENTRIES.dec();
104 let file_cache_cloned = file_cache_cloned.clone();
105
106 async move {
107 if let ReadResult::Success(size) = read_result {
108 OBJECT_STORE_LRU_CACHE_BYTES.sub(size as i64);
109
110 let result = file_cache_cloned.delete(&read_key).await;
111 debug!(
112 "Deleted local cache file `{}`, result: {:?}, cause: {:?}.",
113 read_key, result, cause
114 );
115 }
116 }
117 .boxed()
118 };
119
120 Self {
121 file_cache,
122 mem_cache: Cache::builder()
123 .max_capacity(capacity as u64)
124 .eviction_policy(EvictionPolicy::lru())
125 .weigher(|_key, value: &ReadResult| -> u32 {
126 value.size_bytes()
128 })
129 .async_eviction_listener(eviction_listener)
130 .support_invalidation_closures()
131 .build(),
132 }
133 }
134
135 pub(crate) async fn cache_stat(&self) -> (u64, u64) {
137 self.mem_cache.run_pending_tasks().await;
138
139 (self.mem_cache.entry_count(), self.mem_cache.weighted_size())
140 }
141
142 pub(crate) fn invalidate_entries_with_prefix(&self, path: &str) {
144 let prefix = read_cache_key_prefix(path);
145 self.mem_cache
147 .invalidate_entries_if(move |k: &String, &_v| k.starts_with(&prefix))
148 .ok();
149 }
150
151 pub(crate) async fn recover_cache(&self) -> Result<(u64, u64)> {
154 let op = OperatorBuilder::new(self.file_cache.clone()).finish();
155 let cloned_op = op.clone();
156 let root = read_cache_root();
157 let mut entries = op
158 .lister_with(&root)
159 .await?
160 .map_ok(|entry| async {
161 let (path, mut meta) = entry.into_parts();
162
163 if !cloned_op.info().full_capability().list_has_content_length {
164 meta = cloned_op.stat(&path).await?;
165 }
166
167 Ok((path, meta))
168 })
169 .try_buffer_unordered(RECOVER_CACHE_LIST_CONCURRENT)
170 .try_collect::<Vec<_>>()
171 .await?;
172
173 while let Some((read_key, metadata)) = entries.pop() {
174 if !metadata.is_file() {
175 continue;
176 }
177
178 let size = metadata.content_length();
179 OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
180 OBJECT_STORE_LRU_CACHE_BYTES.add(size as i64);
181
182 self.mem_cache
183 .insert(read_key.to_string(), ReadResult::Success(size as u32))
184 .await;
185 }
186
187 Ok(self.cache_stat().await)
188 }
189
190 pub(crate) async fn contains_file(&self, path: &str) -> bool {
192 self.mem_cache.run_pending_tasks().await;
193 self.mem_cache.contains_key(path)
194 && self.file_cache.stat(path, OpStat::default()).await.is_ok()
195 }
196
197 pub(crate) async fn read_from_cache<I>(
203 &self,
204 inner: &I,
205 path: &str,
206 args: OpRead,
207 ) -> Result<(RpRead, Reader)>
208 where
209 I: Access,
210 {
211 if !can_cache(path) {
212 return inner.read(path, args).await.map(to_output_reader);
213 }
214
215 let read_key = read_cache_key(path, &args);
216
217 let read_result = self
218 .mem_cache
219 .try_get_with(
220 read_key.clone(),
221 self.read_remote(inner, &read_key, path, args.clone()),
222 )
223 .await
224 .map_err(|e| OpendalError::new(e.kind(), e.to_string()))?;
225
226 match read_result {
227 ReadResult::Success(_) => {
228 match self.file_cache.read(&read_key, OpRead::default()).await {
231 Ok(ret) => {
232 OBJECT_STORE_LRU_CACHE_HIT
233 .with_label_values(&["success"])
234 .inc();
235 Ok(to_output_reader(ret))
236 }
237 Err(_) => {
238 OBJECT_STORE_LRU_CACHE_MISS.inc();
239 inner.read(path, args).await.map(to_output_reader)
240 }
241 }
242 }
243 ReadResult::NotFound => {
244 OBJECT_STORE_LRU_CACHE_HIT
245 .with_label_values(&["not_found"])
246 .inc();
247
248 Err(OpendalError::new(
249 ErrorKind::NotFound,
250 format!("File not found: {path}"),
251 ))
252 }
253 }
254 }
255
256 async fn try_write_cache<I>(&self, mut reader: I::Reader, read_key: &str) -> Result<usize>
257 where
258 I: Access,
259 {
260 let (_, mut writer) = self.file_cache.write(read_key, OpWrite::new()).await?;
261 let mut total = 0;
262 loop {
263 let bytes = reader.read().await?;
264 if bytes.is_empty() {
265 break;
266 }
267
268 total += bytes.len();
269 writer.write(bytes).await?;
270 }
271 writer.close().await?;
273 Ok(total)
274 }
275
276 async fn read_remote<I>(
278 &self,
279 inner: &I,
280 read_key: &str,
281 path: &str,
282 args: OpRead,
283 ) -> Result<ReadResult>
284 where
285 I: Access,
286 {
287 OBJECT_STORE_LRU_CACHE_MISS.inc();
288
289 let (_, reader) = inner.read(path, args).await?;
290 let result = self.try_write_cache::<I>(reader, read_key).await;
291
292 match result {
293 Ok(read_bytes) => {
294 OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
295 OBJECT_STORE_LRU_CACHE_BYTES.add(read_bytes as i64);
296
297 Ok(ReadResult::Success(read_bytes as u32))
298 }
299
300 Err(e) if e.kind() == ErrorKind::NotFound => {
301 OBJECT_STORE_READ_ERROR
302 .with_label_values(&[e.kind().to_string().as_str()])
303 .inc();
304 OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
305
306 Ok(ReadResult::NotFound)
307 }
308
309 Err(e) => {
310 OBJECT_STORE_READ_ERROR
311 .with_label_values(&[e.kind().to_string().as_str()])
312 .inc();
313 Err(e)
314 }
315 }
316 }
317}
318
319pub struct CacheAwareDeleter<C, D> {
320 cache: ReadCache<C>,
321 deleter: D,
322}
323
324impl<C: Access, D: oio::Delete> CacheAwareDeleter<C, D> {
325 pub(crate) fn new(cache: ReadCache<C>, deleter: D) -> Self {
326 Self { cache, deleter }
327 }
328}
329
330impl<C: Access, D: oio::Delete> oio::Delete for CacheAwareDeleter<C, D> {
331 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
332 self.cache.invalidate_entries_with_prefix(path);
333 self.deleter.delete(path, args)?;
334 Ok(())
335 }
336
337 async fn flush(&mut self) -> Result<usize> {
338 self.deleter.flush().await
339 }
340}
341
342fn to_output_reader<R: Read + 'static>(input: (RpRead, R)) -> (RpRead, Reader) {
343 (input.0, Box::new(input.1))
344}
345
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_can_cache() {
        // Only paths ending exactly in `_last_checkpoint` bypass the cache.
        for path in [
            "test",
            "a/b/c.parquet",
            "1.json",
            "100.checkpoint",
            "test/last_checkpoint",
        ] {
            assert!(can_cache(path), "expected `{path}` to be cacheable");
        }
        for path in ["test/__last_checkpoint", "a/b/c/__last_checkpoint"] {
            assert!(!can_cache(path), "expected `{path}` not to be cacheable");
        }
    }

    #[test]
    fn test_read_result_size_bytes() {
        assert_eq!(ReadResult::Success(1024).size_bytes(), 1024);
        assert_eq!(ReadResult::NotFound.size_bytes(), 0);
    }

    #[test]
    fn test_read_cache_key_layout() {
        // `invalidate_entries_with_prefix` relies on every key of a path
        // starting with that path's prefix (and not another path's), and
        // `recover_cache` relies on keys living under `read_cache_root()`.
        let key = read_cache_key("a/b/c.parquet", &OpRead::default());
        assert!(key.starts_with(&read_cache_key_prefix("a/b/c.parquet")));
        assert!(!key.starts_with(&read_cache_key_prefix("a/b/d.parquet")));
        assert!(format!("/{key}").starts_with(&read_cache_root()));
        assert_eq!(read_cache_root(), format!("/{READ_CACHE_DIR}/"));
    }
}