common_meta/
cache_invalidator.rs1use std::sync::Arc;
16
17use crate::error::Result;
18use crate::flow_name::FlowName;
19use crate::instruction::CacheIdent;
20use crate::key::flow::flow_info::FlowInfoKey;
21use crate::key::flow::flow_name::FlowNameKey;
22use crate::key::schema_name::SchemaNameKey;
23use crate::key::table_info::TableInfoKey;
24use crate::key::table_name::TableNameKey;
25use crate::key::table_route::TableRouteKey;
26use crate::key::view_info::ViewInfoKey;
27use crate::key::MetadataKey;
28
29#[async_trait::async_trait]
31pub trait KvCacheInvalidator: Send + Sync {
32 async fn invalidate_key(&self, key: &[u8]);
33}
34
35pub type KvCacheInvalidatorRef = Arc<dyn KvCacheInvalidator>;
36
37pub struct DummyKvCacheInvalidator;
38
39#[async_trait::async_trait]
40impl KvCacheInvalidator for DummyKvCacheInvalidator {
41 async fn invalidate_key(&self, _key: &[u8]) {}
42}
43
44#[derive(Default)]
46pub struct Context {
47 pub subject: Option<String>,
48}
49
50#[async_trait::async_trait]
51pub trait CacheInvalidator: Send + Sync {
52 async fn invalidate(&self, ctx: &Context, caches: &[CacheIdent]) -> Result<()>;
53}
54
55pub type CacheInvalidatorRef = Arc<dyn CacheInvalidator>;
56
57pub struct DummyCacheInvalidator;
58
59#[async_trait::async_trait]
60impl CacheInvalidator for DummyCacheInvalidator {
61 async fn invalidate(&self, _ctx: &Context, _caches: &[CacheIdent]) -> Result<()> {
62 Ok(())
63 }
64}
65
66#[async_trait::async_trait]
67impl<T> CacheInvalidator for T
68where
69 T: KvCacheInvalidator,
70{
71 async fn invalidate(&self, _ctx: &Context, caches: &[CacheIdent]) -> Result<()> {
72 for cache in caches {
73 match cache {
74 CacheIdent::TableId(table_id) => {
75 let key = TableInfoKey::new(*table_id);
76 self.invalidate_key(&key.to_bytes()).await;
77
78 let key = TableRouteKey::new(*table_id);
79 self.invalidate_key(&key.to_bytes()).await;
80
81 let key = ViewInfoKey::new(*table_id);
82 self.invalidate_key(&key.to_bytes()).await;
83 }
84 CacheIdent::TableName(table_name) => {
85 let key: TableNameKey = table_name.into();
86 self.invalidate_key(&key.to_bytes()).await
87 }
88 CacheIdent::SchemaName(schema_name) => {
89 let key: SchemaNameKey = schema_name.into();
90 self.invalidate_key(&key.to_bytes()).await;
91 }
92 CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_) => {
93 }
95 CacheIdent::FlowName(FlowName {
96 catalog_name,
97 flow_name,
98 }) => {
99 let key = FlowNameKey::new(catalog_name, flow_name);
100 self.invalidate_key(&key.to_bytes()).await
101 }
102 CacheIdent::FlowId(flow_id) => {
103 let key = FlowInfoKey::new(*flow_id);
104 self.invalidate_key(&key.to_bytes()).await;
105 }
106 }
107 }
108 Ok(())
109 }
110}