// catalog/kvbackend/table_cache.rs

use std::sync::Arc;

17use common_meta::cache::{CacheContainer, Initializer, TableInfoCacheRef, TableNameCacheRef};
18use common_meta::error::{Result as MetaResult, ValueNotExistSnafu};
19use common_meta::instruction::CacheIdent;
20use futures::future::BoxFuture;
21use moka::future::Cache;
22use snafu::OptionExt;
23use table::dist_table::DistTable;
24use table::table_name::TableName;
25use table::TableRef;
26
27pub type TableCacheRef = Arc<TableCache>;
28
29pub type TableCache = CacheContainer<TableName, TableRef, CacheIdent>;
31
32pub fn new_table_cache(
34 name: String,
35 cache: Cache<TableName, TableRef>,
36 table_info_cache: TableInfoCacheRef,
37 table_name_cache: TableNameCacheRef,
38) -> TableCache {
39 let init = init_factory(table_info_cache, table_name_cache);
40
41 CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
42}
43
44fn init_factory(
45 table_info_cache: TableInfoCacheRef,
46 table_name_cache: TableNameCacheRef,
47) -> Initializer<TableName, TableRef> {
48 Arc::new(move |table_name| {
49 let table_info_cache = table_info_cache.clone();
50 let table_name_cache = table_name_cache.clone();
51 Box::pin(async move {
52 let table_id = table_name_cache
53 .get_by_ref(table_name)
54 .await?
55 .context(ValueNotExistSnafu)?;
56 let table_info = table_info_cache
57 .get_by_ref(&table_id)
58 .await?
59 .context(ValueNotExistSnafu)?;
60
61 Ok(Some(DistTable::table(table_info)))
62 })
63 })
64}
65
66fn invalidator<'a>(
67 cache: &'a Cache<TableName, TableRef>,
68 ident: &'a CacheIdent,
69) -> BoxFuture<'a, MetaResult<()>> {
70 Box::pin(async move {
71 if let CacheIdent::TableName(table_name) = ident {
72 cache.invalidate(table_name).await
73 }
74 Ok(())
75 })
76}
77
78fn filter(ident: &CacheIdent) -> bool {
79 matches!(ident, CacheIdent::TableName(_))
80}