common_meta/cache/table/
table_info.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use futures::future::BoxFuture;
18use moka::future::Cache;
19use snafu::{OptionExt, ResultExt};
20use store_api::storage::TableId;
21use table::metadata::TableInfo;
22
23use crate::cache::{CacheContainer, Initializer};
24use crate::error;
25use crate::error::Result;
26use crate::instruction::CacheIdent;
27use crate::key::table_info::{TableInfoManager, TableInfoManagerRef};
28use crate::kv_backend::KvBackendRef;
29
30/// [TableInfoCache] caches the [TableId] to [TableInfo] mapping.
31pub type TableInfoCache = CacheContainer<TableId, Arc<TableInfo>, CacheIdent>;
32
33pub type TableInfoCacheRef = Arc<TableInfoCache>;
34
35/// Constructs a [TableInfoCache].
36pub fn new_table_info_cache(
37    name: String,
38    cache: Cache<TableId, Arc<TableInfo>>,
39    kv_backend: KvBackendRef,
40) -> TableInfoCache {
41    let table_info_manager = Arc::new(TableInfoManager::new(kv_backend));
42    let init = init_factory(table_info_manager);
43
44    CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
45}
46
47fn init_factory(table_info_manager: TableInfoManagerRef) -> Initializer<TableId, Arc<TableInfo>> {
48    Arc::new(move |table_id| {
49        let table_info_manager = table_info_manager.clone();
50        Box::pin(async move {
51            let raw_table_info = table_info_manager
52                .get(*table_id)
53                .await?
54                .context(error::ValueNotExistSnafu {})?
55                .into_inner()
56                .table_info;
57            Ok(Some(Arc::new(
58                TableInfo::try_from(raw_table_info).context(error::ConvertRawTableInfoSnafu)?,
59            )))
60        })
61    })
62}
63
64fn invalidator<'a>(
65    cache: &'a Cache<TableId, Arc<TableInfo>>,
66    ident: &'a CacheIdent,
67) -> BoxFuture<'a, Result<()>> {
68    Box::pin(async move {
69        if let CacheIdent::TableId(table_id) = ident {
70            cache.invalidate(table_id).await
71        }
72        Ok(())
73    })
74}
75
76fn filter(ident: &CacheIdent) -> bool {
77    matches!(ident, CacheIdent::TableId(_))
78}
79
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use moka::future::CacheBuilder;

    use super::*;
    use crate::ddl::test_util::create_table::test_create_table_task;
    use crate::key::table_route::TableRouteValue;
    use crate::key::TableMetadataManager;
    use crate::kv_backend::memory::MemoryKvBackend;

    /// Table id used by every step of the test.
    const TABLE_ID: TableId = 1024;

    #[tokio::test]
    async fn test_cache() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let metadata_manager = TableMetadataManager::new(kv_backend.clone());
        let table_info_cache = new_table_info_cache(
            "test".to_string(),
            CacheBuilder::new(128).build(),
            kv_backend.clone(),
        );

        // No metadata has been created yet, so the lookup yields no value.
        let missing = table_info_cache.get(TABLE_ID).await.unwrap();
        assert!(missing.is_none());

        // Create the table metadata, then read it back through the cache.
        let create_task = test_create_table_task("my_table", TABLE_ID);
        metadata_manager
            .create_table_metadata(
                create_task.table_info.clone(),
                TableRouteValue::physical(vec![]),
                HashMap::new(),
            )
            .await
            .unwrap();
        let cached = table_info_cache.get(TABLE_ID).await.unwrap().unwrap();
        let expected = TableInfo::try_from(create_task.table_info).unwrap();
        assert_eq!(*cached, expected);

        // The entry is present after the read and gone after invalidation.
        assert!(table_info_cache.contains_key(&TABLE_ID));
        table_info_cache
            .invalidate(&[CacheIdent::TableId(TABLE_ID)])
            .await
            .unwrap();
        assert!(!table_info_cache.contains_key(&TABLE_ID));
    }
}