Skip to main content

common_meta/cache/table/
table_name.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use futures::future::BoxFuture;
18use moka::future::Cache;
19use snafu::OptionExt;
20use table::metadata::TableId;
21use table::table_name::TableName;
22
23use crate::cache::{CacheContainer, Initializer};
24use crate::error;
25use crate::error::Result;
26use crate::instruction::CacheIdent;
27use crate::key::table_name::{TableNameKey, TableNameManager, TableNameManagerRef};
28use crate::kv_backend::KvBackendRef;
29
/// [TableNameCache] caches the [TableName] to [TableId] mapping.
///
/// It is a [CacheContainer] keyed by [TableName] and holding [TableId] values;
/// [CacheIdent] is the identifier type it is invalidated with.
pub type TableNameCache = CacheContainer<TableName, TableId, CacheIdent>;

/// A [TableNameCache] wrapped in an [Arc].
pub type TableNameCacheRef = Arc<TableNameCache>;
34
35/// Constructs a [TableNameCache].
36pub fn new_table_name_cache(
37    name: String,
38    cache: Cache<TableName, TableId>,
39    kv_backend: KvBackendRef,
40) -> TableNameCache {
41    let table_name_manager = Arc::new(TableNameManager::new(kv_backend));
42    let init = init_factory(table_name_manager);
43
44    CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
45}
46
47fn init_factory(table_name_manager: TableNameManagerRef) -> Initializer<TableName, TableId> {
48    Arc::new(
49        move |TableName {
50                  catalog_name,
51                  schema_name,
52                  table_name,
53              }| {
54            let table_name_manager = table_name_manager.clone();
55            Box::pin(async move {
56                Ok(Some(
57                    table_name_manager
58                        .get(TableNameKey {
59                            catalog: catalog_name,
60                            schema: schema_name,
61                            table: table_name,
62                        })
63                        .await?
64                        .context(error::ValueNotExistSnafu {})?
65                        .table_id(),
66                ))
67            })
68        },
69    )
70}
71
72fn invalidator<'a>(
73    cache: &'a Cache<TableName, TableId>,
74    idents: &'a [&CacheIdent],
75) -> BoxFuture<'a, Result<()>> {
76    Box::pin(async move {
77        for ident in idents {
78            if let CacheIdent::TableName(table_name) = ident {
79                cache.invalidate(table_name).await
80            }
81        }
82        Ok(())
83    })
84}
85
86fn filter(ident: &CacheIdent) -> bool {
87    matches!(ident, CacheIdent::TableName(_))
88}
89
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use moka::future::CacheBuilder;

    use super::*;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::kv_backend::txn::TxnService;

    /// Table part of the name used by every test below.
    const TEST_TABLE: &str = "my_table";
    /// Table id registered for [TEST_TABLE].
    const TEST_TABLE_ID: TableId = 1024;

    /// Returns the [TableName] of the test table in the default catalog and schema.
    fn test_table_name() -> TableName {
        TableName {
            catalog_name: DEFAULT_CATALOG_NAME.to_string(),
            schema_name: DEFAULT_SCHEMA_NAME.to_string(),
            table_name: TEST_TABLE.to_string(),
        }
    }

    #[tokio::test]
    async fn test_cache_get() {
        let backend = Arc::new(MemoryKvBackend::default());
        let moka_cache = CacheBuilder::new(128).build();
        let name_cache = new_table_name_cache("test".to_string(), moka_cache, backend.clone());
        let table_name = test_table_name();

        // Nothing has been written to the backend yet, so the lookup misses.
        let missing = name_cache.get_by_ref(&table_name).await.unwrap();
        assert!(missing.is_none());

        // Puts a new value.
        let manager = TableNameManager::new(backend.clone());
        let create_txn = manager
            .build_create_txn(
                &TableNameKey {
                    catalog: DEFAULT_CATALOG_NAME,
                    schema: DEFAULT_SCHEMA_NAME,
                    table: TEST_TABLE,
                },
                TEST_TABLE_ID,
            )
            .unwrap();
        backend.txn(create_txn).await.unwrap();

        let found = name_cache
            .get_by_ref(&table_name)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(found, TEST_TABLE_ID);
    }

    #[tokio::test]
    async fn test_invalidate_cache() {
        let backend = Arc::new(MemoryKvBackend::default());
        let moka_cache = CacheBuilder::new(128).build();
        let name_cache = new_table_name_cache("test".to_string(), moka_cache, backend.clone());

        // Puts a new value.
        let manager = TableNameManager::new(backend.clone());
        let table_name = test_table_name();
        let create_txn = manager
            .build_create_txn(
                &TableNameKey {
                    catalog: DEFAULT_CATALOG_NAME,
                    schema: DEFAULT_SCHEMA_NAME,
                    table: TEST_TABLE,
                },
                TEST_TABLE_ID,
            )
            .unwrap();
        backend.txn(create_txn).await.unwrap();

        // The first lookup loads the id and leaves it in the cache.
        let found = name_cache.get_by_ref(&table_name).await.unwrap().unwrap();
        assert_eq!(found, TEST_TABLE_ID);
        assert!(name_cache.contains_key(&table_name));

        // Invalidating by table name must evict the entry again.
        let stale = [CacheIdent::TableName(table_name.clone())];
        name_cache.invalidate(&stale).await.unwrap();
        assert!(!name_cache.contains_key(&table_name));
    }
}
175}