common_meta/cache/table/
view_info.rs1use std::sync::Arc;
16
17use futures::future::BoxFuture;
18use moka::future::Cache;
19use snafu::OptionExt;
20use store_api::storage::TableId;
21
22use crate::cache::{CacheContainer, Initializer};
23use crate::error;
24use crate::error::Result;
25use crate::instruction::CacheIdent;
26use crate::key::view_info::{ViewInfoManager, ViewInfoManagerRef, ViewInfoValue};
27use crate::kv_backend::KvBackendRef;
28
/// A [`CacheContainer`] keyed by [`TableId`] whose values are shared
/// ([`Arc`]) [`ViewInfoValue`]s.
///
/// NOTE(review): the third type parameter, [`CacheIdent`], is presumably the
/// identifier type used to drive invalidation — confirm against
/// `CacheContainer`.
pub type ViewInfoCache = CacheContainer<TableId, Arc<ViewInfoValue>, CacheIdent>;
31
/// Reference-counted handle to a [`ViewInfoCache`].
pub type ViewInfoCacheRef = Arc<ViewInfoCache>;
33
34pub fn new_view_info_cache(
36 name: String,
37 cache: Cache<TableId, Arc<ViewInfoValue>>,
38 kv_backend: KvBackendRef,
39) -> ViewInfoCache {
40 let view_info_manager = Arc::new(ViewInfoManager::new(kv_backend));
41 let init = init_factory(view_info_manager);
42
43 CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
44}
45
46fn init_factory(view_info_manager: ViewInfoManagerRef) -> Initializer<TableId, Arc<ViewInfoValue>> {
47 Arc::new(move |view_id| {
48 let view_info_manager = view_info_manager.clone();
49 Box::pin(async move {
50 let view_info = view_info_manager
51 .get(*view_id)
52 .await?
53 .context(error::ValueNotExistSnafu {})?
54 .into_inner();
55
56 Ok(Some(Arc::new(view_info)))
57 })
58 })
59}
60
61fn invalidator<'a>(
62 cache: &'a Cache<TableId, Arc<ViewInfoValue>>,
63 ident: &'a CacheIdent,
64) -> BoxFuture<'a, Result<()>> {
65 Box::pin(async move {
66 if let CacheIdent::TableId(view_id) = ident {
67 cache.invalidate(view_id).await
68 }
69 Ok(())
70 })
71}
72
73fn filter(ident: &CacheIdent) -> bool {
74 matches!(ident, CacheIdent::TableId(_))
75}
76
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use moka::future::CacheBuilder;
    use table::table_name::TableName;

    use super::*;
    use crate::ddl::tests::create_view::test_create_view_task;
    use crate::key::TableMetadataManager;
    use crate::kv_backend::memory::MemoryKvBackend;

    /// Id under which the view metadata is registered and looked up.
    const VIEW_ID: TableId = 1024;

    /// Exercises a miss, a load after the metadata is created, and an
    /// explicit invalidation of the view info cache.
    #[tokio::test]
    async fn test_view_info_cache() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let metadata_manager = TableMetadataManager::new(kv_backend.clone());
        let moka_cache = CacheBuilder::new(128).build();
        let cache = new_view_info_cache("test".to_string(), moka_cache, kv_backend);

        // Nothing has been stored yet, so the lookup yields no value.
        let missing = cache.get(VIEW_ID).await.unwrap();
        assert!(missing.is_none());

        let mut task = test_create_view_task("my_view");
        task.view_info.ident.table_id = VIEW_ID;

        let referenced_tables: HashSet<_> = ["a_table", "b_table"]
            .into_iter()
            .map(|table| TableName {
                catalog_name: "greptime".to_string(),
                schema_name: "public".to_string(),
                table_name: table.to_string(),
            })
            .collect();
        let view_definition = "CREATE VIEW test AS SELECT * FROM numbers";

        metadata_manager
            .create_view_metadata(
                task.view_info.clone(),
                task.create_view.logical_plan.clone(),
                referenced_tables,
                vec!["a".to_string()],
                vec!["number".to_string()],
                view_definition.to_string(),
            )
            .await
            .unwrap();

        // The metadata exists now, so the same lookup returns a value.
        let loaded = cache.get(VIEW_ID).await.unwrap().unwrap();
        assert_eq!(loaded.view_info, task.create_view.logical_plan);
        assert_eq!(
            loaded.table_names,
            task.create_view
                .table_names
                .iter()
                .map(|name| name.clone().into())
                .collect::<HashSet<_>>()
        );
        assert_eq!(loaded.definition, task.create_view.definition);
        assert_eq!(loaded.columns, task.create_view.columns);
        assert_eq!(loaded.plan_columns, task.create_view.plan_columns);

        // Invalidating by table id removes the cached entry.
        assert!(cache.contains_key(&VIEW_ID));
        cache
            .invalidate(&[CacheIdent::TableId(VIEW_ID)])
            .await
            .unwrap();
        assert!(!cache.contains_key(&VIEW_ID));
    }
}