catalog/kvbackend/
table_cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use common_meta::cache::{CacheContainer, Initializer, TableInfoCacheRef, TableNameCacheRef};
18use common_meta::error::{Result as MetaResult, ValueNotExistSnafu};
19use common_meta::instruction::CacheIdent;
20use futures::future::BoxFuture;
21use moka::future::Cache;
22use snafu::OptionExt;
23use table::dist_table::DistTable;
24use table::table_name::TableName;
25use table::TableRef;
26
27pub type TableCacheRef = Arc<TableCache>;
28
29/// [TableCache] caches the [TableName] to [TableRef] mapping.
30pub type TableCache = CacheContainer<TableName, TableRef, CacheIdent>;
31
32/// Constructs a [TableCache].
33pub fn new_table_cache(
34    name: String,
35    cache: Cache<TableName, TableRef>,
36    table_info_cache: TableInfoCacheRef,
37    table_name_cache: TableNameCacheRef,
38) -> TableCache {
39    let init = init_factory(table_info_cache, table_name_cache);
40
41    CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
42}
43
44fn init_factory(
45    table_info_cache: TableInfoCacheRef,
46    table_name_cache: TableNameCacheRef,
47) -> Initializer<TableName, TableRef> {
48    Arc::new(move |table_name| {
49        let table_info_cache = table_info_cache.clone();
50        let table_name_cache = table_name_cache.clone();
51        Box::pin(async move {
52            let table_id = table_name_cache
53                .get_by_ref(table_name)
54                .await?
55                .context(ValueNotExistSnafu)?;
56            let table_info = table_info_cache
57                .get_by_ref(&table_id)
58                .await?
59                .context(ValueNotExistSnafu)?;
60
61            Ok(Some(DistTable::table(table_info)))
62        })
63    })
64}
65
66fn invalidator<'a>(
67    cache: &'a Cache<TableName, TableRef>,
68    ident: &'a CacheIdent,
69) -> BoxFuture<'a, MetaResult<()>> {
70    Box::pin(async move {
71        if let CacheIdent::TableName(table_name) = ident {
72            cache.invalidate(table_name).await
73        }
74        Ok(())
75    })
76}
77
78fn filter(ident: &CacheIdent) -> bool {
79    matches!(ident, CacheIdent::TableName(_))
80}