// object_store/layers/lru_cache/read_cache.rs
use std::sync::Arc;
use common_telemetry::debug;
use futures::{FutureExt, TryStreamExt};
use moka::future::Cache;
use moka::notification::ListenerFuture;
use moka::policy::EvictionPolicy;
use opendal::raw::oio::{Read, Reader, Write};
use opendal::raw::{oio, Access, OpDelete, OpRead, OpStat, OpWrite, RpRead};
use opendal::{Error as OpendalError, ErrorKind, OperatorBuilder, Result};
use crate::metrics::{
OBJECT_STORE_LRU_CACHE_BYTES, OBJECT_STORE_LRU_CACHE_ENTRIES, OBJECT_STORE_LRU_CACHE_HIT,
OBJECT_STORE_LRU_CACHE_MISS, OBJECT_STORE_READ_ERROR,
};
/// Number of concurrent in-flight requests when listing/stat-ing cache files
/// during recovery (see `ReadCache::recover_cache`).
const RECOVER_CACHE_LIST_CONCURRENT: usize = 8;
/// Subdirectory (relative, no leading slash) under which cached object reads
/// are stored in the local file cache.
const READ_CACHE_DIR: &str = "cache/object/read";
/// Outcome of a remote read, memoized in the in-memory LRU index so repeated
/// reads (including repeated misses) skip the remote store.
#[derive(Debug, Clone, PartialEq, Eq, Copy)]
enum ReadResult {
    /// The object was fetched and written to the local cache; the payload is
    /// its size in bytes (u32, so sizes are capped at 4 GiB per entry).
    Success(u32),
    /// The remote store returned `NotFound`; cached to avoid re-probing.
    NotFound,
}
impl ReadResult {
    /// Size in bytes this entry contributes to the cache weight
    /// (`NotFound` entries weigh nothing).
    fn size_bytes(&self) -> u32 {
        if let ReadResult::Success(size) = self {
            *size
        } else {
            0
        }
    }
}
/// Returns whether the object at `path` is eligible for read caching.
///
/// NOTE(review): `_last_checkpoint` files are excluded — presumably because
/// they are rewritten in place and a cached copy could go stale; confirm
/// against the writer's behavior.
fn can_cache(path: &str) -> bool {
    const UNCACHEABLE_SUFFIX: &str = "_last_checkpoint";
    !path.ends_with(UNCACHEABLE_SUFFIX)
}
/// Builds the local cache key for `path` restricted to the byte range in
/// `args`: `<dir>/<md5(path)>.cache-<range-header>`. Different ranges of the
/// same object therefore get distinct cache entries.
fn read_cache_key(path: &str, args: &OpRead) -> String {
    let digest = md5::compute(path);
    let range = args.range().to_header();
    format!("{READ_CACHE_DIR}/{digest:x}.cache-{range}")
}
/// Root directory of the read cache, with leading and trailing slash, as
/// expected by opendal listers.
fn read_cache_root() -> String {
    ["/", READ_CACHE_DIR, "/"].concat()
}
/// Key prefix shared by every cached range of `path`; used to invalidate all
/// of an object's entries at once (see `invalidate_entries_with_prefix`).
fn read_cache_key_prefix(path: &str) -> String {
    let digest = md5::compute(path);
    format!("{READ_CACHE_DIR}/{digest:x}")
}
/// An LRU read cache layered over a local object store.
///
/// `file_cache` holds the cached bytes on disk; `mem_cache` is the in-memory
/// index of which keys exist (and their sizes), driving LRU admission and
/// eviction without touching the file system.
#[derive(Debug)]
pub(crate) struct ReadCache<C> {
    // Local store holding the cached file contents.
    file_cache: Arc<C>,
    // In-memory key -> ReadResult index with size-weighted LRU eviction.
    mem_cache: Cache<String, ReadResult>,
}
impl<C> Clone for ReadCache<C> {
fn clone(&self) -> Self {
Self {
file_cache: self.file_cache.clone(),
mem_cache: self.mem_cache.clone(),
}
}
}
impl<C: Access> ReadCache<C> {
    /// Creates a `ReadCache` over `file_cache` bounded by `capacity` bytes.
    ///
    /// The moka cache uses an LRU policy weighted by entry size; when an
    /// entry is evicted, the async eviction listener removes the backing
    /// local file and updates the entry/byte metrics.
    pub(crate) fn new(file_cache: Arc<C>, capacity: usize) -> Self {
        // Operator handle moved into the listener so it can delete files.
        let file_cache_cloned = OperatorBuilder::new(file_cache.clone()).finish();
        let eviction_listener =
            move |read_key: Arc<String>, read_result: ReadResult, cause| -> ListenerFuture {
                // The entry leaves the index regardless of its variant.
                OBJECT_STORE_LRU_CACHE_ENTRIES.dec();
                let file_cache_cloned = file_cache_cloned.clone();
                async move {
                    // Only `Success` entries have a file on disk to remove.
                    if let ReadResult::Success(size) = read_result {
                        OBJECT_STORE_LRU_CACHE_BYTES.sub(size as i64);
                        // Best effort: a failed delete only leaves an orphan
                        // file behind; the outcome is logged, not propagated.
                        let result = file_cache_cloned.delete(&read_key).await;
                        debug!(
                            "Deleted local cache file `{}`, result: {:?}, cause: {:?}.",
                            read_key, result, cause
                        );
                    }
                }
                .boxed()
            };
        Self {
            file_cache,
            mem_cache: Cache::builder()
                .max_capacity(capacity as u64)
                .eviction_policy(EvictionPolicy::lru())
                // Weigh entries by byte size so `max_capacity` bounds total
                // cached bytes rather than the number of entries.
                .weigher(|_key, value: &ReadResult| -> u32 {
                    value.size_bytes()
                })
                .async_eviction_listener(eviction_listener)
                // Required for `invalidate_entries_if` used below.
                .support_invalidation_closures()
                .build(),
        }
    }

    /// Returns `(entry_count, weighted_size_in_bytes)` of the memory index.
    pub(crate) async fn cache_stat(&self) -> (u64, u64) {
        // Flush moka's pending maintenance so the counters are current.
        self.mem_cache.run_pending_tasks().await;
        (self.mem_cache.entry_count(), self.mem_cache.weighted_size())
    }

    /// Invalidates every cached range of the object at `path`.
    ///
    /// Eviction of the matched entries (and deletion of their local files)
    /// happens asynchronously via the eviction listener.
    pub(crate) fn invalidate_entries_with_prefix(&self, path: &str) {
        let prefix = read_cache_key_prefix(path);
        self.mem_cache
            // `.ok()`: this only errs if invalidation closures were not
            // enabled, which `new` always does.
            .invalidate_entries_if(move |k: &String, &_v| k.starts_with(&prefix))
            .ok();
    }

    /// Rebuilds the in-memory index from cache files left on disk by a
    /// previous process, then returns the resulting `cache_stat()`.
    pub(crate) async fn recover_cache(&self) -> Result<(u64, u64)> {
        let op = OperatorBuilder::new(self.file_cache.clone()).finish();
        let cloned_op = op.clone();
        let root = read_cache_root();
        let mut entries = op
            .lister_with(&root)
            .await?
            .map_ok(|entry| async {
                let (path, mut meta) = entry.into_parts();
                // Some backends omit content length from listings; fall back
                // to a per-entry `stat` call in that case.
                if !cloned_op.info().full_capability().list_has_content_length {
                    meta = cloned_op.stat(&path).await?;
                }
                Ok((path, meta))
            })
            .try_buffer_unordered(RECOVER_CACHE_LIST_CONCURRENT)
            .try_collect::<Vec<_>>()
            .await?;
        while let Some((read_key, metadata)) = entries.pop() {
            if !metadata.is_file() {
                continue;
            }
            let size = metadata.content_length();
            OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
            OBJECT_STORE_LRU_CACHE_BYTES.add(size as i64);
            // NOTE(review): `size as u32` truncates for files >= 4 GiB,
            // matching the u32 payload of `ReadResult::Success` — confirm
            // cached ranges can never reach that size.
            self.mem_cache
                .insert(read_key.to_string(), ReadResult::Success(size as u32))
                .await;
        }
        Ok(self.cache_stat().await)
    }

    /// Returns whether `path` (already a cache key) is present both in the
    /// memory index and as a file in the local store.
    pub(crate) async fn contains_file(&self, path: &str) -> bool {
        self.mem_cache.run_pending_tasks().await;
        self.mem_cache.contains_key(path)
            && self.file_cache.stat(path, OpStat::default()).await.is_ok()
    }

    /// Reads `path` through the cache, falling back to `inner` on miss.
    ///
    /// Uncacheable paths bypass the cache entirely. Otherwise the memory
    /// index is consulted with `try_get_with`, which deduplicates concurrent
    /// fills for the same key via `read_remote`. A cached `NotFound` is
    /// surfaced as an `ErrorKind::NotFound` error without hitting the remote.
    pub(crate) async fn read_from_cache<I>(
        &self,
        inner: &I,
        path: &str,
        args: OpRead,
    ) -> Result<(RpRead, Reader)>
    where
        I: Access,
    {
        if !can_cache(path) {
            return inner.read(path, args).await.map(to_output_reader);
        }
        let read_key = read_cache_key(path, &args);
        let read_result = self
            .mem_cache
            .try_get_with(
                read_key.clone(),
                self.read_remote(inner, &read_key, path, args.clone()),
            )
            .await
            // moka wraps the fill error in an Arc; unwrap to a plain error.
            .map_err(|e| OpendalError::new(e.kind(), e.to_string()))?;
        match read_result {
            ReadResult::Success(_) => {
                // The index says the file exists locally; still handle the
                // race where it was evicted/removed between index and read.
                match self.file_cache.read(&read_key, OpRead::default()).await {
                    Ok(ret) => {
                        OBJECT_STORE_LRU_CACHE_HIT
                            .with_label_values(&["success"])
                            .inc();
                        Ok(to_output_reader(ret))
                    }
                    Err(_) => {
                        // Local file vanished: count a miss and read remote.
                        OBJECT_STORE_LRU_CACHE_MISS.inc();
                        inner.read(path, args).await.map(to_output_reader)
                    }
                }
            }
            ReadResult::NotFound => {
                OBJECT_STORE_LRU_CACHE_HIT
                    .with_label_values(&["not_found"])
                    .inc();
                Err(OpendalError::new(
                    ErrorKind::NotFound,
                    format!("File not found: {path}"),
                ))
            }
        }
    }

    /// Streams `reader` into the local cache file at `read_key`, returning
    /// the total number of bytes written.
    async fn try_write_cache<I>(&self, mut reader: I::Reader, read_key: &str) -> Result<usize>
    where
        I: Access,
    {
        let (_, mut writer) = self.file_cache.write(read_key, OpWrite::new()).await?;
        let mut total = 0;
        loop {
            let bytes = reader.read().await?;
            // An empty buffer signals end of stream.
            if bytes.is_empty() {
                break;
            }
            total += bytes.len();
            writer.write(bytes).await?;
        }
        // Close to persist; errors here must fail the cache fill.
        writer.close().await?;
        Ok(total)
    }

    /// Cache-fill path used by `try_get_with`: reads `path` from `inner`,
    /// writes it into the local cache, and returns the `ReadResult` to store
    /// in the memory index.
    async fn read_remote<I>(
        &self,
        inner: &I,
        read_key: &str,
        path: &str,
        args: OpRead,
    ) -> Result<ReadResult>
    where
        I: Access,
    {
        // Reaching this method means the index had no entry: a miss.
        OBJECT_STORE_LRU_CACHE_MISS.inc();
        let (_, reader) = inner.read(path, args).await?;
        let result = self.try_write_cache::<I>(reader, read_key).await;
        match result {
            Ok(read_bytes) => {
                OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
                OBJECT_STORE_LRU_CACHE_BYTES.add(read_bytes as i64);
                Ok(ReadResult::Success(read_bytes as u32))
            }
            Err(e) if e.kind() == ErrorKind::NotFound => {
                OBJECT_STORE_READ_ERROR
                    .with_label_values(&[e.kind().to_string().as_str()])
                    .inc();
                // Cache the negative result (weighs 0 bytes) so repeated
                // lookups for a missing object skip the remote store.
                OBJECT_STORE_LRU_CACHE_ENTRIES.inc();
                Ok(ReadResult::NotFound)
            }
            Err(e) => {
                // Other errors are not cached; propagate to the caller.
                OBJECT_STORE_READ_ERROR
                    .with_label_values(&[e.kind().to_string().as_str()])
                    .inc();
                Err(e)
            }
        }
    }
}
/// A deleter wrapper that invalidates the read cache before delegating each
/// deletion to the underlying deleter.
pub struct CacheAwareDeleter<C, D> {
    // Cache whose entries must be invalidated on delete.
    cache: ReadCache<C>,
    // The wrapped deleter doing the actual work.
    deleter: D,
}
impl<C: Access, D: oio::Delete> CacheAwareDeleter<C, D> {
pub(crate) fn new(cache: ReadCache<C>, deleter: D) -> Self {
Self { cache, deleter }
}
}
impl<C: Access, D: oio::Delete> oio::Delete for CacheAwareDeleter<C, D> {
    /// Queues `path` for deletion, first invalidating any cached ranges so
    /// later reads cannot serve stale data.
    fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
        self.cache.invalidate_entries_with_prefix(path);
        self.deleter.delete(path, args)?;
        Ok(())
    }

    /// Flushes queued deletions on the underlying deleter.
    async fn flush(&mut self) -> Result<usize> {
        self.deleter.flush().await
    }
}
/// Boxes a concrete reader into the type-erased `(RpRead, Reader)` pair that
/// the `Access` trait expects.
fn to_output_reader<R: Read + 'static>(input: (RpRead, R)) -> (RpRead, Reader) {
    let (rp, reader) = input;
    (rp, Box::new(reader))
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_can_cache() {
        // Paths that should be admitted to the cache.
        let cacheable = [
            "test",
            "a/b/c.parquet",
            "1.json",
            "100.checkpoint",
            "test/last_checkpoint",
        ];
        for path in cacheable {
            assert!(can_cache(path), "expected `{path}` to be cacheable");
        }

        // Paths ending in `_last_checkpoint` must be rejected.
        let uncacheable = ["test/__last_checkpoint", "a/b/c/__last_checkpoint"];
        for path in uncacheable {
            assert!(!can_cache(path), "expected `{path}` to be uncacheable");
        }
    }
}