common_meta/
cache_invalidator.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use crate::error::Result;
18use crate::flow_name::FlowName;
19use crate::instruction::CacheIdent;
20use crate::key::flow::flow_info::FlowInfoKey;
21use crate::key::flow::flow_name::FlowNameKey;
22use crate::key::schema_name::SchemaNameKey;
23use crate::key::table_info::TableInfoKey;
24use crate::key::table_name::TableNameKey;
25use crate::key::table_route::TableRouteKey;
26use crate::key::view_info::ViewInfoKey;
27use crate::key::MetadataKey;
28
/// Invalidates entries of a KvBackend cache, one raw key at a time.
///
/// Implementors must be `Send + Sync`.
#[async_trait::async_trait]
pub trait KvCacheInvalidator: Send + Sync {
    /// Invalidates whatever is cached under `key`, given as raw bytes.
    ///
    /// The method returns nothing, so it cannot report a failure to the caller.
    async fn invalidate_key(&self, key: &[u8]);
}
34
/// Shared, reference-counted handle to a type-erased [`KvCacheInvalidator`].
pub type KvCacheInvalidatorRef = Arc<dyn KvCacheInvalidator>;
36
/// A [`KvCacheInvalidator`] that does nothing.
///
/// It is a stateless unit struct, so it derives the common traits (`Debug`,
/// `Clone`, `Copy`, `Default`) to be usable in logs and default-constructed
/// containers.
#[derive(Debug, Clone, Copy, Default)]
pub struct DummyKvCacheInvalidator;
38
#[async_trait::async_trait]
impl KvCacheInvalidator for DummyKvCacheInvalidator {
    /// No-op: the key is ignored and nothing is invalidated.
    async fn invalidate_key(&self, _key: &[u8]) {}
}
43
/// Context that accompanies a cache invalidation request, e.g. a span id or
/// a trace id.
///
/// `Default` yields a context with no subject.
#[derive(Debug, Clone, Default)]
pub struct Context {
    /// Optional subject attached to the request. None of the invalidators
    /// defined alongside this type read it.
    pub subject: Option<String>,
}
49
/// Invalidates caches addressed by [`CacheIdent`] values.
///
/// Implementors must be `Send + Sync`.
#[async_trait::async_trait]
pub trait CacheInvalidator: Send + Sync {
    /// Invalidates every cache named in `caches`.
    ///
    /// `ctx` carries ancillary information about the request; see [`Context`].
    ///
    /// # Errors
    ///
    /// Returns whatever error the implementation reports.
    async fn invalidate(&self, ctx: &Context, caches: &[CacheIdent]) -> Result<()>;
}
54
/// Shared, reference-counted handle to a type-erased [`CacheInvalidator`].
pub type CacheInvalidatorRef = Arc<dyn CacheInvalidator>;
56
/// A [`CacheInvalidator`] that does nothing and always succeeds.
///
/// It is a stateless unit struct, so it derives the common traits (`Debug`,
/// `Clone`, `Copy`, `Default`) to be usable in logs and default-constructed
/// containers.
#[derive(Debug, Clone, Copy, Default)]
pub struct DummyCacheInvalidator;
58
#[async_trait::async_trait]
impl CacheInvalidator for DummyCacheInvalidator {
    /// No-op: both arguments are ignored and `Ok(())` is returned unconditionally.
    async fn invalidate(&self, _ctx: &Context, _caches: &[CacheIdent]) -> Result<()> {
        Ok(())
    }
}
65
66#[async_trait::async_trait]
67impl<T> CacheInvalidator for T
68where
69    T: KvCacheInvalidator,
70{
71    async fn invalidate(&self, _ctx: &Context, caches: &[CacheIdent]) -> Result<()> {
72        for cache in caches {
73            match cache {
74                CacheIdent::TableId(table_id) => {
75                    let key = TableInfoKey::new(*table_id);
76                    self.invalidate_key(&key.to_bytes()).await;
77
78                    let key = TableRouteKey::new(*table_id);
79                    self.invalidate_key(&key.to_bytes()).await;
80
81                    let key = ViewInfoKey::new(*table_id);
82                    self.invalidate_key(&key.to_bytes()).await;
83                }
84                CacheIdent::TableName(table_name) => {
85                    let key: TableNameKey = table_name.into();
86                    self.invalidate_key(&key.to_bytes()).await
87                }
88                CacheIdent::SchemaName(schema_name) => {
89                    let key: SchemaNameKey = schema_name.into();
90                    self.invalidate_key(&key.to_bytes()).await;
91                }
92                CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_) => {
93                    // Do nothing
94                }
95                CacheIdent::FlowName(FlowName {
96                    catalog_name,
97                    flow_name,
98                }) => {
99                    let key = FlowNameKey::new(catalog_name, flow_name);
100                    self.invalidate_key(&key.to_bytes()).await
101                }
102                CacheIdent::FlowId(flow_id) => {
103                    let key = FlowInfoKey::new(*flow_id);
104                    self.invalidate_key(&key.to_bytes()).await;
105                }
106            }
107        }
108        Ok(())
109    }
110}