pub mod error;
use std::sync::Arc;
use std::time::Duration;
use catalog::kvbackend::new_table_cache;
use common_meta::cache::{
new_schema_cache, new_table_flownode_set_cache, new_table_info_cache, new_table_name_cache,
new_table_route_cache, new_table_schema_cache, new_view_info_cache, CacheRegistry,
CacheRegistryBuilder, LayeredCacheRegistryBuilder,
};
use common_meta::kv_backend::KvBackendRef;
use moka::future::CacheBuilder;
use snafu::OptionExt;
use crate::error::Result;
const DEFAULT_CACHE_MAX_CAPACITY: u64 = 65536;
const DEFAULT_CACHE_TTL: Duration = Duration::from_secs(10 * 60);
const DEFAULT_CACHE_TTI: Duration = Duration::from_secs(5 * 60);
pub const TABLE_INFO_CACHE_NAME: &str = "table_info_cache";
pub const VIEW_INFO_CACHE_NAME: &str = "view_info_cache";
pub const TABLE_NAME_CACHE_NAME: &str = "table_name_cache";
pub const TABLE_CACHE_NAME: &str = "table_cache";
pub const SCHEMA_CACHE_NAME: &str = "schema_cache";
pub const TABLE_SCHEMA_NAME_CACHE_NAME: &str = "table_schema_name_cache";
pub const TABLE_FLOWNODE_SET_CACHE_NAME: &str = "table_flownode_set_cache";
pub const TABLE_ROUTE_CACHE_NAME: &str = "table_route_cache";
pub fn build_datanode_cache_registry(kv_backend: KvBackendRef) -> CacheRegistry {
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY).build();
let table_id_schema_cache = Arc::new(new_table_schema_cache(
TABLE_SCHEMA_NAME_CACHE_NAME.to_string(),
cache,
kv_backend.clone(),
));
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
.time_to_idle(DEFAULT_CACHE_TTI)
.build();
let schema_cache = Arc::new(new_schema_cache(
SCHEMA_CACHE_NAME.to_string(),
cache,
kv_backend.clone(),
));
CacheRegistryBuilder::default()
.add_cache(table_id_schema_cache)
.add_cache(schema_cache)
.build()
}
pub fn build_fundamental_cache_registry(kv_backend: KvBackendRef) -> CacheRegistry {
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
.time_to_idle(DEFAULT_CACHE_TTI)
.build();
let table_info_cache = Arc::new(new_table_info_cache(
TABLE_INFO_CACHE_NAME.to_string(),
cache,
kv_backend.clone(),
));
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
.time_to_idle(DEFAULT_CACHE_TTI)
.build();
let table_name_cache = Arc::new(new_table_name_cache(
TABLE_NAME_CACHE_NAME.to_string(),
cache,
kv_backend.clone(),
));
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
.time_to_idle(DEFAULT_CACHE_TTI)
.build();
let table_route_cache = Arc::new(new_table_route_cache(
TABLE_ROUTE_CACHE_NAME.to_string(),
cache,
kv_backend.clone(),
));
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
.time_to_idle(DEFAULT_CACHE_TTI)
.build();
let table_flownode_set_cache = Arc::new(new_table_flownode_set_cache(
TABLE_FLOWNODE_SET_CACHE_NAME.to_string(),
cache,
kv_backend.clone(),
));
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
.time_to_idle(DEFAULT_CACHE_TTI)
.build();
let view_info_cache = Arc::new(new_view_info_cache(
VIEW_INFO_CACHE_NAME.to_string(),
cache,
kv_backend.clone(),
));
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
.time_to_idle(DEFAULT_CACHE_TTI)
.build();
let schema_cache = Arc::new(new_schema_cache(
SCHEMA_CACHE_NAME.to_string(),
cache,
kv_backend.clone(),
));
let table_id_schema_cache = Arc::new(new_table_schema_cache(
TABLE_SCHEMA_NAME_CACHE_NAME.to_string(),
CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY).build(),
kv_backend,
));
CacheRegistryBuilder::default()
.add_cache(table_info_cache)
.add_cache(table_name_cache)
.add_cache(table_route_cache)
.add_cache(view_info_cache)
.add_cache(table_flownode_set_cache)
.add_cache(schema_cache)
.add_cache(table_id_schema_cache)
.build()
}
pub fn with_default_composite_cache_registry(
builder: LayeredCacheRegistryBuilder,
) -> Result<LayeredCacheRegistryBuilder> {
let table_info_cache = builder.get().context(error::CacheRequiredSnafu {
name: TABLE_INFO_CACHE_NAME,
})?;
let table_name_cache = builder.get().context(error::CacheRequiredSnafu {
name: TABLE_NAME_CACHE_NAME,
})?;
let cache = CacheBuilder::new(DEFAULT_CACHE_MAX_CAPACITY)
.time_to_live(DEFAULT_CACHE_TTL)
.time_to_idle(DEFAULT_CACHE_TTI)
.build();
let table_cache = Arc::new(new_table_cache(
TABLE_CACHE_NAME.to_string(),
cache,
table_info_cache,
table_name_cache,
));
let registry = CacheRegistryBuilder::default()
.add_cache(table_cache)
.build();
Ok(builder.add_cache_registry(registry))
}