object_store/layers/lru_cache/
read_cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use common_telemetry::debug;
18use futures::{FutureExt, TryStreamExt};
19use moka::future::Cache;
20use moka::notification::ListenerFuture;
21use moka::policy::EvictionPolicy;
22use opendal::raw::oio::{Read, Reader, Write};
23use opendal::raw::{oio, Access, OpDelete, OpRead, OpStat, OpWrite, RpRead};
24use opendal::{Error as OpendalError, ErrorKind, OperatorBuilder, Result};
25
26use crate::metrics::{
27    OBJECT_STORE_LRU_CACHE_BYTES, OBJECT_STORE_LRU_CACHE_ENTRIES, OBJECT_STORE_LRU_CACHE_HIT,
28    OBJECT_STORE_LRU_CACHE_MISS, OBJECT_STORE_READ_ERROR,
29};
30
/// Concurrency limit passed to `try_buffer_unordered` while collecting entry
/// metadata in `ReadCache::recover_cache`.
const RECOVER_CACHE_LIST_CONCURRENT: usize = 8;
/// Subdirectory of cached files for read.
///
/// This must contain three layers, corresponding to [`build_prometheus_metrics_layer`](object_store::layers::build_prometheus_metrics_layer).
const READ_CACHE_DIR: &str = "cache/object/read";
36
/// Outcome of a read, stored as the value of the in-memory cache index.
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
enum ReadResult {
    /// The read succeeded; the payload is the size recorded for the entry.
    Success(u32),
    /// The file was not found.
    NotFound,
}

impl ReadResult {
    /// Size carried by this result: the recorded size for `Success`, `0` for `NotFound`.
    fn size_bytes(&self) -> u32 {
        if let ReadResult::Success(size) = *self {
            size
        } else {
            0
        }
    }
}
54
/// Tells whether reads of `path` may go through the cache.
///
/// Every path is cacheable except those ending in `_last_checkpoint`.
fn can_cache(path: &str) -> bool {
    // TODO(dennis): find a better way
    const UNCACHEABLE_SUFFIX: &str = "_last_checkpoint";

    let excluded = path.ends_with(UNCACHEABLE_SUFFIX);
    !excluded
}
60
61/// Generate a unique cache key for the read path and range.
62fn read_cache_key(path: &str, args: &OpRead) -> String {
63    format!(
64        "{READ_CACHE_DIR}/{:x}.cache-{}",
65        md5::compute(path),
66        args.range().to_header()
67    )
68}
69
70fn read_cache_root() -> String {
71    format!("/{READ_CACHE_DIR}/")
72}
73
74fn read_cache_key_prefix(path: &str) -> String {
75    format!("{READ_CACHE_DIR}/{:x}", md5::compute(path))
76}
77
78/// Local read cache for files in object storage
79#[derive(Debug)]
80pub(crate) struct ReadCache<C> {
81    /// Local file cache backend
82    file_cache: Arc<C>,
83    /// Local memory cache to track local cache files
84    mem_cache: Cache<String, ReadResult>,
85}
86
87impl<C> Clone for ReadCache<C> {
88    fn clone(&self) -> Self {
89        Self {
90            file_cache: self.file_cache.clone(),
91            mem_cache: self.mem_cache.clone(),
92        }
93    }
94}
95
96impl<C: Access> ReadCache<C> {
97    /// Create a [`ReadCache`] with capacity in bytes.
98    pub(crate) fn new(file_cache: Arc<C>, capacity: usize) -> Self {
99        let file_cache_cloned = OperatorBuilder::new(file_cache.clone()).finish();
100        let eviction_listener =
101            move |read_key: Arc<String>, read_result: ReadResult, cause| -> ListenerFuture {
102                // Delete the file from local file cache when it's purged from mem_cache.
103                OBJECT_STORE_LRU_CACHE_ENTRIES.dec();
104                let file_cache_cloned = file_cache_cloned.clone();
105
106                async move {
107                    if let ReadResult::Success(size) = read_result {
108                        OBJECT_STORE_LRU_CACHE_BYTES.sub(size as i64);
109
110                        let result = file_cache_cloned.delete(&read_key).await;
111                        debug!(
112                            "Deleted local cache file `{}`, result: {:?}, cause: {:?}.",
113                            read_key, result, cause
114                        );
115                    }
116                }
117                .boxed()
118            };
119
120        Self {
121            file_cache,
122            mem_cache: Cache::builder()
123                .max_capacity(capacity as u64)
124                .eviction_policy(EvictionPolicy::lru())
125                .weigher(|_key, value: &ReadResult| -> u32 {
126                    // TODO(dennis): add key's length to weight?
127                    value.size_bytes()
128                })
129                .async_eviction_listener(eviction_listener)
130                .support_invalidation_closures()
131                .build(),
132        }
133    }
134
135    /// Returns the cache's entry count and total approximate entry size in bytes.
136    pub(crate) async fn cache_stat(&self) -> (u64, u64) {
137        self.mem_cache.run_pending_tasks().await;
138
139        (self.mem_cache.entry_count(), self.mem_cache.weighted_size())
140    }
141
142    /// Invalidate all cache items belong to the specific path.
143    pub(crate) fn invalidate_entries_with_prefix(&self, path: &str) {
144        let prefix = read_cache_key_prefix(path);
145        // Safety: always ok when building cache with `support_invalidation_closures`.
146        self.mem_cache
147            .invalidate_entries_if(move |k: &String, &_v| k.starts_with(&prefix))
148            .ok();
149    }
150
151    /// Recover existing cache items from `file_cache` to `mem_cache`.
152    /// Return entry count and total approximate entry size in bytes.
153    pub(crate) async fn recover_cache(&self) -> Result<(u64, u64)> {
154        let op = OperatorBuilder::new(self.file_cache.clone()).finish();
155        let cloned_op = op.clone();
156        let root = read_cache_root();
157        let mut entries = op
158            .lister_with(&root)
159            .await?
160            .map_ok(|entry| async {
161                let (path, mut meta) = entry.into_parts();
162
163                // TODO(dennis): Use a better API, see https://github.com/apache/opendal/issues/6522
164                if meta.content_length() == 0 {
165                    meta = cloned_op.stat(&path).await?;
166                }
167
168                Ok((path, meta))
169            })
170            .try_buffer_unordered(RECOVER_CACHE_LIST_CONCURRENT)
171            .try_collect::<Vec<_>>()
172            .await?;
173
174        while let Some((read_key, metadata)) = entries.pop() {
175            if !metadata.is_file() {
176                continue;
177            }
178
179            let size = metadata.content_length();
180            OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
181            OBJECT_STORE_LRU_CACHE_BYTES.add(size as i64);
182
183            self.mem_cache
184                .insert(read_key.to_string(), ReadResult::Success(size as u32))
185                .await;
186        }
187
188        Ok(self.cache_stat().await)
189    }
190
191    /// Returns true when the read cache contains the specific file.
192    pub(crate) async fn contains_file(&self, path: &str) -> bool {
193        self.mem_cache.run_pending_tasks().await;
194        self.mem_cache.contains_key(path)
195            && self.file_cache.stat(path, OpStat::default()).await.is_ok()
196    }
197
198    /// Read from a specific path using the OpRead operation.
199    /// It will attempt to retrieve the data from the local cache.
200    /// If the data is not found in the local cache,
201    /// it will fall back to retrieving it from remote object storage
202    /// and cache the result locally.
203    pub(crate) async fn read_from_cache<I>(
204        &self,
205        inner: &I,
206        path: &str,
207        args: OpRead,
208    ) -> Result<(RpRead, Reader)>
209    where
210        I: Access,
211    {
212        if !can_cache(path) {
213            return inner.read(path, args).await.map(to_output_reader);
214        }
215
216        let read_key = read_cache_key(path, &args);
217
218        let read_result = self
219            .mem_cache
220            .try_get_with(
221                read_key.clone(),
222                self.read_remote(inner, &read_key, path, args.clone()),
223            )
224            .await
225            .map_err(|e| OpendalError::new(e.kind(), e.to_string()))?;
226
227        match read_result {
228            ReadResult::Success(_) => {
229                // There is a concurrent issue here, the local cache may be purged
230                // while reading, we have to fall back to remote read
231                match self.file_cache.read(&read_key, OpRead::default()).await {
232                    Ok(ret) => {
233                        OBJECT_STORE_LRU_CACHE_HIT
234                            .with_label_values(&["success"])
235                            .inc();
236                        Ok(to_output_reader(ret))
237                    }
238                    Err(_) => {
239                        OBJECT_STORE_LRU_CACHE_MISS.inc();
240                        inner.read(path, args).await.map(to_output_reader)
241                    }
242                }
243            }
244            ReadResult::NotFound => {
245                OBJECT_STORE_LRU_CACHE_HIT
246                    .with_label_values(&["not_found"])
247                    .inc();
248
249                Err(OpendalError::new(
250                    ErrorKind::NotFound,
251                    format!("File not found: {path}"),
252                ))
253            }
254        }
255    }
256
257    async fn try_write_cache<I>(&self, mut reader: I::Reader, read_key: &str) -> Result<usize>
258    where
259        I: Access,
260    {
261        let (_, mut writer) = self.file_cache.write(read_key, OpWrite::new()).await?;
262        let mut total = 0;
263        loop {
264            let bytes = reader.read().await?;
265            if bytes.is_empty() {
266                break;
267            }
268
269            total += bytes.len();
270            writer.write(bytes).await?;
271        }
272        // Call `close` to ensure data is written.
273        writer.close().await?;
274        Ok(total)
275    }
276
277    /// Read the file from remote storage. If success, write the content into local cache.
278    async fn read_remote<I>(
279        &self,
280        inner: &I,
281        read_key: &str,
282        path: &str,
283        args: OpRead,
284    ) -> Result<ReadResult>
285    where
286        I: Access,
287    {
288        OBJECT_STORE_LRU_CACHE_MISS.inc();
289
290        let (_, reader) = inner.read(path, args).await?;
291        let result = self.try_write_cache::<I>(reader, read_key).await;
292
293        match result {
294            Ok(read_bytes) => {
295                OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
296                OBJECT_STORE_LRU_CACHE_BYTES.add(read_bytes as i64);
297
298                Ok(ReadResult::Success(read_bytes as u32))
299            }
300
301            Err(e) if e.kind() == ErrorKind::NotFound => {
302                OBJECT_STORE_READ_ERROR
303                    .with_label_values(&[e.kind().to_string().as_str()])
304                    .inc();
305                OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
306
307                Ok(ReadResult::NotFound)
308            }
309
310            Err(e) => {
311                OBJECT_STORE_READ_ERROR
312                    .with_label_values(&[e.kind().to_string().as_str()])
313                    .inc();
314                Err(e)
315            }
316        }
317    }
318}
319
320pub struct CacheAwareDeleter<C, D> {
321    cache: ReadCache<C>,
322    deleter: D,
323}
324
325impl<C: Access, D: oio::Delete> CacheAwareDeleter<C, D> {
326    pub(crate) fn new(cache: ReadCache<C>, deleter: D) -> Self {
327        Self { cache, deleter }
328    }
329}
330
331impl<C: Access, D: oio::Delete> oio::Delete for CacheAwareDeleter<C, D> {
332    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
333        self.cache.invalidate_entries_with_prefix(path);
334        self.deleter.delete(path, args)?;
335        Ok(())
336    }
337
338    async fn flush(&mut self) -> Result<usize> {
339        self.deleter.flush().await
340    }
341}
342
343fn to_output_reader<R: Read + 'static>(input: (RpRead, R)) -> (RpRead, Reader) {
344    (input.0, Box::new(input.1))
345}
346
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `can_cache` against paths that must and must not be cacheable.
    #[test]
    fn test_can_cache() {
        let cacheable = [
            "test",
            "a/b/c.parquet",
            "1.json",
            "100.checkpoint",
            "test/last_checkpoint",
        ];
        for path in cacheable.iter() {
            assert!(can_cache(path), "expected `{path}` to be cacheable");
        }

        let uncacheable = ["test/__last_checkpoint", "a/b/c/__last_checkpoint"];
        for path in uncacheable.iter() {
            assert!(!can_cache(path), "expected `{path}` not to be cacheable");
        }
    }
}