mod cache_size;
pub(crate) mod file_cache;
pub(crate) mod index;
#[cfg(test)]
pub(crate) mod test_util;
pub(crate) mod write_cache;
use std::mem;
use std::sync::Arc;
use bytes::Bytes;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
use moka::notification::RemovalCause;
use moka::sync::Cache;
use parquet::column::page::Page;
use parquet::file::metadata::ParquetMetaData;
use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
use store_api::storage::{ConcreteDataType, RegionId, TimeSeriesRowSelector};
use crate::cache::cache_size::parquet_meta_size;
use crate::cache::file_cache::{FileType, IndexKey};
use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef};
use crate::cache::write_cache::WriteCacheRef;
use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
use crate::read::Batch;
use crate::sst::file::FileId;
/// Metric label for the SST (parquet) metadata cache.
const SST_META_TYPE: &str = "sst_meta";
/// Metric label for the repeated-vector cache.
const VECTOR_TYPE: &str = "vector";
/// Metric label for the page cache.
const PAGE_TYPE: &str = "page";
/// Metric label for the file (write) cache.
const FILE_TYPE: &str = "file";
/// Metric label for the time-series selector result cache.
const SELECTOR_RESULT_TYPE: &str = "selector_result";
/// Selects which caches of a [CacheManager] a code path may use.
#[derive(Clone)]
pub enum CacheStrategy {
    /// All caches of the manager are available.
    EnableAll(CacheManagerRef),
    /// Compaction path: only the parquet metadata cache and the write cache
    /// are consulted; vector/page/selector/index caches are bypassed (see the
    /// method impls below, which return `None` for those in this variant).
    Compaction(CacheManagerRef),
    /// No caching at all; every lookup misses and every put is a no-op.
    Disabled,
}
impl CacheStrategy {
pub async fn get_parquet_meta_data(
&self,
region_id: RegionId,
file_id: FileId,
) -> Option<Arc<ParquetMetaData>> {
match self {
CacheStrategy::EnableAll(cache_manager) => {
cache_manager
.get_parquet_meta_data(region_id, file_id)
.await
}
CacheStrategy::Compaction(cache_manager) => {
cache_manager
.get_parquet_meta_data(region_id, file_id)
.await
}
CacheStrategy::Disabled => None,
}
}
pub fn get_parquet_meta_data_from_mem_cache(
&self,
region_id: RegionId,
file_id: FileId,
) -> Option<Arc<ParquetMetaData>> {
match self {
CacheStrategy::EnableAll(cache_manager) => {
cache_manager.get_parquet_meta_data_from_mem_cache(region_id, file_id)
}
CacheStrategy::Compaction(cache_manager) => {
cache_manager.get_parquet_meta_data_from_mem_cache(region_id, file_id)
}
CacheStrategy::Disabled => None,
}
}
pub fn put_parquet_meta_data(
&self,
region_id: RegionId,
file_id: FileId,
metadata: Arc<ParquetMetaData>,
) {
match self {
CacheStrategy::EnableAll(cache_manager) => {
cache_manager.put_parquet_meta_data(region_id, file_id, metadata);
}
CacheStrategy::Compaction(cache_manager) => {
cache_manager.put_parquet_meta_data(region_id, file_id, metadata);
}
CacheStrategy::Disabled => {}
}
}
pub fn remove_parquet_meta_data(&self, region_id: RegionId, file_id: FileId) {
match self {
CacheStrategy::EnableAll(cache_manager) => {
cache_manager.remove_parquet_meta_data(region_id, file_id);
}
CacheStrategy::Compaction(cache_manager) => {
cache_manager.remove_parquet_meta_data(region_id, file_id);
}
CacheStrategy::Disabled => {}
}
}
pub fn get_repeated_vector(
&self,
data_type: &ConcreteDataType,
value: &Value,
) -> Option<VectorRef> {
match self {
CacheStrategy::EnableAll(cache_manager) => {
cache_manager.get_repeated_vector(data_type, value)
}
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
}
}
pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
if let CacheStrategy::EnableAll(cache_manager) = self {
cache_manager.put_repeated_vector(value, vector);
}
}
pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
match self {
CacheStrategy::EnableAll(cache_manager) => cache_manager.get_pages(page_key),
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
}
}
pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
if let CacheStrategy::EnableAll(cache_manager) = self {
cache_manager.put_pages(page_key, pages);
}
}
pub fn get_selector_result(
&self,
selector_key: &SelectorResultKey,
) -> Option<Arc<SelectorResultValue>> {
match self {
CacheStrategy::EnableAll(cache_manager) => {
cache_manager.get_selector_result(selector_key)
}
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
}
}
pub fn put_selector_result(
&self,
selector_key: SelectorResultKey,
result: Arc<SelectorResultValue>,
) {
if let CacheStrategy::EnableAll(cache_manager) = self {
cache_manager.put_selector_result(selector_key, result);
}
}
pub fn write_cache(&self) -> Option<&WriteCacheRef> {
match self {
CacheStrategy::EnableAll(cache_manager) => cache_manager.write_cache(),
CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(),
CacheStrategy::Disabled => None,
}
}
pub fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
match self {
CacheStrategy::EnableAll(cache_manager) => cache_manager.inverted_index_cache(),
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
}
}
pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
match self {
CacheStrategy::EnableAll(cache_manager) => cache_manager.bloom_filter_index_cache(),
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
}
}
pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
match self {
CacheStrategy::EnableAll(cache_manager) => cache_manager.puffin_metadata_cache(),
CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
}
}
}
/// Owns all in-process caches used by the storage engine. Constructed via
/// [CacheManager::builder]; a defaulted manager has every cache disabled.
#[derive(Default)]
pub struct CacheManager {
    /// Cache for parquet (SST) file metadata.
    sst_meta_cache: Option<SstMetaCache>,
    /// Cache for vectors that repeat a single value.
    vector_cache: Option<VectorCache>,
    /// Cache for parquet pages (compressed or decoded).
    page_cache: Option<PageCache>,
    /// Write-through file cache, if configured.
    write_cache: Option<WriteCacheRef>,
    /// Inverted index cache.
    index_cache: Option<InvertedIndexCacheRef>,
    /// Bloom filter index cache.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
    /// Puffin file metadata cache.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    /// Cache for time-series row selector results.
    selector_result_cache: Option<SelectorResultCache>,
}
/// Shared pointer to a [CacheManager].
pub type CacheManagerRef = Arc<CacheManager>;
impl CacheManager {
    /// Returns a builder to configure and construct a [CacheManager].
    pub fn builder() -> CacheManagerBuilder {
        CacheManagerBuilder::default()
    }
    /// Gets cached [ParquetMetaData]: first from the in-memory cache, then,
    /// on a miss, from the write cache's file cache (re-populating the
    /// in-memory cache on that fallback hit).
    pub async fn get_parquet_meta_data(
        &self,
        region_id: RegionId,
        file_id: FileId,
    ) -> Option<Arc<ParquetMetaData>> {
        // Fast path: in-memory SST metadata cache (also records hit/miss).
        let metadata = self.get_parquet_meta_data_from_mem_cache(region_id, file_id);
        if metadata.is_some() {
            return metadata;
        }
        // Slow path: look up the parquet file in the write cache's file cache.
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        if let Some(write_cache) = &self.write_cache {
            if let Some(metadata) = write_cache.file_cache().get_parquet_meta_data(key).await {
                let metadata = Arc::new(metadata);
                // Re-populate the in-memory cache so the next lookup is fast.
                self.put_parquet_meta_data(region_id, file_id, metadata.clone());
                return Some(metadata);
            }
        };
        None
    }
    /// Gets cached [ParquetMetaData] from the in-memory cache only,
    /// recording a hit or miss metric.
    pub fn get_parquet_meta_data_from_mem_cache(
        &self,
        region_id: RegionId,
        file_id: FileId,
    ) -> Option<Arc<ParquetMetaData>> {
        self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
            let value = sst_meta_cache.get(&SstMetaKey(region_id, file_id));
            update_hit_miss(value, SST_META_TYPE)
        })
    }
    /// Puts [ParquetMetaData] into the in-memory cache (no-op if the cache is
    /// disabled) and adds its weight to the `CACHE_BYTES` gauge. Evictions,
    /// including replacement of an existing entry, subtract the weight back
    /// in the eviction listener installed by the builder.
    pub fn put_parquet_meta_data(
        &self,
        region_id: RegionId,
        file_id: FileId,
        metadata: Arc<ParquetMetaData>,
    ) {
        if let Some(cache) = &self.sst_meta_cache {
            let key = SstMetaKey(region_id, file_id);
            CACHE_BYTES
                .with_label_values(&[SST_META_TYPE])
                .add(meta_cache_weight(&key, &metadata).into());
            cache.insert(key, metadata);
        }
    }
    /// Removes cached [ParquetMetaData] for the given file, if present.
    pub fn remove_parquet_meta_data(&self, region_id: RegionId, file_id: FileId) {
        if let Some(cache) = &self.sst_meta_cache {
            cache.remove(&SstMetaKey(region_id, file_id));
        }
    }
    /// Gets a cached repeated vector for `(data_type, value)`, recording a
    /// hit or miss metric.
    pub fn get_repeated_vector(
        &self,
        data_type: &ConcreteDataType,
        value: &Value,
    ) -> Option<VectorRef> {
        self.vector_cache.as_ref().and_then(|vector_cache| {
            let value = vector_cache.get(&(data_type.clone(), value.clone()));
            update_hit_miss(value, VECTOR_TYPE)
        })
    }
    /// Puts a repeated vector keyed by its data type and the repeated value,
    /// adding its weight to the `CACHE_BYTES` gauge.
    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
        if let Some(cache) = &self.vector_cache {
            let key = (vector.data_type(), value);
            CACHE_BYTES
                .with_label_values(&[VECTOR_TYPE])
                .add(vector_cache_weight(&key, &vector).into());
            cache.insert(key, vector);
        }
    }
    /// Gets cached pages for a column, recording a hit or miss metric.
    pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
        self.page_cache.as_ref().and_then(|page_cache| {
            let value = page_cache.get(page_key);
            update_hit_miss(value, PAGE_TYPE)
        })
    }
    /// Puts pages into the page cache, adding their weight to `CACHE_BYTES`.
    pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
        if let Some(cache) = &self.page_cache {
            CACHE_BYTES
                .with_label_values(&[PAGE_TYPE])
                .add(page_cache_weight(&page_key, &pages).into());
            cache.insert(page_key, pages);
        }
    }
    /// Gets a cached selector result. Unlike the other getters, this does not
    /// record hit/miss here; callers use [selector_result_cache_hit] /
    /// [selector_result_cache_miss] instead.
    pub fn get_selector_result(
        &self,
        selector_key: &SelectorResultKey,
    ) -> Option<Arc<SelectorResultValue>> {
        self.selector_result_cache
            .as_ref()
            .and_then(|selector_result_cache| selector_result_cache.get(selector_key))
    }
    /// Puts a selector result, adding its weight to `CACHE_BYTES`.
    pub fn put_selector_result(
        &self,
        selector_key: SelectorResultKey,
        result: Arc<SelectorResultValue>,
    ) {
        if let Some(cache) = &self.selector_result_cache {
            CACHE_BYTES
                .with_label_values(&[SELECTOR_RESULT_TYPE])
                .add(selector_result_cache_weight(&selector_key, &result).into());
            cache.insert(selector_key, result);
        }
    }
    /// Returns the write cache, if configured.
    pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
        self.write_cache.as_ref()
    }
    /// Returns the inverted index cache, if configured.
    pub(crate) fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
        self.index_cache.as_ref()
    }
    /// Returns the bloom filter index cache, if configured.
    pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
        self.bloom_filter_index_cache.as_ref()
    }
    /// Returns the puffin metadata cache, if configured.
    pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
        self.puffin_metadata_cache.as_ref()
    }
}
/// Increments the miss counter for the selector result cache. The getter
/// ([CacheManager::get_selector_result]) does not record hit/miss itself, so
/// callers invoke this explicitly.
pub fn selector_result_cache_miss() {
    CACHE_MISS.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
}
/// Increments the hit counter for the selector result cache; see
/// [selector_result_cache_miss] for why callers record this explicitly.
pub fn selector_result_cache_hit() {
    CACHE_HIT.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
}
/// Builder for [CacheManager]. All capacities are in bytes; a capacity of 0
/// disables the corresponding moka cache (index/puffin caches are always
/// constructed — see [CacheManagerBuilder::build]).
#[derive(Default)]
pub struct CacheManagerBuilder {
    /// Capacity of the SST metadata cache.
    sst_meta_cache_size: u64,
    /// Capacity of the repeated-vector cache.
    vector_cache_size: u64,
    /// Capacity of the page cache.
    page_cache_size: u64,
    /// Capacity for index metadata (shared by inverted and bloom filter caches).
    index_metadata_size: u64,
    /// Capacity for index content (shared by inverted and bloom filter caches).
    index_content_size: u64,
    /// Page size for index content caching.
    index_content_page_size: u64,
    /// Capacity of the puffin metadata cache.
    puffin_metadata_size: u64,
    /// Optional write cache to attach.
    write_cache: Option<WriteCacheRef>,
    /// Capacity of the selector result cache.
    selector_result_cache_size: u64,
}
impl CacheManagerBuilder {
    /// Sets the SST metadata cache capacity in bytes (0 disables it).
    pub fn sst_meta_cache_size(mut self, bytes: u64) -> Self {
        self.sst_meta_cache_size = bytes;
        self
    }
    /// Sets the repeated-vector cache capacity in bytes (0 disables it).
    pub fn vector_cache_size(mut self, bytes: u64) -> Self {
        self.vector_cache_size = bytes;
        self
    }
    /// Sets the page cache capacity in bytes (0 disables it).
    pub fn page_cache_size(mut self, bytes: u64) -> Self {
        self.page_cache_size = bytes;
        self
    }
    /// Attaches (or clears) the write cache.
    pub fn write_cache(mut self, cache: Option<WriteCacheRef>) -> Self {
        self.write_cache = cache;
        self
    }
    /// Sets the index metadata capacity, shared by the inverted index and
    /// bloom filter index caches.
    pub fn index_metadata_size(mut self, bytes: u64) -> Self {
        self.index_metadata_size = bytes;
        self
    }
    /// Sets the index content capacity, shared by the inverted index and
    /// bloom filter index caches.
    pub fn index_content_size(mut self, bytes: u64) -> Self {
        self.index_content_size = bytes;
        self
    }
    /// Sets the page size used for index content caching.
    pub fn index_content_page_size(mut self, bytes: u64) -> Self {
        self.index_content_page_size = bytes;
        self
    }
    /// Sets the puffin metadata cache capacity in bytes.
    pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
        self.puffin_metadata_size = bytes;
        self
    }
    /// Sets the selector result cache capacity in bytes (0 disables it).
    pub fn selector_result_cache_size(mut self, bytes: u64) -> Self {
        self.selector_result_cache_size = bytes;
        self
    }
    /// Builds the [CacheManager]. Each moka cache gets an eviction listener
    /// that subtracts the evicted entry's weight from `CACHE_BYTES` and
    /// counts the eviction by cause, keeping the gauge consistent with the
    /// `add` performed in the corresponding `put_*` method.
    pub fn build(self) -> CacheManager {
        // Maps moka's removal cause to a stable metric label.
        fn to_str(cause: RemovalCause) -> &'static str {
            match cause {
                RemovalCause::Expired => "expired",
                RemovalCause::Explicit => "explicit",
                RemovalCause::Replaced => "replaced",
                RemovalCause::Size => "size",
            }
        }
        let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
            Cache::builder()
                .max_capacity(self.sst_meta_cache_size)
                .weigher(meta_cache_weight)
                .eviction_listener(|k, v, cause| {
                    let size = meta_cache_weight(&k, &v);
                    CACHE_BYTES
                        .with_label_values(&[SST_META_TYPE])
                        .sub(size.into());
                    CACHE_EVICTION
                        .with_label_values(&[SST_META_TYPE, to_str(cause)])
                        .inc();
                })
                .build()
        });
        let vector_cache = (self.vector_cache_size != 0).then(|| {
            Cache::builder()
                .max_capacity(self.vector_cache_size)
                .weigher(vector_cache_weight)
                .eviction_listener(|k, v, cause| {
                    let size = vector_cache_weight(&k, &v);
                    CACHE_BYTES
                        .with_label_values(&[VECTOR_TYPE])
                        .sub(size.into());
                    CACHE_EVICTION
                        .with_label_values(&[VECTOR_TYPE, to_str(cause)])
                        .inc();
                })
                .build()
        });
        let page_cache = (self.page_cache_size != 0).then(|| {
            Cache::builder()
                .max_capacity(self.page_cache_size)
                .weigher(page_cache_weight)
                .eviction_listener(|k, v, cause| {
                    let size = page_cache_weight(&k, &v);
                    CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
                    CACHE_EVICTION
                        .with_label_values(&[PAGE_TYPE, to_str(cause)])
                        .inc();
                })
                .build()
        });
        // NOTE(review): index/puffin caches are always constructed, even when
        // their sizes are 0 — how a zero size behaves depends on these
        // constructors (not visible in this file); confirm if 0 should
        // disable them like the moka caches above.
        let inverted_index_cache = InvertedIndexCache::new(
            self.index_metadata_size,
            self.index_content_size,
            self.index_content_page_size,
        );
        let bloom_filter_index_cache = BloomFilterIndexCache::new(
            self.index_metadata_size,
            self.index_content_size,
            self.index_content_page_size,
        );
        let puffin_metadata_cache =
            PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
        let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
            Cache::builder()
                .max_capacity(self.selector_result_cache_size)
                .weigher(selector_result_cache_weight)
                .eviction_listener(|k, v, cause| {
                    let size = selector_result_cache_weight(&k, &v);
                    CACHE_BYTES
                        .with_label_values(&[SELECTOR_RESULT_TYPE])
                        .sub(size.into());
                    CACHE_EVICTION
                        .with_label_values(&[SELECTOR_RESULT_TYPE, to_str(cause)])
                        .inc();
                })
                .build()
        });
        CacheManager {
            sst_meta_cache,
            vector_cache,
            page_cache,
            write_cache: self.write_cache,
            index_cache: Some(Arc::new(inverted_index_cache)),
            bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
            puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
            selector_result_cache,
        }
    }
}
/// Returns the approximate weight (bytes) of an SST metadata cache entry.
///
/// The previous `as u32` cast silently wraps for totals above `u32::MAX`,
/// which would corrupt both moka's weight accounting and the `CACHE_BYTES`
/// gauge; saturate instead so oversized entries weigh `u32::MAX`.
fn meta_cache_weight(k: &SstMetaKey, v: &Arc<ParquetMetaData>) -> u32 {
    let size = k.estimated_size() + parquet_meta_size(v);
    size.min(u32::MAX as usize) as u32
}
/// Returns the approximate weight (bytes) of a repeated-vector cache entry.
///
/// Uses a saturating conversion instead of a wrapping `as u32` cast so that
/// pathologically large vectors cannot wrap the weight to a tiny value.
/// NOTE(review): only the shallow sizes of the key's `ConcreteDataType` and
/// `Value` are counted; heap data inside `Value` is not — confirm intended.
fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
    let size = mem::size_of::<ConcreteDataType>() + mem::size_of::<Value>() + v.memory_size();
    size.min(u32::MAX as usize) as u32
}
/// Returns the approximate weight (bytes) of a page cache entry.
///
/// Saturates at `u32::MAX` rather than wrapping with `as u32`, so a huge
/// page value cannot be accounted as nearly weightless.
fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
    let size = k.estimated_size() + v.estimated_size();
    size.min(u32::MAX as usize) as u32
}
/// Returns the approximate weight (bytes) of a selector result cache entry.
///
/// Saturates at `u32::MAX` rather than wrapping with `as u32`, so very large
/// batch sets cannot wrap to a tiny weight and evade eviction.
fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultValue>) -> u32 {
    let size = mem::size_of_val(k) + v.estimated_size();
    size.min(u32::MAX as usize) as u32
}
/// Records a hit or miss metric for `cache_type` and passes the looked-up
/// value through unchanged.
fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
    match value {
        Some(v) => {
            CACHE_HIT.with_label_values(&[cache_type]).inc();
            Some(v)
        }
        None => {
            CACHE_MISS.with_label_values(&[cache_type]).inc();
            None
        }
    }
}
/// Cache key `(region id, file id)` for an SST's parquet metadata.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct SstMetaKey(RegionId, FileId);
impl SstMetaKey {
    /// Returns the memory occupied by the key itself.
    fn estimated_size(&self) -> usize {
        mem::size_of::<Self>()
    }
}
/// Locates a column chunk's pages: region, file, row group and column index.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnPagePath {
    /// Region that owns the file.
    region_id: RegionId,
    /// SST file id.
    file_id: FileId,
    /// Row group index within the file.
    row_group_idx: usize,
    /// Column index within the row group.
    column_idx: usize,
}
/// Page cache key; the same column path is cached separately for compressed
/// bytes and decoded pages.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PageKey {
    /// Key for the compressed bytes of a column chunk.
    Compressed(ColumnPagePath),
    /// Key for the decoded pages of a column chunk.
    Uncompressed(ColumnPagePath),
}
impl PageKey {
pub fn new_compressed(
region_id: RegionId,
file_id: FileId,
row_group_idx: usize,
column_idx: usize,
) -> PageKey {
PageKey::Compressed(ColumnPagePath {
region_id,
file_id,
row_group_idx,
column_idx,
})
}
pub fn new_uncompressed(
region_id: RegionId,
file_id: FileId,
row_group_idx: usize,
column_idx: usize,
) -> PageKey {
PageKey::Uncompressed(ColumnPagePath {
region_id,
file_id,
row_group_idx,
column_idx,
})
}
fn estimated_size(&self) -> usize {
mem::size_of::<Self>()
}
}
/// Cached pages of a column chunk: either the raw compressed bytes or the
/// decoded pages (the unused field is left empty — see the constructors).
#[derive(Default)]
pub struct PageValue {
    /// Compressed bytes of the column chunk (empty for decoded entries).
    pub compressed: Bytes,
    /// Decoded pages of the column chunk (empty for compressed entries).
    pub row_group: Vec<Page>,
}
impl PageValue {
    /// Creates a value holding the compressed bytes of a column chunk.
    pub fn new_compressed(bytes: Bytes) -> PageValue {
        PageValue {
            compressed: bytes,
            row_group: Vec::new(),
        }
    }

    /// Creates a value holding the decoded pages of a column chunk.
    pub fn new_row_group(pages: Vec<Page>) -> PageValue {
        PageValue {
            compressed: Bytes::new(),
            row_group: pages,
        }
    }

    /// Approximate memory: the struct itself, the compressed bytes, and the
    /// buffers of all decoded pages.
    fn estimated_size(&self) -> usize {
        let page_bytes = self
            .row_group
            .iter()
            .fold(0usize, |acc, page| acc + page.buffer().len());
        mem::size_of::<Self>() + self.compressed.len() + page_bytes
    }
}
/// Cache key for a time-series selector result on one row group of a file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SelectorResultKey {
    /// SST file id.
    pub file_id: FileId,
    /// Row group index within the file.
    pub row_group_idx: usize,
    /// Selector that produced the result.
    pub selector: TimeSeriesRowSelector,
}
/// Cached result of applying a time-series row selector.
pub struct SelectorResultValue {
    /// Selected batches.
    pub result: Vec<Batch>,
    /// Column projection the batches were produced under.
    pub projection: Vec<usize>,
}
impl SelectorResultValue {
    /// Creates a selector result from batches and their projection indices.
    pub fn new(result: Vec<Batch>, projection: Vec<usize>) -> SelectorResultValue {
        SelectorResultValue { result, projection }
    }

    /// Approximate memory of the cached batches.
    /// NOTE(review): the `projection` vector is not counted — confirm intended.
    fn estimated_size(&self) -> usize {
        self.result
            .iter()
            .fold(0usize, |acc, batch| acc + batch.memory_size())
    }
}
/// Maps (region id, file id) to parquet metadata.
type SstMetaCache = Cache<SstMetaKey, Arc<ParquetMetaData>>;
/// Maps (data type, value) to a vector that repeats that value.
type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
/// Maps a page key to the cached pages of a column chunk.
type PageCache = Cache<PageKey, Arc<PageValue>>;
/// Maps a selector key to a cached selector result.
type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use datatypes::vectors::Int64Vector;
    use super::*;
    use crate::cache::test_util::parquet_meta;
    /// A defaulted manager has every cache disabled: puts are no-ops and all
    /// gets miss.
    #[tokio::test]
    async fn test_disable_cache() {
        let cache = CacheManager::default();
        assert!(cache.sst_meta_cache.is_none());
        assert!(cache.vector_cache.is_none());
        assert!(cache.page_cache.is_none());
        let region_id = RegionId::new(1, 1);
        let file_id = FileId::random();
        let metadata = parquet_meta();
        cache.put_parquet_meta_data(region_id, file_id, metadata);
        assert!(cache
            .get_parquet_meta_data(region_id, file_id)
            .await
            .is_none());
        let value = Value::Int64(10);
        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
        cache.put_repeated_vector(value.clone(), vector.clone());
        assert!(cache
            .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
            .is_none());
        let key = PageKey::new_uncompressed(region_id, file_id, 0, 0);
        let pages = Arc::new(PageValue::default());
        cache.put_pages(key.clone(), pages);
        assert!(cache.get_pages(&key).is_none());
        assert!(cache.write_cache().is_none());
    }
    /// Put / get / remove round trip through the SST metadata cache.
    #[tokio::test]
    async fn test_parquet_meta_cache() {
        let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
        let region_id = RegionId::new(1, 1);
        let file_id = FileId::random();
        assert!(cache
            .get_parquet_meta_data(region_id, file_id)
            .await
            .is_none());
        let metadata = parquet_meta();
        cache.put_parquet_meta_data(region_id, file_id, metadata);
        assert!(cache
            .get_parquet_meta_data(region_id, file_id)
            .await
            .is_some());
        cache.remove_parquet_meta_data(region_id, file_id);
        assert!(cache
            .get_parquet_meta_data(region_id, file_id)
            .await
            .is_none());
    }
    /// Put / get round trip through the repeated-vector cache.
    #[test]
    fn test_repeated_vector_cache() {
        let cache = CacheManager::builder().vector_cache_size(4096).build();
        let value = Value::Int64(10);
        assert!(cache
            .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
            .is_none());
        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
        cache.put_repeated_vector(value.clone(), vector.clone());
        let cached = cache
            .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
            .unwrap();
        assert_eq!(vector, cached);
    }
    /// Put / get round trip through the page cache.
    #[test]
    fn test_page_cache() {
        let cache = CacheManager::builder().page_cache_size(1000).build();
        let region_id = RegionId::new(1, 1);
        let file_id = FileId::random();
        let key = PageKey::new_compressed(region_id, file_id, 0, 0);
        assert!(cache.get_pages(&key).is_none());
        let pages = Arc::new(PageValue::default());
        cache.put_pages(key.clone(), pages);
        assert!(cache.get_pages(&key).is_some());
    }
    /// Put / get round trip through the selector result cache.
    #[test]
    fn test_selector_result_cache() {
        let cache = CacheManager::builder()
            .selector_result_cache_size(1000)
            .build();
        let file_id = FileId::random();
        let key = SelectorResultKey {
            file_id,
            row_group_idx: 0,
            selector: TimeSeriesRowSelector::LastRow,
        };
        assert!(cache.get_selector_result(&key).is_none());
        let result = Arc::new(SelectorResultValue::new(Vec::new(), Vec::new()));
        cache.put_selector_result(key, result);
        assert!(cache.get_selector_result(&key).is_some());
    }
}