common_meta/
cache_invalidator.rs1use std::sync::Arc;
16
17use crate::error::Result;
18use crate::flow_name::FlowName;
19use crate::instruction::{CacheIdent, DropFlow};
20use crate::key::flow::flow_info::FlowInfoKey;
21use crate::key::flow::flow_name::FlowNameKey;
22use crate::key::flow::flow_route::FlowRouteKey;
23use crate::key::flow::flownode_flow::FlownodeFlowKey;
24use crate::key::flow::table_flow::TableFlowKey;
25use crate::key::node_address::NodeAddressKey;
26use crate::key::schema_name::SchemaNameKey;
27use crate::key::table_info::TableInfoKey;
28use crate::key::table_name::TableNameKey;
29use crate::key::table_route::TableRouteKey;
30use crate::key::view_info::ViewInfoKey;
31use crate::key::MetadataKey;
32
33#[async_trait::async_trait]
35pub trait KvCacheInvalidator: Send + Sync {
36 async fn invalidate_key(&self, key: &[u8]);
37}
38
39pub type KvCacheInvalidatorRef = Arc<dyn KvCacheInvalidator>;
40
/// Field-less [`KvCacheInvalidator`] whose `invalidate_key` has an empty body.
pub struct DummyKvCacheInvalidator;
42
43#[async_trait::async_trait]
44impl KvCacheInvalidator for DummyKvCacheInvalidator {
45 async fn invalidate_key(&self, _key: &[u8]) {}
46}
47
/// Context handed to [`CacheInvalidator::invalidate`] alongside the cache
/// identifiers.
///
/// Derives `Debug` and `Clone` in addition to `Default`, so a context can be
/// logged and duplicated by callers.
#[derive(Debug, Clone, Default)]
pub struct Context {
    /// Optional subject string; `None` by default.
    ///
    /// NOTE(review): presumably describes what triggered the invalidation;
    /// confirm against the callers that populate it.
    pub subject: Option<String>,
}
53
54#[async_trait::async_trait]
55pub trait CacheInvalidator: Send + Sync {
56 async fn invalidate(&self, ctx: &Context, caches: &[CacheIdent]) -> Result<()>;
57
58 fn name(&self) -> &'static str {
59 std::any::type_name::<Self>()
60 }
61}
62
63pub type CacheInvalidatorRef = Arc<dyn CacheInvalidator>;
64
/// Field-less [`CacheInvalidator`] whose `invalidate` does no work and
/// returns `Ok(())`.
pub struct DummyCacheInvalidator;
66
67#[async_trait::async_trait]
68impl CacheInvalidator for DummyCacheInvalidator {
69 async fn invalidate(&self, _ctx: &Context, _caches: &[CacheIdent]) -> Result<()> {
70 Ok(())
71 }
72}
73
74#[async_trait::async_trait]
75impl<T> CacheInvalidator for T
76where
77 T: KvCacheInvalidator,
78{
79 async fn invalidate(&self, _ctx: &Context, caches: &[CacheIdent]) -> Result<()> {
80 for cache in caches {
81 match cache {
82 CacheIdent::TableId(table_id) => {
83 let key = TableInfoKey::new(*table_id);
84 self.invalidate_key(&key.to_bytes()).await;
85
86 let key = TableRouteKey::new(*table_id);
87 self.invalidate_key(&key.to_bytes()).await;
88
89 let key = ViewInfoKey::new(*table_id);
90 self.invalidate_key(&key.to_bytes()).await;
91 }
92 CacheIdent::TableName(table_name) => {
93 let key: TableNameKey = table_name.into();
94 self.invalidate_key(&key.to_bytes()).await
95 }
96 CacheIdent::SchemaName(schema_name) => {
97 let key: SchemaNameKey = schema_name.into();
98 self.invalidate_key(&key.to_bytes()).await;
99 }
100 CacheIdent::CreateFlow(_) => {
101 }
103 CacheIdent::DropFlow(DropFlow {
104 flow_id,
105 source_table_ids,
106 flow_part2node_id,
107 }) => {
108 let mut keys = Vec::with_capacity(
110 source_table_ids.len() * flow_part2node_id.len()
111 + flow_part2node_id.len() * 2,
112 );
113 for table_id in source_table_ids {
114 for (partition_id, node_id) in flow_part2node_id {
115 let key =
116 TableFlowKey::new(*table_id, *node_id, *flow_id, *partition_id)
117 .to_bytes();
118 keys.push(key);
119 }
120 }
121
122 for (partition_id, node_id) in flow_part2node_id {
123 let key =
124 FlownodeFlowKey::new(*node_id, *flow_id, *partition_id).to_bytes();
125 keys.push(key);
126 let key = FlowRouteKey::new(*flow_id, *partition_id).to_bytes();
127 keys.push(key);
128 }
129
130 for key in keys {
131 self.invalidate_key(&key).await;
132 }
133 }
134 CacheIdent::FlowName(FlowName {
135 catalog_name,
136 flow_name,
137 }) => {
138 let key = FlowNameKey::new(catalog_name, flow_name);
139 self.invalidate_key(&key.to_bytes()).await
140 }
141 CacheIdent::FlowId(flow_id) => {
142 let key = FlowInfoKey::new(*flow_id);
143 self.invalidate_key(&key.to_bytes()).await;
144 }
145 CacheIdent::FlowNodeAddressChange(node_id) => {
146 common_telemetry::info!("Invalidate flow node cache for node_id: {}", node_id);
149 let key = NodeAddressKey::with_flownode(*node_id);
150 self.invalidate_key(&key.to_bytes()).await;
151 }
152 }
153 }
154 Ok(())
155 }
156}