1use std::collections::HashMap;
16use std::sync::Arc;
17
18use futures::future::BoxFuture;
19use moka::future::Cache;
20use moka::ops::compute::Op;
21use table::metadata::TableId;
22
23use crate::cache::{CacheContainer, Initializer};
24use crate::error::Result;
25use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
26use crate::key::flow::{TableFlowManager, TableFlowManagerRef};
27use crate::kv_backend::KvBackendRef;
28use crate::peer::Peer;
29use crate::FlownodeId;
30
31type FlownodeSet = Arc<HashMap<FlownodeId, Peer>>;
32
33pub type TableFlownodeSetCacheRef = Arc<TableFlownodeSetCache>;
34
35pub type TableFlownodeSetCache = CacheContainer<TableId, FlownodeSet, CacheIdent>;
37
38pub fn new_table_flownode_set_cache(
40 name: String,
41 cache: Cache<TableId, FlownodeSet>,
42 kv_backend: KvBackendRef,
43) -> TableFlownodeSetCache {
44 let table_flow_manager = Arc::new(TableFlowManager::new(kv_backend));
45 let init = init_factory(table_flow_manager);
46
47 CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
48}
49
50fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId, FlownodeSet> {
51 Arc::new(move |&table_id| {
52 let table_flow_manager = table_flow_manager.clone();
53 Box::pin(async move {
54 table_flow_manager
55 .flows(table_id)
56 .await
57 .map(|flows| {
58 flows
59 .into_iter()
60 .map(|(key, value)| (key.flownode_id(), value.peer))
61 .collect::<HashMap<_, _>>()
62 })
63 .map(Arc::new)
68 .map(Some)
69 })
70 })
71}
72
73async fn handle_create_flow(
74 cache: &Cache<TableId, FlownodeSet>,
75 CreateFlow {
76 source_table_ids,
77 flownodes: flownode_peers,
78 }: &CreateFlow,
79) {
80 for table_id in source_table_ids {
81 let entry = cache.entry(*table_id);
82 entry
83 .and_compute_with(
84 async |entry: Option<moka::Entry<u32, Arc<HashMap<u64, _>>>>| match entry {
85 Some(entry) => {
86 let mut map = entry.into_value().as_ref().clone();
87 map.extend(flownode_peers.iter().map(|peer| (peer.id, peer.clone())));
88
89 Op::Put(Arc::new(map))
90 }
91 None => Op::Put(Arc::new(HashMap::from_iter(
92 flownode_peers.iter().map(|peer| (peer.id, peer.clone())),
93 ))),
94 },
95 )
96 .await;
97 }
98}
99
100async fn handle_drop_flow(
101 cache: &Cache<TableId, FlownodeSet>,
102 DropFlow {
103 source_table_ids,
104 flownode_ids,
105 }: &DropFlow,
106) {
107 for table_id in source_table_ids {
108 let entry = cache.entry(*table_id);
109 entry
110 .and_compute_with(
111 async |entry: Option<moka::Entry<u32, Arc<HashMap<u64, _>>>>| match entry {
112 Some(entry) => {
113 let mut set = entry.into_value().as_ref().clone();
114 for flownode_id in flownode_ids {
115 set.remove(flownode_id);
116 }
117
118 Op::Put(Arc::new(set))
119 }
120 None => {
121 Op::Nop
123 }
124 },
125 )
126 .await;
127 }
128}
129
130fn invalidator<'a>(
131 cache: &'a Cache<TableId, FlownodeSet>,
132 ident: &'a CacheIdent,
133) -> BoxFuture<'a, Result<()>> {
134 Box::pin(async move {
135 match ident {
136 CacheIdent::CreateFlow(create_flow) => handle_create_flow(cache, create_flow).await,
137 CacheIdent::DropFlow(drop_flow) => handle_drop_flow(cache, drop_flow).await,
138 _ => {}
139 }
140 Ok(())
141 })
142}
143
144fn filter(ident: &CacheIdent) -> bool {
145 matches!(ident, CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_))
146}
147
148#[cfg(test)]
149mod tests {
150 use std::collections::{BTreeMap, HashMap};
151 use std::sync::Arc;
152
153 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
154 use moka::future::CacheBuilder;
155 use table::table_name::TableName;
156
157 use crate::cache::flow::table_flownode::new_table_flownode_set_cache;
158 use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
159 use crate::key::flow::flow_info::FlowInfoValue;
160 use crate::key::flow::flow_route::FlowRouteValue;
161 use crate::key::flow::FlowMetadataManager;
162 use crate::kv_backend::memory::MemoryKvBackend;
163 use crate::peer::Peer;
164
165 #[tokio::test]
166 async fn test_cache_empty_set() {
167 let mem_kv = Arc::new(MemoryKvBackend::default());
168 let cache = CacheBuilder::new(128).build();
169 let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
170 let set = cache.get(1024).await.unwrap().unwrap();
171 assert!(set.is_empty());
172 }
173
174 #[tokio::test]
175 async fn test_get() {
176 let mem_kv = Arc::new(MemoryKvBackend::default());
177 let flownode_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
178 flownode_metadata_manager
179 .create_flow_metadata(
180 1024,
181 FlowInfoValue {
182 source_table_ids: vec![1024, 1025],
183 sink_table_name: TableName {
184 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
185 schema_name: DEFAULT_SCHEMA_NAME.to_string(),
186 table_name: "sink_table".to_string(),
187 },
188 flownode_ids: BTreeMap::from([(0, 1), (1, 2), (2, 3)]),
189 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
190 query_context: None,
191 flow_name: "my_flow".to_string(),
192 raw_sql: "sql".to_string(),
193 expire_after: Some(300),
194 comment: "comment".to_string(),
195 options: Default::default(),
196 created_time: chrono::Utc::now(),
197 updated_time: chrono::Utc::now(),
198 },
199 (1..=3)
200 .map(|i| {
201 (
202 (i - 1) as u32,
203 FlowRouteValue {
204 peer: Peer::empty(i),
205 },
206 )
207 })
208 .collect::<Vec<_>>(),
209 )
210 .await
211 .unwrap();
212 let cache = CacheBuilder::new(128).build();
213 let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
214 let set = cache.get(1024).await.unwrap().unwrap();
215 assert_eq!(
216 set.as_ref().clone(),
217 HashMap::from_iter((1..=3).map(|i| { (i, Peer::empty(i),) }))
218 );
219 let set = cache.get(1025).await.unwrap().unwrap();
220 assert_eq!(
221 set.as_ref().clone(),
222 HashMap::from_iter((1..=3).map(|i| { (i, Peer::empty(i),) }))
223 );
224 let result = cache.get(1026).await.unwrap().unwrap();
225 assert_eq!(result.len(), 0);
226 }
227
228 #[tokio::test]
229 async fn test_create_flow() {
230 let mem_kv = Arc::new(MemoryKvBackend::default());
231 let cache = CacheBuilder::new(128).build();
232 let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
233 let ident = vec![CacheIdent::CreateFlow(CreateFlow {
234 source_table_ids: vec![1024, 1025],
235 flownodes: (1..=5).map(Peer::empty).collect(),
236 })];
237 cache.invalidate(&ident).await.unwrap();
238 let set = cache.get(1024).await.unwrap().unwrap();
239 assert_eq!(set.len(), 5);
240 let set = cache.get(1025).await.unwrap().unwrap();
241 assert_eq!(set.len(), 5);
242 }
243
244 #[tokio::test]
245 async fn test_drop_flow() {
246 let mem_kv = Arc::new(MemoryKvBackend::default());
247 let cache = CacheBuilder::new(128).build();
248 let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
249 let ident = vec![
250 CacheIdent::CreateFlow(CreateFlow {
251 source_table_ids: vec![1024, 1025],
252 flownodes: (1..=5).map(Peer::empty).collect(),
253 }),
254 CacheIdent::CreateFlow(CreateFlow {
255 source_table_ids: vec![1024, 1025],
256 flownodes: (11..=12).map(Peer::empty).collect(),
257 }),
258 ];
259 cache.invalidate(&ident).await.unwrap();
260 let set = cache.get(1024).await.unwrap().unwrap();
261 assert_eq!(set.len(), 7);
262 let set = cache.get(1025).await.unwrap().unwrap();
263 assert_eq!(set.len(), 7);
264
265 let ident = vec![CacheIdent::DropFlow(DropFlow {
266 source_table_ids: vec![1024, 1025],
267 flownode_ids: vec![1, 2, 3, 4, 5],
268 })];
269 cache.invalidate(&ident).await.unwrap();
270 let set = cache.get(1024).await.unwrap().unwrap();
271 assert_eq!(
272 set.as_ref().clone(),
273 HashMap::from_iter((11..=12).map(|i| { (i, Peer::empty(i),) }))
274 );
275 let set = cache.get(1025).await.unwrap().unwrap();
276 assert_eq!(
277 set.as_ref().clone(),
278 HashMap::from_iter((11..=12).map(|i| { (i, Peer::empty(i),) }))
279 );
280 }
281}