object_store/layers/
lru_cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use opendal::raw::oio::Reader;
18use opendal::raw::{
19    Access, Layer, LayeredAccess, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead, RpWrite,
20};
21use opendal::Result;
22mod read_cache;
23use std::time::Instant;
24
25use common_telemetry::{error, info};
26use read_cache::ReadCache;
27
28use crate::layers::lru_cache::read_cache::CacheAwareDeleter;
29
30/// An opendal layer with local LRU file cache supporting.
31pub struct LruCacheLayer<C: Access> {
32    // The read cache
33    read_cache: ReadCache<C>,
34}
35
36impl<C: Access> Clone for LruCacheLayer<C> {
37    fn clone(&self) -> Self {
38        Self {
39            read_cache: self.read_cache.clone(),
40        }
41    }
42}
43
44impl<C: Access> LruCacheLayer<C> {
45    /// Create a [`LruCacheLayer`] with local file cache and capacity in bytes.
46    pub fn new(file_cache: Arc<C>, capacity: usize) -> Result<Self> {
47        let read_cache = ReadCache::new(file_cache, capacity);
48        Ok(Self { read_cache })
49    }
50
51    /// Recovers cache
52    pub async fn recover_cache(&self, sync: bool) {
53        let now = Instant::now();
54        let moved_read_cache = self.read_cache.clone();
55        let handle = tokio::spawn(async move {
56            match moved_read_cache.recover_cache().await {
57                Ok((entries, bytes)) => info!(
58                    "Recovered {} entries and total size {} in bytes for LruCacheLayer, cost: {:?}",
59                    entries,
60                    bytes,
61                    now.elapsed()
62                ),
63                Err(err) => error!(err; "Failed to recover file cache."),
64            }
65        });
66        if sync {
67            let _ = handle.await;
68        }
69    }
70
71    /// Returns true when the local cache contains the specific file
72    pub async fn contains_file(&self, path: &str) -> bool {
73        self.read_cache.contains_file(path).await
74    }
75
76    /// Returns the read cache statistics info `(EntryCount, SizeInBytes)`.
77    pub async fn read_cache_stat(&self) -> (u64, u64) {
78        self.read_cache.cache_stat().await
79    }
80}
81
82impl<I: Access, C: Access> Layer<I> for LruCacheLayer<C> {
83    type LayeredAccess = LruCacheAccess<I, C>;
84
85    fn layer(&self, inner: I) -> Self::LayeredAccess {
86        LruCacheAccess {
87            inner,
88            read_cache: self.read_cache.clone(),
89        }
90    }
91}
92
93#[derive(Debug)]
94pub struct LruCacheAccess<I, C> {
95    inner: I,
96    read_cache: ReadCache<C>,
97}
98
99impl<I: Access, C: Access> LayeredAccess for LruCacheAccess<I, C> {
100    type Inner = I;
101    type Reader = Reader;
102    type BlockingReader = I::BlockingReader;
103    type Writer = I::Writer;
104    type BlockingWriter = I::BlockingWriter;
105    type Lister = I::Lister;
106    type BlockingLister = I::BlockingLister;
107    type Deleter = CacheAwareDeleter<C, I::Deleter>;
108    type BlockingDeleter = I::BlockingDeleter;
109
110    fn inner(&self) -> &Self::Inner {
111        &self.inner
112    }
113
114    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
115        self.read_cache
116            .read_from_cache(&self.inner, path, args)
117            .await
118    }
119
120    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
121        let result = self.inner.write(path, args).await;
122
123        self.read_cache.invalidate_entries_with_prefix(path);
124
125        result
126    }
127
128    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
129        self.inner
130            .delete()
131            .await
132            .map(|(rp, deleter)| (rp, CacheAwareDeleter::new(self.read_cache.clone(), deleter)))
133    }
134
135    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
136        self.inner.list(path, args).await
137    }
138
139    fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
140        // TODO(dennis): support blocking read cache
141        self.inner.blocking_read(path, args)
142    }
143
144    fn blocking_write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::BlockingWriter)> {
145        let result = self.inner.blocking_write(path, args);
146
147        self.read_cache.invalidate_entries_with_prefix(path);
148
149        result
150    }
151
152    fn blocking_list(&self, path: &str, args: OpList) -> Result<(RpList, Self::BlockingLister)> {
153        self.inner.blocking_list(path, args)
154    }
155
156    fn blocking_delete(&self) -> Result<(RpDelete, Self::BlockingDeleter)> {
157        self.inner.blocking_delete()
158    }
159}