common_meta/cache/table/
table_name.rs1use std::sync::Arc;
16
17use futures::future::BoxFuture;
18use moka::future::Cache;
19use snafu::OptionExt;
20use table::metadata::TableId;
21use table::table_name::TableName;
22
23use crate::cache::{CacheContainer, Initializer};
24use crate::error;
25use crate::error::Result;
26use crate::instruction::CacheIdent;
27use crate::key::table_name::{TableNameKey, TableNameManager, TableNameManagerRef};
28use crate::kv_backend::KvBackendRef;
29
/// A [`CacheContainer`] keyed by [`TableName`] that stores the matching [`TableId`];
/// invalidation requests are expressed as [`CacheIdent`] values.
pub type TableNameCache = CacheContainer<TableName, TableId, CacheIdent>;
32
/// Reference-counted ([`Arc`]) handle to a [`TableNameCache`].
pub type TableNameCacheRef = Arc<TableNameCache>;
34
35pub fn new_table_name_cache(
37 name: String,
38 cache: Cache<TableName, TableId>,
39 kv_backend: KvBackendRef,
40) -> TableNameCache {
41 let table_name_manager = Arc::new(TableNameManager::new(kv_backend));
42 let init = init_factory(table_name_manager);
43
44 CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
45}
46
47fn init_factory(table_name_manager: TableNameManagerRef) -> Initializer<TableName, TableId> {
48 Arc::new(
49 move |TableName {
50 catalog_name,
51 schema_name,
52 table_name,
53 }| {
54 let table_name_manager = table_name_manager.clone();
55 Box::pin(async move {
56 Ok(Some(
57 table_name_manager
58 .get(TableNameKey {
59 catalog: catalog_name,
60 schema: schema_name,
61 table: table_name,
62 })
63 .await?
64 .context(error::ValueNotExistSnafu {})?
65 .table_id(),
66 ))
67 })
68 },
69 )
70}
71
72fn invalidator<'a>(
73 cache: &'a Cache<TableName, TableId>,
74 ident: &'a CacheIdent,
75) -> BoxFuture<'a, Result<()>> {
76 Box::pin(async move {
77 if let CacheIdent::TableName(table_name) = ident {
78 cache.invalidate(table_name).await
79 }
80 Ok(())
81 })
82}
83
84fn filter(ident: &CacheIdent) -> bool {
85 matches!(ident, CacheIdent::TableName(_))
86}
87
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use moka::future::CacheBuilder;

    use super::*;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::kv_backend::txn::TxnService;

    /// The table name looked up by every test in this module:
    /// `my_table` in the default catalog and schema.
    fn my_table() -> TableName {
        TableName {
            catalog_name: DEFAULT_CATALOG_NAME.to_string(),
            schema_name: DEFAULT_SCHEMA_NAME.to_string(),
            table_name: "my_table".to_string(),
        }
    }

    /// A lookup returns `None` before the name is written to the backend and
    /// the stored table id afterwards.
    #[tokio::test]
    async fn test_cache_get() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let table_name_cache = new_table_name_cache(
            "test".to_string(),
            CacheBuilder::new(128).build(),
            kv_backend.clone(),
        );
        let name = my_table();

        // Nothing has been written to the backend yet.
        let missing = table_name_cache.get_by_ref(&name).await.unwrap();
        assert!(missing.is_none());

        // Register the table name directly in the backend.
        let manager = TableNameManager::new(kv_backend.clone());
        let expected_id = 1024;
        let create_txn = manager
            .build_create_txn(
                &TableNameKey {
                    catalog: &name.catalog_name,
                    schema: &name.schema_name,
                    table: &name.table_name,
                },
                expected_id,
            )
            .unwrap();
        kv_backend.txn(create_txn).await.unwrap();

        let found = table_name_cache
            .get_by_ref(&name)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(found, expected_id);
    }

    /// Invalidating with a `CacheIdent::TableName` removes the loaded entry.
    #[tokio::test]
    async fn test_invalidate_cache() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let table_name_cache = new_table_name_cache(
            "test".to_string(),
            CacheBuilder::new(128).build(),
            kv_backend.clone(),
        );
        let manager = TableNameManager::new(kv_backend.clone());
        let expected_id = 1024;
        let name = my_table();

        let create_txn = manager
            .build_create_txn(
                &TableNameKey {
                    catalog: &name.catalog_name,
                    schema: &name.schema_name,
                    table: &name.table_name,
                },
                expected_id,
            )
            .unwrap();
        kv_backend.txn(create_txn).await.unwrap();

        // The first lookup loads the entry into the cache.
        let loaded = table_name_cache
            .get_by_ref(&name)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(loaded, expected_id);
        assert!(table_name_cache.contains_key(&name));

        table_name_cache
            .invalidate(&[CacheIdent::TableName(name.clone())])
            .await
            .unwrap();
        assert!(!table_name_cache.contains_key(&name));
    }
}