common_meta/cache/table/
table_info.rs1use std::sync::Arc;
16
17use futures::future::BoxFuture;
18use moka::future::Cache;
19use snafu::OptionExt;
20use store_api::storage::TableId;
21use table::metadata::TableInfo;
22
23use crate::cache::{CacheContainer, Initializer};
24use crate::error;
25use crate::error::Result;
26use crate::instruction::CacheIdent;
27use crate::key::table_info::{TableInfoManager, TableInfoManagerRef};
28use crate::kv_backend::KvBackendRef;
29
/// Cache container keyed by [`TableId`] whose values are shared
/// [`TableInfo`]s (`Arc<TableInfo>`). [`CacheIdent`] is the identifier type
/// handed to this module's invalidation and filter callbacks.
pub type TableInfoCache = CacheContainer<TableId, Arc<TableInfo>, CacheIdent>;

/// Reference-counted handle to a [`TableInfoCache`].
pub type TableInfoCacheRef = Arc<TableInfoCache>;
34
35pub fn new_table_info_cache(
37 name: String,
38 cache: Cache<TableId, Arc<TableInfo>>,
39 kv_backend: KvBackendRef,
40) -> TableInfoCache {
41 let table_info_manager = Arc::new(TableInfoManager::new(kv_backend));
42 let init = init_factory(table_info_manager);
43
44 CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
45}
46
47fn init_factory(table_info_manager: TableInfoManagerRef) -> Initializer<TableId, Arc<TableInfo>> {
48 Arc::new(move |table_id| {
49 let table_info_manager = table_info_manager.clone();
50 Box::pin(async move {
51 let table_info = table_info_manager
52 .get(*table_id)
53 .await?
54 .context(error::ValueNotExistSnafu {})?
55 .into_inner()
56 .table_info;
57 Ok(Some(Arc::new(table_info)))
58 })
59 })
60}
61
62fn invalidator<'a>(
63 cache: &'a Cache<TableId, Arc<TableInfo>>,
64 ident: &'a CacheIdent,
65) -> BoxFuture<'a, Result<()>> {
66 Box::pin(async move {
67 if let CacheIdent::TableId(table_id) = ident {
68 cache.invalidate(table_id).await
69 }
70 Ok(())
71 })
72}
73
74fn filter(ident: &CacheIdent) -> bool {
75 matches!(ident, CacheIdent::TableId(_))
76}
77
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use moka::future::CacheBuilder;

    use super::*;
    use crate::ddl::test_util::create_table::test_create_table_task;
    use crate::key::TableMetadataManager;
    use crate::key::table_route::TableRouteValue;
    use crate::kv_backend::memory::MemoryKvBackend;

    /// Walks the cache through a miss, a populated lookup, and an invalidation.
    #[tokio::test]
    async fn test_cache() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let metadata_manager = TableMetadataManager::new(kv_backend.clone());
        let table_info_cache = new_table_info_cache(
            "test".to_string(),
            CacheBuilder::new(128).build(),
            kv_backend.clone(),
        );

        // No metadata has been written yet, so the lookup yields `None`.
        let missing = table_info_cache.get(1024).await.unwrap();
        assert!(missing.is_none());

        // Persist metadata for table 1024 through the metadata manager.
        let create_task = test_create_table_task("my_table", 1024);
        metadata_manager
            .create_table_metadata(
                create_task.table_info.clone(),
                TableRouteValue::physical(vec![]),
                HashMap::new(),
            )
            .await
            .unwrap();

        // The same lookup now returns the table info that was just stored.
        let cached = table_info_cache.get(1024).await.unwrap().unwrap();
        assert_eq!(*cached, create_task.table_info);

        // The entry is present until it is explicitly invalidated.
        assert!(table_info_cache.contains_key(&1024));
        let idents = [CacheIdent::TableId(1024)];
        table_info_cache.invalidate(&idents).await.unwrap();
        assert!(!table_info_cache.contains_key(&1024));
    }
}