common_meta/
cache_invalidator.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use crate::error::Result;
18use crate::flow_name::FlowName;
19use crate::instruction::{CacheIdent, DropFlow};
20use crate::key::flow::flow_info::FlowInfoKey;
21use crate::key::flow::flow_name::FlowNameKey;
22use crate::key::flow::flow_route::FlowRouteKey;
23use crate::key::flow::flownode_flow::FlownodeFlowKey;
24use crate::key::flow::table_flow::TableFlowKey;
25use crate::key::node_address::NodeAddressKey;
26use crate::key::schema_name::SchemaNameKey;
27use crate::key::table_info::TableInfoKey;
28use crate::key::table_name::TableNameKey;
29use crate::key::table_route::TableRouteKey;
30use crate::key::view_info::ViewInfoKey;
31use crate::key::MetadataKey;
32
33/// KvBackend cache invalidator
34#[async_trait::async_trait]
35pub trait KvCacheInvalidator: Send + Sync {
36    async fn invalidate_key(&self, key: &[u8]);
37}
38
39pub type KvCacheInvalidatorRef = Arc<dyn KvCacheInvalidator>;
40
41pub struct DummyKvCacheInvalidator;
42
43#[async_trait::async_trait]
44impl KvCacheInvalidator for DummyKvCacheInvalidator {
45    async fn invalidate_key(&self, _key: &[u8]) {}
46}
47
/// Context that accompanies a cache invalidation, e.g. span id, trace id, etc.
#[derive(Default)]
pub struct Context {
    /// Optional subject attached to the invalidation; `None` when the context is
    /// built through [`Default`].
    // NOTE(review): none of the code visible in this file reads `subject`; its
    // meaning is defined by callers -- confirm before relying on it.
    pub subject: Option<String>,
}
53
54#[async_trait::async_trait]
55pub trait CacheInvalidator: Send + Sync {
56    async fn invalidate(&self, ctx: &Context, caches: &[CacheIdent]) -> Result<()>;
57
58    fn name(&self) -> &'static str {
59        std::any::type_name::<Self>()
60    }
61}
62
63pub type CacheInvalidatorRef = Arc<dyn CacheInvalidator>;
64
65pub struct DummyCacheInvalidator;
66
67#[async_trait::async_trait]
68impl CacheInvalidator for DummyCacheInvalidator {
69    async fn invalidate(&self, _ctx: &Context, _caches: &[CacheIdent]) -> Result<()> {
70        Ok(())
71    }
72}
73
74#[async_trait::async_trait]
75impl<T> CacheInvalidator for T
76where
77    T: KvCacheInvalidator,
78{
79    async fn invalidate(&self, _ctx: &Context, caches: &[CacheIdent]) -> Result<()> {
80        for cache in caches {
81            match cache {
82                CacheIdent::TableId(table_id) => {
83                    let key = TableInfoKey::new(*table_id);
84                    self.invalidate_key(&key.to_bytes()).await;
85
86                    let key = TableRouteKey::new(*table_id);
87                    self.invalidate_key(&key.to_bytes()).await;
88
89                    let key = ViewInfoKey::new(*table_id);
90                    self.invalidate_key(&key.to_bytes()).await;
91                }
92                CacheIdent::TableName(table_name) => {
93                    let key: TableNameKey = table_name.into();
94                    self.invalidate_key(&key.to_bytes()).await
95                }
96                CacheIdent::SchemaName(schema_name) => {
97                    let key: SchemaNameKey = schema_name.into();
98                    self.invalidate_key(&key.to_bytes()).await;
99                }
100                CacheIdent::CreateFlow(_) => {
101                    // Do nothing
102                }
103                CacheIdent::DropFlow(DropFlow {
104                    flow_id,
105                    source_table_ids,
106                    flow_part2node_id,
107                }) => {
108                    // invalidate flow route/flownode flow/table flow
109                    let mut keys = Vec::with_capacity(
110                        source_table_ids.len() * flow_part2node_id.len()
111                            + flow_part2node_id.len() * 2,
112                    );
113                    for table_id in source_table_ids {
114                        for (partition_id, node_id) in flow_part2node_id {
115                            let key =
116                                TableFlowKey::new(*table_id, *node_id, *flow_id, *partition_id)
117                                    .to_bytes();
118                            keys.push(key);
119                        }
120                    }
121
122                    for (partition_id, node_id) in flow_part2node_id {
123                        let key =
124                            FlownodeFlowKey::new(*node_id, *flow_id, *partition_id).to_bytes();
125                        keys.push(key);
126                        let key = FlowRouteKey::new(*flow_id, *partition_id).to_bytes();
127                        keys.push(key);
128                    }
129
130                    for key in keys {
131                        self.invalidate_key(&key).await;
132                    }
133                }
134                CacheIdent::FlowName(FlowName {
135                    catalog_name,
136                    flow_name,
137                }) => {
138                    let key = FlowNameKey::new(catalog_name, flow_name);
139                    self.invalidate_key(&key.to_bytes()).await
140                }
141                CacheIdent::FlowId(flow_id) => {
142                    let key = FlowInfoKey::new(*flow_id);
143                    self.invalidate_key(&key.to_bytes()).await;
144                }
145                CacheIdent::FlowNodeAddressChange(node_id) => {
146                    // other caches doesn't need to be invalidated
147                    // since this is only for flownode address change not id change
148                    common_telemetry::info!("Invalidate flow node cache for node_id: {}", node_id);
149                    let key = NodeAddressKey::with_flownode(*node_id);
150                    self.invalidate_key(&key.to_bytes()).await;
151                }
152            }
153        }
154        Ok(())
155    }
156}