common_meta/
cache_invalidator.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use crate::error::Result;
18use crate::flow_name::FlowName;
19use crate::instruction::{CacheIdent, DropFlow};
20use crate::key::flow::flow_info::FlowInfoKey;
21use crate::key::flow::flow_name::FlowNameKey;
22use crate::key::flow::flow_route::FlowRouteKey;
23use crate::key::flow::flownode_flow::FlownodeFlowKey;
24use crate::key::flow::table_flow::TableFlowKey;
25use crate::key::schema_name::SchemaNameKey;
26use crate::key::table_info::TableInfoKey;
27use crate::key::table_name::TableNameKey;
28use crate::key::table_route::TableRouteKey;
29use crate::key::view_info::ViewInfoKey;
30use crate::key::MetadataKey;
31
32/// KvBackend cache invalidator
33#[async_trait::async_trait]
34pub trait KvCacheInvalidator: Send + Sync {
35    async fn invalidate_key(&self, key: &[u8]);
36}
37
38pub type KvCacheInvalidatorRef = Arc<dyn KvCacheInvalidator>;
39
40pub struct DummyKvCacheInvalidator;
41
42#[async_trait::async_trait]
43impl KvCacheInvalidator for DummyKvCacheInvalidator {
44    async fn invalidate_key(&self, _key: &[u8]) {}
45}
46
47/// Places context of invalidating cache. e.g., span id, trace id etc.
48#[derive(Default)]
49pub struct Context {
50    pub subject: Option<String>,
51}
52
53#[async_trait::async_trait]
54pub trait CacheInvalidator: Send + Sync {
55    async fn invalidate(&self, ctx: &Context, caches: &[CacheIdent]) -> Result<()>;
56}
57
58pub type CacheInvalidatorRef = Arc<dyn CacheInvalidator>;
59
60pub struct DummyCacheInvalidator;
61
62#[async_trait::async_trait]
63impl CacheInvalidator for DummyCacheInvalidator {
64    async fn invalidate(&self, _ctx: &Context, _caches: &[CacheIdent]) -> Result<()> {
65        Ok(())
66    }
67}
68
69#[async_trait::async_trait]
70impl<T> CacheInvalidator for T
71where
72    T: KvCacheInvalidator,
73{
74    async fn invalidate(&self, _ctx: &Context, caches: &[CacheIdent]) -> Result<()> {
75        for cache in caches {
76            match cache {
77                CacheIdent::TableId(table_id) => {
78                    let key = TableInfoKey::new(*table_id);
79                    self.invalidate_key(&key.to_bytes()).await;
80
81                    let key = TableRouteKey::new(*table_id);
82                    self.invalidate_key(&key.to_bytes()).await;
83
84                    let key = ViewInfoKey::new(*table_id);
85                    self.invalidate_key(&key.to_bytes()).await;
86                }
87                CacheIdent::TableName(table_name) => {
88                    let key: TableNameKey = table_name.into();
89                    self.invalidate_key(&key.to_bytes()).await
90                }
91                CacheIdent::SchemaName(schema_name) => {
92                    let key: SchemaNameKey = schema_name.into();
93                    self.invalidate_key(&key.to_bytes()).await;
94                }
95                CacheIdent::CreateFlow(_) => {
96                    // Do nothing
97                }
98                CacheIdent::DropFlow(DropFlow {
99                    flow_id,
100                    source_table_ids,
101                    flow_part2node_id,
102                }) => {
103                    // invalidate flow route/flownode flow/table flow
104                    let mut keys = Vec::with_capacity(
105                        source_table_ids.len() * flow_part2node_id.len()
106                            + flow_part2node_id.len() * 2,
107                    );
108                    for table_id in source_table_ids {
109                        for (partition_id, node_id) in flow_part2node_id {
110                            let key =
111                                TableFlowKey::new(*table_id, *node_id, *flow_id, *partition_id)
112                                    .to_bytes();
113                            keys.push(key);
114                        }
115                    }
116
117                    for (partition_id, node_id) in flow_part2node_id {
118                        let key =
119                            FlownodeFlowKey::new(*node_id, *flow_id, *partition_id).to_bytes();
120                        keys.push(key);
121                        let key = FlowRouteKey::new(*flow_id, *partition_id).to_bytes();
122                        keys.push(key);
123                    }
124
125                    for key in keys {
126                        self.invalidate_key(&key).await;
127                    }
128                }
129                CacheIdent::FlowName(FlowName {
130                    catalog_name,
131                    flow_name,
132                }) => {
133                    let key = FlowNameKey::new(catalog_name, flow_name);
134                    self.invalidate_key(&key.to_bytes()).await
135                }
136                CacheIdent::FlowId(flow_id) => {
137                    let key = FlowInfoKey::new(*flow_id);
138                    self.invalidate_key(&key.to_bytes()).await;
139                }
140            }
141        }
142        Ok(())
143    }
144}