common_meta/cache/table/
table_schema.rsuse std::sync::Arc;
use futures_util::future::BoxFuture;
use moka::future::Cache;
use snafu::OptionExt;
use store_api::storage::TableId;
use crate::cache::{CacheContainer, Initializer};
use crate::error;
use crate::instruction::CacheIdent;
use crate::key::schema_name::SchemaName;
use crate::key::table_info::TableInfoManager;
use crate::kv_backend::KvBackendRef;
pub type TableSchemaCache = CacheContainer<TableId, Arc<SchemaName>, CacheIdent>;
pub type TableSchemaCacheRef = Arc<TableSchemaCache>;
pub fn new_table_schema_cache(
name: String,
cache: Cache<TableId, Arc<SchemaName>>,
kv_backend: KvBackendRef,
) -> TableSchemaCache {
let table_info_manager = TableInfoManager::new(kv_backend);
let init = init_factory(table_info_manager);
CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
}
fn init_factory(table_info_manager: TableInfoManager) -> Initializer<TableId, Arc<SchemaName>> {
Arc::new(move |table_id| {
let table_info_manager = table_info_manager.clone();
Box::pin(async move {
let raw_table_info = table_info_manager
.get(*table_id)
.await?
.context(error::ValueNotExistSnafu)?
.into_inner()
.table_info;
Ok(Some(Arc::new(SchemaName {
catalog_name: raw_table_info.catalog_name,
schema_name: raw_table_info.schema_name,
})))
})
})
}
fn invalidator<'a>(
_cache: &'a Cache<TableId, Arc<SchemaName>>,
_ident: &'a CacheIdent,
) -> BoxFuture<'a, error::Result<()>> {
Box::pin(std::future::ready(Ok(())))
}
fn filter(_ident: &CacheIdent) -> bool {
false
}