Skip to main content

common_meta/cache/table/
table_info.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use futures::future::BoxFuture;
18use moka::future::Cache;
19use snafu::OptionExt;
20use store_api::storage::TableId;
21use table::metadata::TableInfo;
22
23use crate::cache::{CacheContainer, InitStrategy, Initializer};
24use crate::error;
25use crate::error::Result;
26use crate::instruction::CacheIdent;
27use crate::key::table_info::{TableInfoManager, TableInfoManagerRef};
28use crate::kv_backend::KvBackendRef;
29
30/// [TableInfoCache] caches the [TableId] to [TableInfo] mapping.
31pub type TableInfoCache = CacheContainer<TableId, Arc<TableInfo>, CacheIdent>;
32
33pub type TableInfoCacheRef = Arc<TableInfoCache>;
34
35/// Constructs a [TableInfoCache].
36pub fn new_table_info_cache(
37    name: String,
38    cache: Cache<TableId, Arc<TableInfo>>,
39    kv_backend: KvBackendRef,
40) -> TableInfoCache {
41    let table_info_manager = Arc::new(TableInfoManager::new(kv_backend));
42    let init = init_factory(table_info_manager);
43
44    CacheContainer::with_strategy(
45        name,
46        cache,
47        Box::new(invalidator),
48        init,
49        filter,
50        InitStrategy::VersionChecked,
51    )
52}
53
54fn init_factory(table_info_manager: TableInfoManagerRef) -> Initializer<TableId, Arc<TableInfo>> {
55    Arc::new(move |table_id| {
56        let table_info_manager = table_info_manager.clone();
57        Box::pin(async move {
58            let table_info = table_info_manager
59                .get(*table_id)
60                .await?
61                .context(error::ValueNotExistSnafu {})?
62                .into_inner()
63                .table_info;
64            Ok(Some(Arc::new(table_info)))
65        })
66    })
67}
68
69fn invalidator<'a>(
70    cache: &'a Cache<TableId, Arc<TableInfo>>,
71    idents: &'a [&CacheIdent],
72) -> BoxFuture<'a, Result<()>> {
73    Box::pin(async move {
74        for ident in idents {
75            if let CacheIdent::TableId(table_id) = ident {
76                cache.invalidate(table_id).await
77            }
78        }
79        Ok(())
80    })
81}
82
83fn filter(ident: &CacheIdent) -> bool {
84    matches!(ident, CacheIdent::TableId(_))
85}
86
87#[cfg(test)]
88mod tests {
89    use std::collections::HashMap;
90    use std::sync::Arc;
91
92    use moka::future::CacheBuilder;
93
94    use super::*;
95    use crate::ddl::test_util::create_table::test_create_table_task;
96    use crate::key::TableMetadataManager;
97    use crate::key::table_route::TableRouteValue;
98    use crate::kv_backend::memory::MemoryKvBackend;
99
100    #[tokio::test]
101    async fn test_cache() {
102        let mem_kv = Arc::new(MemoryKvBackend::default());
103        let table_metadata_manager = TableMetadataManager::new(mem_kv.clone());
104        let cache = CacheBuilder::new(128).build();
105        let cache = new_table_info_cache("test".to_string(), cache, mem_kv.clone());
106
107        let result = cache.get(1024).await.unwrap();
108        assert!(result.is_none());
109        let task = test_create_table_task("my_table", 1024);
110        table_metadata_manager
111            .create_table_metadata(
112                task.table_info.clone(),
113                TableRouteValue::physical(vec![]),
114                HashMap::new(),
115            )
116            .await
117            .unwrap();
118        let table_info = cache.get(1024).await.unwrap().unwrap();
119        assert_eq!(*table_info, task.table_info);
120
121        assert!(cache.contains_key(&1024));
122        cache
123            .invalidate(&[CacheIdent::TableId(1024)])
124            .await
125            .unwrap();
126        assert!(!cache.contains_key(&1024));
127    }
128}