common_meta/cache/table/
table_name.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use futures::future::BoxFuture;
18use moka::future::Cache;
19use snafu::OptionExt;
20use table::metadata::TableId;
21use table::table_name::TableName;
22
23use crate::cache::{CacheContainer, Initializer};
24use crate::error;
25use crate::error::Result;
26use crate::instruction::CacheIdent;
27use crate::key::table_name::{TableNameKey, TableNameManager, TableNameManagerRef};
28use crate::kv_backend::KvBackendRef;
29
30/// [TableNameCache] caches the [TableName] to [TableId] mapping.
31pub type TableNameCache = CacheContainer<TableName, TableId, CacheIdent>;
32
33pub type TableNameCacheRef = Arc<TableNameCache>;
34
35/// Constructs a [TableNameCache].
36pub fn new_table_name_cache(
37    name: String,
38    cache: Cache<TableName, TableId>,
39    kv_backend: KvBackendRef,
40) -> TableNameCache {
41    let table_name_manager = Arc::new(TableNameManager::new(kv_backend));
42    let init = init_factory(table_name_manager);
43
44    CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
45}
46
47fn init_factory(table_name_manager: TableNameManagerRef) -> Initializer<TableName, TableId> {
48    Arc::new(
49        move |TableName {
50                  catalog_name,
51                  schema_name,
52                  table_name,
53              }| {
54            let table_name_manager = table_name_manager.clone();
55            Box::pin(async move {
56                Ok(Some(
57                    table_name_manager
58                        .get(TableNameKey {
59                            catalog: catalog_name,
60                            schema: schema_name,
61                            table: table_name,
62                        })
63                        .await?
64                        .context(error::ValueNotExistSnafu {})?
65                        .table_id(),
66                ))
67            })
68        },
69    )
70}
71
72fn invalidator<'a>(
73    cache: &'a Cache<TableName, TableId>,
74    ident: &'a CacheIdent,
75) -> BoxFuture<'a, Result<()>> {
76    Box::pin(async move {
77        if let CacheIdent::TableName(table_name) = ident {
78            cache.invalidate(table_name).await
79        }
80        Ok(())
81    })
82}
83
84fn filter(ident: &CacheIdent) -> bool {
85    matches!(ident, CacheIdent::TableName(_))
86}
87
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use moka::future::CacheBuilder;

    use super::*;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::kv_backend::txn::TxnService;

    /// Table id registered for the test table.
    const TEST_TABLE_ID: TableId = 1024;

    /// Fully qualified name of the table every test works with.
    fn my_table_name() -> TableName {
        TableName {
            catalog_name: DEFAULT_CATALOG_NAME.to_string(),
            schema_name: DEFAULT_SCHEMA_NAME.to_string(),
            table_name: "my_table".to_string(),
        }
    }

    /// A lookup yields `None` before the table exists and its id afterwards.
    #[tokio::test]
    async fn test_cache_get() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let cache = new_table_name_cache(
            "test".to_string(),
            CacheBuilder::new(128).build(),
            mem_kv.clone(),
        );
        let name = my_table_name();

        // Nothing has been written to the backend yet.
        let missing = cache.get_by_ref(&name).await.unwrap();
        assert!(missing.is_none());

        // Register the table directly in the kv backend.
        let manager = TableNameManager::new(mem_kv.clone());
        let key = TableNameKey {
            catalog: DEFAULT_CATALOG_NAME,
            schema: DEFAULT_SCHEMA_NAME,
            table: "my_table",
        };
        let txn = manager.build_create_txn(&key, TEST_TABLE_ID).unwrap();
        mem_kv.txn(txn).await.unwrap();

        let found = cache.get_by_ref(&name).await.unwrap().unwrap();
        assert_eq!(found, TEST_TABLE_ID);
    }

    /// Invalidating by table name drops the cached entry.
    #[tokio::test]
    async fn test_invalidate_cache() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let cache = new_table_name_cache(
            "test".to_string(),
            CacheBuilder::new(128).build(),
            mem_kv.clone(),
        );

        // Register the table directly in the kv backend.
        let manager = TableNameManager::new(mem_kv.clone());
        let name = my_table_name();
        let key = TableNameKey {
            catalog: DEFAULT_CATALOG_NAME,
            schema: DEFAULT_SCHEMA_NAME,
            table: "my_table",
        };
        let txn = manager.build_create_txn(&key, TEST_TABLE_ID).unwrap();
        mem_kv.txn(txn).await.unwrap();

        // The first read populates the cache.
        let found = cache.get_by_ref(&name).await.unwrap().unwrap();
        assert_eq!(found, TEST_TABLE_ID);
        assert!(cache.contains_key(&name));

        let idents = [CacheIdent::TableName(name.clone())];
        cache.invalidate(&idents).await.unwrap();
        assert!(!cache.contains_key(&name));
    }
}