Skip to main content

common_meta/cache/table/
view_info.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use futures::future::BoxFuture;
18use moka::future::Cache;
19use snafu::OptionExt;
20use store_api::storage::TableId;
21
22use crate::cache::{CacheContainer, Initializer};
23use crate::error;
24use crate::error::Result;
25use crate::instruction::CacheIdent;
26use crate::key::view_info::{ViewInfoManager, ViewInfoManagerRef, ViewInfoValue};
27use crate::kv_backend::KvBackendRef;
28
/// [ViewInfoCache] caches the [TableId] to [ViewInfoValue] mapping.
///
/// Values are stored behind an [Arc], and the container is keyed for
/// invalidation by [CacheIdent].
pub type ViewInfoCache = CacheContainer<TableId, Arc<ViewInfoValue>, CacheIdent>;

/// A reference-counted ([Arc]) handle to a [ViewInfoCache].
pub type ViewInfoCacheRef = Arc<ViewInfoCache>;
33
34/// Constructs a [ViewInfoCache].
35pub fn new_view_info_cache(
36    name: String,
37    cache: Cache<TableId, Arc<ViewInfoValue>>,
38    kv_backend: KvBackendRef,
39) -> ViewInfoCache {
40    let view_info_manager = Arc::new(ViewInfoManager::new(kv_backend));
41    let init = init_factory(view_info_manager);
42
43    CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
44}
45
46fn init_factory(view_info_manager: ViewInfoManagerRef) -> Initializer<TableId, Arc<ViewInfoValue>> {
47    Arc::new(move |view_id| {
48        let view_info_manager = view_info_manager.clone();
49        Box::pin(async move {
50            let view_info = view_info_manager
51                .get(*view_id)
52                .await?
53                .context(error::ValueNotExistSnafu {})?
54                .into_inner();
55
56            Ok(Some(Arc::new(view_info)))
57        })
58    })
59}
60
61fn invalidator<'a>(
62    cache: &'a Cache<TableId, Arc<ViewInfoValue>>,
63    idents: &'a [&CacheIdent],
64) -> BoxFuture<'a, Result<()>> {
65    Box::pin(async move {
66        for ident in idents {
67            if let CacheIdent::TableId(view_id) = ident {
68                cache.invalidate(view_id).await
69            }
70        }
71        Ok(())
72    })
73}
74
75fn filter(ident: &CacheIdent) -> bool {
76    matches!(ident, CacheIdent::TableId(_))
77}
78
#[cfg(test)]
mod tests {
    use std::collections::HashSet;
    use std::sync::Arc;

    use moka::future::CacheBuilder;
    use table::table_name::TableName;

    use super::*;
    use crate::ddl::tests::create_view::test_create_view_task;
    use crate::key::TableMetadataManager;
    use crate::kv_backend::memory::MemoryKvBackend;

    /// Walks the cache through a miss, a load after the view metadata is
    /// created, and an explicit invalidation.
    #[tokio::test]
    async fn test_view_info_cache() {
        const VIEW_ID: TableId = 1024;

        let kv_backend = Arc::new(MemoryKvBackend::default());
        let metadata_manager = TableMetadataManager::new(kv_backend.clone());
        let view_cache = new_view_info_cache(
            "test".to_string(),
            CacheBuilder::new(128).build(),
            kv_backend.clone(),
        );

        // Nothing has been stored yet, so the lookup yields no value.
        assert!(view_cache.get(VIEW_ID).await.unwrap().is_none());

        let mut task = test_create_view_task("my_view");
        let table_names: HashSet<_> = ["a_table", "b_table"]
            .into_iter()
            .map(|table| TableName {
                catalog_name: "greptime".to_string(),
                schema_name: "public".to_string(),
                table_name: table.to_string(),
            })
            .collect();
        let definition = "CREATE VIEW test AS SELECT * FROM numbers";

        // Persist the view metadata under the id the cache will be asked for.
        task.view_info.ident.table_id = VIEW_ID;
        metadata_manager
            .create_view_metadata(
                task.view_info.clone(),
                task.create_view.logical_plan.clone(),
                table_names,
                vec!["a".to_string()],
                vec!["number".to_string()],
                definition.to_string(),
            )
            .await
            .unwrap();

        // The same lookup now returns the stored value.
        let cached = view_cache.get(VIEW_ID).await.unwrap().unwrap();
        let expected_table_names = task
            .create_view
            .table_names
            .iter()
            .map(|t| t.clone().into())
            .collect::<HashSet<_>>();
        assert_eq!(cached.view_info, task.create_view.logical_plan);
        assert_eq!(cached.table_names, expected_table_names);
        assert_eq!(cached.definition, task.create_view.definition);
        assert_eq!(cached.columns, task.create_view.columns);
        assert_eq!(cached.plan_columns, task.create_view.plan_columns);

        // Invalidating by table id removes the cached entry.
        assert!(view_cache.contains_key(&VIEW_ID));
        view_cache
            .invalidate(&[CacheIdent::TableId(VIEW_ID)])
            .await
            .unwrap();
        assert!(!view_cache.contains_key(&VIEW_ID));
    }
}