common_meta/cache/table/
schema.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use futures_util::future::BoxFuture;
18use moka::future::Cache;
19use snafu::OptionExt;
20
21use crate::cache::{CacheContainer, Initializer};
22use crate::error::ValueNotExistSnafu;
23use crate::instruction::CacheIdent;
24use crate::key::schema_name::{SchemaManager, SchemaName, SchemaNameKey, SchemaNameValue};
25use crate::kv_backend::KvBackendRef;
26
/// A [CacheContainer] keyed by [SchemaName] that stores shared
/// [SchemaNameValue]s and is invalidated through [CacheIdent]s.
pub type SchemaCache = CacheContainer<SchemaName, Arc<SchemaNameValue>, CacheIdent>;
/// A reference-counted handle to a [SchemaCache].
pub type SchemaCacheRef = Arc<SchemaCache>;
29
30/// Constructs a [SchemaCache].
31pub fn new_schema_cache(
32    name: String,
33    cache: Cache<SchemaName, Arc<SchemaNameValue>>,
34    kv_backend: KvBackendRef,
35) -> SchemaCache {
36    let schema_manager = SchemaManager::new(kv_backend.clone());
37    let init = init_factory(schema_manager);
38
39    CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
40}
41
42fn init_factory(schema_manager: SchemaManager) -> Initializer<SchemaName, Arc<SchemaNameValue>> {
43    Arc::new(move |schema_name| {
44        let manager = schema_manager.clone();
45        Box::pin(async move {
46            let schema_value = manager
47                .get(SchemaNameKey {
48                    catalog: &schema_name.catalog_name,
49                    schema: &schema_name.schema_name,
50                })
51                .await?
52                .context(ValueNotExistSnafu)?
53                .into_inner();
54            Ok(Some(Arc::new(schema_value)))
55        })
56    })
57}
58
59fn invalidator<'a>(
60    cache: &'a Cache<SchemaName, Arc<SchemaNameValue>>,
61    ident: &'a CacheIdent,
62) -> BoxFuture<'a, crate::error::Result<()>> {
63    Box::pin(async move {
64        if let CacheIdent::SchemaName(schema_name) = ident {
65            cache.invalidate(schema_name).await
66        }
67        Ok(())
68    })
69}
70
71fn filter(ident: &CacheIdent) -> bool {
72    matches!(ident, CacheIdent::SchemaName(_))
73}