object_store/layers/lru_cache/
read_cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use common_telemetry::debug;
18use futures::{FutureExt, TryStreamExt};
19use moka::future::Cache;
20use moka::notification::ListenerFuture;
21use moka::policy::EvictionPolicy;
22use opendal::raw::oio::{Read, Reader, Write};
23use opendal::raw::{oio, Access, OpDelete, OpRead, OpStat, OpWrite, RpRead};
24use opendal::{Error as OpendalError, ErrorKind, OperatorBuilder, Result};
25
26use crate::metrics::{
27    OBJECT_STORE_LRU_CACHE_BYTES, OBJECT_STORE_LRU_CACHE_ENTRIES, OBJECT_STORE_LRU_CACHE_HIT,
28    OBJECT_STORE_LRU_CACHE_MISS, OBJECT_STORE_READ_ERROR,
29};
30
/// Number of listed cache entries processed concurrently while recovering the cache
/// (each one may need its own `stat` call).
const RECOVER_CACHE_LIST_CONCURRENT: usize = 8;

/// Subdirectory of the local file cache that holds cached read results.
///
/// This must contain three layers (`cache`, `object`, `read`), corresponding to
/// [`build_prometheus_metrics_layer`](object_store::layers::build_prometheus_metrics_layer).
const READ_CACHE_DIR: &str = "cache/object/read";
36
/// Value stored in the in-memory cache index for one read key.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ReadResult {
    /// The remote read succeeded; the payload of this many bytes was written to the local file cache.
    Success(u32),
    /// The remote file does not exist (negative cache entry).
    NotFound,
}

impl ReadResult {
    /// Weight of this entry in bytes as seen by the cache weigher; a `NotFound` entry weighs nothing.
    fn size_bytes(&self) -> u32 {
        if let Self::Success(bytes) = self {
            *bytes
        } else {
            0
        }
    }
}
54
/// Returns true when the path of the file can be cached.
///
/// Any path ending in `_last_checkpoint` is excluded from caching.
fn can_cache(path: &str) -> bool {
    // TODO(dennis): find a better way
    path.strip_suffix("_last_checkpoint").is_none()
}
60
61/// Generate a unique cache key for the read path and range.
62fn read_cache_key(path: &str, args: &OpRead) -> String {
63    format!(
64        "{READ_CACHE_DIR}/{:x}.cache-{}",
65        md5::compute(path),
66        args.range().to_header()
67    )
68}
69
70fn read_cache_root() -> String {
71    format!("/{READ_CACHE_DIR}/")
72}
73
74fn read_cache_key_prefix(path: &str) -> String {
75    format!("{READ_CACHE_DIR}/{:x}", md5::compute(path))
76}
77
78/// Local read cache for files in object storage
79#[derive(Debug)]
80pub(crate) struct ReadCache<C> {
81    /// Local file cache backend
82    file_cache: Arc<C>,
83    /// Local memory cache to track local cache files
84    mem_cache: Cache<String, ReadResult>,
85}
86
87impl<C> Clone for ReadCache<C> {
88    fn clone(&self) -> Self {
89        Self {
90            file_cache: self.file_cache.clone(),
91            mem_cache: self.mem_cache.clone(),
92        }
93    }
94}
95
96impl<C: Access> ReadCache<C> {
97    /// Create a [`ReadCache`] with capacity in bytes.
98    pub(crate) fn new(file_cache: Arc<C>, capacity: usize) -> Self {
99        let file_cache_cloned = OperatorBuilder::new(file_cache.clone()).finish();
100        let eviction_listener =
101            move |read_key: Arc<String>, read_result: ReadResult, cause| -> ListenerFuture {
102                // Delete the file from local file cache when it's purged from mem_cache.
103                OBJECT_STORE_LRU_CACHE_ENTRIES.dec();
104                let file_cache_cloned = file_cache_cloned.clone();
105
106                async move {
107                    if let ReadResult::Success(size) = read_result {
108                        OBJECT_STORE_LRU_CACHE_BYTES.sub(size as i64);
109
110                        let result = file_cache_cloned.delete(&read_key).await;
111                        debug!(
112                            "Deleted local cache file `{}`, result: {:?}, cause: {:?}.",
113                            read_key, result, cause
114                        );
115                    }
116                }
117                .boxed()
118            };
119
120        Self {
121            file_cache,
122            mem_cache: Cache::builder()
123                .max_capacity(capacity as u64)
124                .eviction_policy(EvictionPolicy::lru())
125                .weigher(|_key, value: &ReadResult| -> u32 {
126                    // TODO(dennis): add key's length to weight?
127                    value.size_bytes()
128                })
129                .async_eviction_listener(eviction_listener)
130                .support_invalidation_closures()
131                .build(),
132        }
133    }
134
135    /// Returns the cache's entry count and total approximate entry size in bytes.
136    pub(crate) async fn cache_stat(&self) -> (u64, u64) {
137        self.mem_cache.run_pending_tasks().await;
138
139        (self.mem_cache.entry_count(), self.mem_cache.weighted_size())
140    }
141
142    /// Invalidate all cache items belong to the specific path.
143    pub(crate) fn invalidate_entries_with_prefix(&self, path: &str) {
144        let prefix = read_cache_key_prefix(path);
145        // Safety: always ok when building cache with `support_invalidation_closures`.
146        self.mem_cache
147            .invalidate_entries_if(move |k: &String, &_v| k.starts_with(&prefix))
148            .ok();
149    }
150
151    /// Recover existing cache items from `file_cache` to `mem_cache`.
152    /// Return entry count and total approximate entry size in bytes.
153    pub(crate) async fn recover_cache(&self) -> Result<(u64, u64)> {
154        let op = OperatorBuilder::new(self.file_cache.clone()).finish();
155        let cloned_op = op.clone();
156        let root = read_cache_root();
157        let mut entries = op
158            .lister_with(&root)
159            .await?
160            .map_ok(|entry| async {
161                let (path, mut meta) = entry.into_parts();
162
163                if !cloned_op.info().full_capability().list_has_content_length {
164                    meta = cloned_op.stat(&path).await?;
165                }
166
167                Ok((path, meta))
168            })
169            .try_buffer_unordered(RECOVER_CACHE_LIST_CONCURRENT)
170            .try_collect::<Vec<_>>()
171            .await?;
172
173        while let Some((read_key, metadata)) = entries.pop() {
174            if !metadata.is_file() {
175                continue;
176            }
177
178            let size = metadata.content_length();
179            OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
180            OBJECT_STORE_LRU_CACHE_BYTES.add(size as i64);
181
182            self.mem_cache
183                .insert(read_key.to_string(), ReadResult::Success(size as u32))
184                .await;
185        }
186
187        Ok(self.cache_stat().await)
188    }
189
190    /// Returns true when the read cache contains the specific file.
191    pub(crate) async fn contains_file(&self, path: &str) -> bool {
192        self.mem_cache.run_pending_tasks().await;
193        self.mem_cache.contains_key(path)
194            && self.file_cache.stat(path, OpStat::default()).await.is_ok()
195    }
196
197    /// Read from a specific path using the OpRead operation.
198    /// It will attempt to retrieve the data from the local cache.
199    /// If the data is not found in the local cache,
200    /// it will fall back to retrieving it from remote object storage
201    /// and cache the result locally.
202    pub(crate) async fn read_from_cache<I>(
203        &self,
204        inner: &I,
205        path: &str,
206        args: OpRead,
207    ) -> Result<(RpRead, Reader)>
208    where
209        I: Access,
210    {
211        if !can_cache(path) {
212            return inner.read(path, args).await.map(to_output_reader);
213        }
214
215        let read_key = read_cache_key(path, &args);
216
217        let read_result = self
218            .mem_cache
219            .try_get_with(
220                read_key.clone(),
221                self.read_remote(inner, &read_key, path, args.clone()),
222            )
223            .await
224            .map_err(|e| OpendalError::new(e.kind(), e.to_string()))?;
225
226        match read_result {
227            ReadResult::Success(_) => {
228                // There is a concurrent issue here, the local cache may be purged
229                // while reading, we have to fall back to remote read
230                match self.file_cache.read(&read_key, OpRead::default()).await {
231                    Ok(ret) => {
232                        OBJECT_STORE_LRU_CACHE_HIT
233                            .with_label_values(&["success"])
234                            .inc();
235                        Ok(to_output_reader(ret))
236                    }
237                    Err(_) => {
238                        OBJECT_STORE_LRU_CACHE_MISS.inc();
239                        inner.read(path, args).await.map(to_output_reader)
240                    }
241                }
242            }
243            ReadResult::NotFound => {
244                OBJECT_STORE_LRU_CACHE_HIT
245                    .with_label_values(&["not_found"])
246                    .inc();
247
248                Err(OpendalError::new(
249                    ErrorKind::NotFound,
250                    format!("File not found: {path}"),
251                ))
252            }
253        }
254    }
255
256    async fn try_write_cache<I>(&self, mut reader: I::Reader, read_key: &str) -> Result<usize>
257    where
258        I: Access,
259    {
260        let (_, mut writer) = self.file_cache.write(read_key, OpWrite::new()).await?;
261        let mut total = 0;
262        loop {
263            let bytes = reader.read().await?;
264            if bytes.is_empty() {
265                break;
266            }
267
268            total += bytes.len();
269            writer.write(bytes).await?;
270        }
271        // Call `close` to ensure data is written.
272        writer.close().await?;
273        Ok(total)
274    }
275
276    /// Read the file from remote storage. If success, write the content into local cache.
277    async fn read_remote<I>(
278        &self,
279        inner: &I,
280        read_key: &str,
281        path: &str,
282        args: OpRead,
283    ) -> Result<ReadResult>
284    where
285        I: Access,
286    {
287        OBJECT_STORE_LRU_CACHE_MISS.inc();
288
289        let (_, reader) = inner.read(path, args).await?;
290        let result = self.try_write_cache::<I>(reader, read_key).await;
291
292        match result {
293            Ok(read_bytes) => {
294                OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
295                OBJECT_STORE_LRU_CACHE_BYTES.add(read_bytes as i64);
296
297                Ok(ReadResult::Success(read_bytes as u32))
298            }
299
300            Err(e) if e.kind() == ErrorKind::NotFound => {
301                OBJECT_STORE_READ_ERROR
302                    .with_label_values(&[e.kind().to_string().as_str()])
303                    .inc();
304                OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
305
306                Ok(ReadResult::NotFound)
307            }
308
309            Err(e) => {
310                OBJECT_STORE_READ_ERROR
311                    .with_label_values(&[e.kind().to_string().as_str()])
312                    .inc();
313                Err(e)
314            }
315        }
316    }
317}
318
319pub struct CacheAwareDeleter<C, D> {
320    cache: ReadCache<C>,
321    deleter: D,
322}
323
324impl<C: Access, D: oio::Delete> CacheAwareDeleter<C, D> {
325    pub(crate) fn new(cache: ReadCache<C>, deleter: D) -> Self {
326        Self { cache, deleter }
327    }
328}
329
330impl<C: Access, D: oio::Delete> oio::Delete for CacheAwareDeleter<C, D> {
331    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
332        self.cache.invalidate_entries_with_prefix(path);
333        self.deleter.delete(path, args)?;
334        Ok(())
335    }
336
337    async fn flush(&mut self) -> Result<usize> {
338        self.deleter.flush().await
339    }
340}
341
342fn to_output_reader<R: Read + 'static>(input: (RpRead, R)) -> (RpRead, Reader) {
343    (input.0, Box::new(input.1))
344}
345
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_can_cache() {
        let cacheable = [
            "test",
            "a/b/c.parquet",
            "1.json",
            "100.checkpoint",
            "test/last_checkpoint",
        ];
        for path in cacheable {
            assert!(can_cache(path), "expected `{path}` to be cacheable");
        }

        let uncacheable = ["test/__last_checkpoint", "a/b/c/__last_checkpoint"];
        for path in uncacheable {
            assert!(!can_cache(path), "expected `{path}` not to be cacheable");
        }
    }
}