common_meta/
cache_invalidator.rs1use std::sync::Arc;
16
17use crate::error::Result;
18use crate::flow_name::FlowName;
19use crate::instruction::{CacheIdent, DropFlow};
20use crate::key::flow::flow_info::FlowInfoKey;
21use crate::key::flow::flow_name::FlowNameKey;
22use crate::key::flow::flow_route::FlowRouteKey;
23use crate::key::flow::flownode_flow::FlownodeFlowKey;
24use crate::key::flow::table_flow::TableFlowKey;
25use crate::key::schema_name::SchemaNameKey;
26use crate::key::table_info::TableInfoKey;
27use crate::key::table_name::TableNameKey;
28use crate::key::table_route::TableRouteKey;
29use crate::key::view_info::ViewInfoKey;
30use crate::key::MetadataKey;
31
32#[async_trait::async_trait]
34pub trait KvCacheInvalidator: Send + Sync {
35 async fn invalidate_key(&self, key: &[u8]);
36}
37
38pub type KvCacheInvalidatorRef = Arc<dyn KvCacheInvalidator>;
39
40pub struct DummyKvCacheInvalidator;
41
42#[async_trait::async_trait]
43impl KvCacheInvalidator for DummyKvCacheInvalidator {
44 async fn invalidate_key(&self, _key: &[u8]) {}
45}
46
47#[derive(Default)]
49pub struct Context {
50 pub subject: Option<String>,
51}
52
53#[async_trait::async_trait]
54pub trait CacheInvalidator: Send + Sync {
55 async fn invalidate(&self, ctx: &Context, caches: &[CacheIdent]) -> Result<()>;
56}
57
58pub type CacheInvalidatorRef = Arc<dyn CacheInvalidator>;
59
60pub struct DummyCacheInvalidator;
61
62#[async_trait::async_trait]
63impl CacheInvalidator for DummyCacheInvalidator {
64 async fn invalidate(&self, _ctx: &Context, _caches: &[CacheIdent]) -> Result<()> {
65 Ok(())
66 }
67}
68
69#[async_trait::async_trait]
70impl<T> CacheInvalidator for T
71where
72 T: KvCacheInvalidator,
73{
74 async fn invalidate(&self, _ctx: &Context, caches: &[CacheIdent]) -> Result<()> {
75 for cache in caches {
76 match cache {
77 CacheIdent::TableId(table_id) => {
78 let key = TableInfoKey::new(*table_id);
79 self.invalidate_key(&key.to_bytes()).await;
80
81 let key = TableRouteKey::new(*table_id);
82 self.invalidate_key(&key.to_bytes()).await;
83
84 let key = ViewInfoKey::new(*table_id);
85 self.invalidate_key(&key.to_bytes()).await;
86 }
87 CacheIdent::TableName(table_name) => {
88 let key: TableNameKey = table_name.into();
89 self.invalidate_key(&key.to_bytes()).await
90 }
91 CacheIdent::SchemaName(schema_name) => {
92 let key: SchemaNameKey = schema_name.into();
93 self.invalidate_key(&key.to_bytes()).await;
94 }
95 CacheIdent::CreateFlow(_) => {
96 }
98 CacheIdent::DropFlow(DropFlow {
99 flow_id,
100 source_table_ids,
101 flow_part2node_id,
102 }) => {
103 let mut keys = Vec::with_capacity(
105 source_table_ids.len() * flow_part2node_id.len()
106 + flow_part2node_id.len() * 2,
107 );
108 for table_id in source_table_ids {
109 for (partition_id, node_id) in flow_part2node_id {
110 let key =
111 TableFlowKey::new(*table_id, *node_id, *flow_id, *partition_id)
112 .to_bytes();
113 keys.push(key);
114 }
115 }
116
117 for (partition_id, node_id) in flow_part2node_id {
118 let key =
119 FlownodeFlowKey::new(*node_id, *flow_id, *partition_id).to_bytes();
120 keys.push(key);
121 let key = FlowRouteKey::new(*flow_id, *partition_id).to_bytes();
122 keys.push(key);
123 }
124
125 for key in keys {
126 self.invalidate_key(&key).await;
127 }
128 }
129 CacheIdent::FlowName(FlowName {
130 catalog_name,
131 flow_name,
132 }) => {
133 let key = FlowNameKey::new(catalog_name, flow_name);
134 self.invalidate_key(&key.to_bytes()).await
135 }
136 CacheIdent::FlowId(flow_id) => {
137 let key = FlowInfoKey::new(*flow_id);
138 self.invalidate_key(&key.to_bytes()).await;
139 }
140 }
141 }
142 Ok(())
143 }
144}