common_meta/cache/table/
table_name.rs1use std::sync::Arc;
16
17use futures::future::BoxFuture;
18use moka::future::Cache;
19use snafu::OptionExt;
20use table::metadata::TableId;
21use table::table_name::TableName;
22
23use crate::cache::{CacheContainer, Initializer};
24use crate::error;
25use crate::error::Result;
26use crate::instruction::CacheIdent;
27use crate::key::table_name::{TableNameKey, TableNameManager, TableNameManagerRef};
28use crate::kv_backend::KvBackendRef;
29
30pub type TableNameCache = CacheContainer<TableName, TableId, CacheIdent>;
32
33pub type TableNameCacheRef = Arc<TableNameCache>;
34
35pub fn new_table_name_cache(
37 name: String,
38 cache: Cache<TableName, TableId>,
39 kv_backend: KvBackendRef,
40) -> TableNameCache {
41 let table_name_manager = Arc::new(TableNameManager::new(kv_backend));
42 let init = init_factory(table_name_manager);
43
44 CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
45}
46
47fn init_factory(table_name_manager: TableNameManagerRef) -> Initializer<TableName, TableId> {
48 Arc::new(
49 move |TableName {
50 catalog_name,
51 schema_name,
52 table_name,
53 }| {
54 let table_name_manager = table_name_manager.clone();
55 Box::pin(async move {
56 Ok(Some(
57 table_name_manager
58 .get(TableNameKey {
59 catalog: catalog_name,
60 schema: schema_name,
61 table: table_name,
62 })
63 .await?
64 .context(error::ValueNotExistSnafu {})?
65 .table_id(),
66 ))
67 })
68 },
69 )
70}
71
72fn invalidator<'a>(
73 cache: &'a Cache<TableName, TableId>,
74 idents: &'a [&CacheIdent],
75) -> BoxFuture<'a, Result<()>> {
76 Box::pin(async move {
77 for ident in idents {
78 if let CacheIdent::TableName(table_name) = ident {
79 cache.invalidate(table_name).await
80 }
81 }
82 Ok(())
83 })
84}
85
86fn filter(ident: &CacheIdent) -> bool {
87 matches!(ident, CacheIdent::TableName(_))
88}
89
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use moka::future::CacheBuilder;

    use super::*;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::kv_backend::txn::TxnService;

    /// Name of the single table these tests register.
    const TABLE: &str = "my_table";

    /// Cache key for `TABLE` in the default catalog and schema.
    fn my_table() -> TableName {
        TableName {
            catalog_name: DEFAULT_CATALOG_NAME.to_string(),
            schema_name: DEFAULT_SCHEMA_NAME.to_string(),
            table_name: TABLE.to_string(),
        }
    }

    #[tokio::test]
    async fn test_cache_get() {
        let kv = Arc::new(MemoryKvBackend::default());
        let inner = CacheBuilder::new(128).build();
        let name_cache = new_table_name_cache("test".to_string(), inner, kv.clone());
        let key = my_table();

        // Nothing has been written to the backend yet, so no id is returned.
        let missing = name_cache.get_by_ref(&key).await.unwrap();
        assert!(missing.is_none());

        // Register the table name in the backend, then look it up again.
        let manager = TableNameManager::new(kv.clone());
        let expected_id = 1024;
        let create_txn = manager
            .build_create_txn(
                &TableNameKey {
                    catalog: DEFAULT_CATALOG_NAME,
                    schema: DEFAULT_SCHEMA_NAME,
                    table: TABLE,
                },
                expected_id,
            )
            .unwrap();
        kv.txn(create_txn).await.unwrap();

        let found = name_cache.get_by_ref(&key).await.unwrap().unwrap();
        assert_eq!(found, expected_id);
    }

    #[tokio::test]
    async fn test_invalidate_cache() {
        let kv = Arc::new(MemoryKvBackend::default());
        let inner = CacheBuilder::new(128).build();
        let name_cache = new_table_name_cache("test".to_string(), inner, kv.clone());
        let manager = TableNameManager::new(kv.clone());
        let key = my_table();

        // Register the table name in the backend.
        let expected_id = 1024;
        let create_txn = manager
            .build_create_txn(
                &TableNameKey {
                    catalog: DEFAULT_CATALOG_NAME,
                    schema: DEFAULT_SCHEMA_NAME,
                    table: TABLE,
                },
                expected_id,
            )
            .unwrap();
        kv.txn(create_txn).await.unwrap();

        // The first lookup populates the cache.
        let found = name_cache.get_by_ref(&key).await.unwrap().unwrap();
        assert_eq!(found, expected_id);
        assert!(name_cache.contains_key(&key));

        // Invalidating by table name must evict the entry.
        let idents = [CacheIdent::TableName(key.clone())];
        name_cache.invalidate(&idents).await.unwrap();
        assert!(!name_cache.contains_key(&key));
    }
}