common_meta/cache/table/
table_info.rs1use std::sync::Arc;
16
17use futures::future::BoxFuture;
18use moka::future::Cache;
19use snafu::OptionExt;
20use store_api::storage::TableId;
21use table::metadata::TableInfo;
22
23use crate::cache::{CacheContainer, InitStrategy, Initializer};
24use crate::error;
25use crate::error::Result;
26use crate::instruction::CacheIdent;
27use crate::key::table_info::{TableInfoManager, TableInfoManagerRef};
28use crate::kv_backend::KvBackendRef;
29
/// A [`CacheContainer`] keyed by [`TableId`] whose values are shared
/// `Arc<TableInfo>` handles, with [`CacheIdent`] as its third type parameter.
30pub type TableInfoCache = CacheContainer<TableId, Arc<TableInfo>, CacheIdent>;
32
/// Reference-counted (`Arc`) handle to a [`TableInfoCache`].
33pub type TableInfoCacheRef = Arc<TableInfoCache>;
34
35pub fn new_table_info_cache(
37 name: String,
38 cache: Cache<TableId, Arc<TableInfo>>,
39 kv_backend: KvBackendRef,
40) -> TableInfoCache {
41 let table_info_manager = Arc::new(TableInfoManager::new(kv_backend));
42 let init = init_factory(table_info_manager);
43
44 CacheContainer::with_strategy(
45 name,
46 cache,
47 Box::new(invalidator),
48 init,
49 filter,
50 InitStrategy::VersionChecked,
51 )
52}
53
54fn init_factory(table_info_manager: TableInfoManagerRef) -> Initializer<TableId, Arc<TableInfo>> {
55 Arc::new(move |table_id| {
56 let table_info_manager = table_info_manager.clone();
57 Box::pin(async move {
58 let table_info = table_info_manager
59 .get(*table_id)
60 .await?
61 .context(error::ValueNotExistSnafu {})?
62 .into_inner()
63 .table_info;
64 Ok(Some(Arc::new(table_info)))
65 })
66 })
67}
68
69fn invalidator<'a>(
70 cache: &'a Cache<TableId, Arc<TableInfo>>,
71 idents: &'a [&CacheIdent],
72) -> BoxFuture<'a, Result<()>> {
73 Box::pin(async move {
74 for ident in idents {
75 if let CacheIdent::TableId(table_id) = ident {
76 cache.invalidate(table_id).await
77 }
78 }
79 Ok(())
80 })
81}
82
/// Returns `true` only when `ident` is the `CacheIdent::TableId` variant;
/// every other identifier yields `false`.
83fn filter(ident: &CacheIdent) -> bool {
84 matches!(ident, CacheIdent::TableId(_))
85}
86
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use moka::future::CacheBuilder;

    use super::*;
    use crate::ddl::test_util::create_table::test_create_table_task;
    use crate::key::TableMetadataManager;
    use crate::key::table_route::TableRouteValue;
    use crate::kv_backend::memory::MemoryKvBackend;

    /// Exercises the cache through three phases: a lookup before any metadata
    /// exists, a lookup after the metadata is created, and an invalidation.
    #[tokio::test]
    async fn test_cache() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let metadata_manager = TableMetadataManager::new(kv_backend.clone());
        let table_info_cache = new_table_info_cache(
            "test".to_string(),
            CacheBuilder::new(128).build(),
            kv_backend.clone(),
        );

        // No metadata has been created yet, so the lookup must come back empty.
        let missing = table_info_cache.get(1024).await.unwrap();
        assert!(missing.is_none());

        // Create metadata for table 1024 through the metadata manager.
        let task = test_create_table_task("my_table", 1024);
        let created = metadata_manager
            .create_table_metadata(
                task.table_info.clone(),
                TableRouteValue::physical(vec![]),
                HashMap::new(),
            )
            .await;
        created.unwrap();

        // The same lookup now returns the table info that was just created.
        let loaded = table_info_cache.get(1024).await.unwrap().unwrap();
        assert_eq!(*loaded, task.table_info);

        // Invalidating by table id removes the entry from the cache.
        assert!(table_info_cache.contains_key(&1024));
        let idents = [CacheIdent::TableId(1024)];
        table_info_cache.invalidate(&idents).await.unwrap();
        assert!(!table_info_cache.contains_key(&1024));
    }
}