1use std::collections::HashMap;
16use std::sync::Arc;
17
18use futures::future::BoxFuture;
19use moka::future::Cache;
20use moka::ops::compute::Op;
21use table::metadata::TableId;
22
23use crate::cache::{CacheContainer, Initializer};
24use crate::error::Result;
25use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
26use crate::key::flow::{TableFlowManager, TableFlowManagerRef};
27use crate::key::{FlowId, FlowPartitionId};
28use crate::kv_backend::KvBackendRef;
29use crate::peer::Peer;
30
/// Identifies a single partition of a flow.
///
/// Used as the key type of the per-table `FlownodeFlowSet` map.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FlowIdent {
    /// Id of the flow.
    pub flow_id: FlowId,
    /// Id of the partition within that flow.
    pub partition_id: FlowPartitionId,
}
37
38impl FlowIdent {
39 pub fn new(flow_id: FlowId, partition_id: FlowPartitionId) -> Self {
40 Self {
41 flow_id,
42 partition_id,
43 }
44 }
45}
46
/// Reference-counted map from each [`FlowIdent`] to the [`Peer`] recorded for it.
type FlownodeFlowSet = Arc<HashMap<FlowIdent, Peer>>;

/// Reference-counted handle to a [`TableFlownodeSetCache`].
pub type TableFlownodeSetCacheRef = Arc<TableFlownodeSetCache>;

/// [`CacheContainer`] keyed by [`TableId`] whose values are `FlownodeFlowSet`s and which is
/// invalidated with [`CacheIdent`]s.
pub type TableFlownodeSetCache = CacheContainer<TableId, FlownodeFlowSet, CacheIdent>;
55
56pub fn new_table_flownode_set_cache(
58 name: String,
59 cache: Cache<TableId, FlownodeFlowSet>,
60 kv_backend: KvBackendRef,
61) -> TableFlownodeSetCache {
62 let table_flow_manager = Arc::new(TableFlowManager::new(kv_backend));
63 let init = init_factory(table_flow_manager);
64
65 CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
66}
67
68fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId, FlownodeFlowSet> {
69 Arc::new(move |&table_id| {
70 let table_flow_manager = table_flow_manager.clone();
71 Box::pin(async move {
72 table_flow_manager
73 .flows(table_id)
74 .await
75 .map(|flows| {
76 flows
77 .into_iter()
78 .map(|(key, value)| {
79 (
80 FlowIdent::new(key.flow_id(), key.partition_id()),
81 value.peer,
82 )
83 })
84 .collect::<HashMap<_, _>>()
85 })
86 .map(Arc::new)
91 .map(Some)
92 })
93 })
94}
95
96async fn handle_create_flow(
97 cache: &Cache<TableId, FlownodeFlowSet>,
98 CreateFlow {
99 flow_id,
100 source_table_ids,
101 partition_to_peer_mapping: flow_part2nodes,
102 }: &CreateFlow,
103) {
104 for table_id in source_table_ids {
105 let entry = cache.entry(*table_id);
106 entry
107 .and_compute_with(
108 async |entry: Option<moka::Entry<u32, FlownodeFlowSet>>| match entry {
109 Some(entry) => {
110 let mut map = entry.into_value().as_ref().clone();
111 map.extend(
112 flow_part2nodes.iter().map(|(part, peer)| {
113 (FlowIdent::new(*flow_id, *part), peer.clone())
114 }),
115 );
116
117 Op::Put(Arc::new(map))
118 }
119 None => {
120 Op::Put(Arc::new(HashMap::from_iter(flow_part2nodes.iter().map(
121 |(part, peer)| (FlowIdent::new(*flow_id, *part), peer.clone()),
122 ))))
123 }
124 },
125 )
126 .await;
127 }
128}
129
130async fn handle_drop_flow(
131 cache: &Cache<TableId, FlownodeFlowSet>,
132 DropFlow {
133 flow_id,
134 source_table_ids,
135 flow_part2node_id,
136 }: &DropFlow,
137) {
138 for table_id in source_table_ids {
139 let entry = cache.entry(*table_id);
140 entry
141 .and_compute_with(
142 async |entry: Option<moka::Entry<u32, FlownodeFlowSet>>| match entry {
143 Some(entry) => {
144 let mut set = entry.into_value().as_ref().clone();
145 for (part, _node) in flow_part2node_id {
146 let key = FlowIdent::new(*flow_id, *part);
147 set.remove(&key);
148 }
149
150 Op::Put(Arc::new(set))
151 }
152 None => {
153 Op::Nop
155 }
156 },
157 )
158 .await;
159 }
160}
161
162fn invalidator<'a>(
163 cache: &'a Cache<TableId, FlownodeFlowSet>,
164 ident: &'a CacheIdent,
165) -> BoxFuture<'a, Result<()>> {
166 Box::pin(async move {
167 match ident {
168 CacheIdent::CreateFlow(create_flow) => handle_create_flow(cache, create_flow).await,
169 CacheIdent::DropFlow(drop_flow) => handle_drop_flow(cache, drop_flow).await,
170 _ => {}
171 }
172 Ok(())
173 })
174}
175
176fn filter(ident: &CacheIdent) -> bool {
177 matches!(ident, CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_))
178}
179
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap};
    use std::sync::Arc;

    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use moka::future::CacheBuilder;
    use table::table_name::TableName;

    use crate::cache::flow::table_flownode::{new_table_flownode_set_cache, FlowIdent};
    use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
    use crate::key::flow::flow_info::FlowInfoValue;
    use crate::key::flow::flow_route::FlowRouteValue;
    use crate::key::flow::FlowMetadataManager;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::peer::Peer;

    /// A table with no flows in the backend resolves to a present but empty set.
    #[tokio::test]
    async fn test_cache_empty_set() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let cache = new_table_flownode_set_cache(
            "test".to_string(),
            CacheBuilder::new(128).build(),
            kv_backend,
        );

        let flows = cache.get(1024).await.unwrap().unwrap();
        assert!(flows.is_empty());
    }

    /// Flow metadata written to the backend is visible through the cache for every
    /// source table of the flow, and an unrelated table yields an empty set.
    #[tokio::test]
    async fn test_get() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let metadata_manager = FlowMetadataManager::new(kv_backend.clone());

        let flow_info = FlowInfoValue {
            source_table_ids: vec![1024, 1025],
            sink_table_name: TableName {
                catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                schema_name: DEFAULT_SCHEMA_NAME.to_string(),
                table_name: "sink_table".to_string(),
            },
            flownode_ids: BTreeMap::from([(0, 1), (1, 2), (2, 3)]),
            catalog_name: DEFAULT_CATALOG_NAME.to_string(),
            query_context: None,
            flow_name: "my_flow".to_string(),
            raw_sql: "sql".to_string(),
            expire_after: Some(300),
            comment: "comment".to_string(),
            options: Default::default(),
            created_time: chrono::Utc::now(),
            updated_time: chrono::Utc::now(),
        };
        let flow_routes = (1..=3)
            .map(|i| {
                (
                    (i - 1) as u32,
                    FlowRouteValue {
                        peer: Peer::empty(i),
                    },
                )
            })
            .collect::<Vec<_>>();
        metadata_manager
            .create_flow_metadata(1024, flow_info, flow_routes)
            .await
            .unwrap();

        let cache = new_table_flownode_set_cache(
            "test".to_string(),
            CacheBuilder::new(128).build(),
            kv_backend,
        );

        // Both source tables of flow 1024 must expose the same three partitions.
        let expected: HashMap<FlowIdent, Peer> = (1..=3)
            .map(|i| (FlowIdent::new(1024, (i - 1) as u32), Peer::empty(i)))
            .collect();
        for table_id in [1024, 1025] {
            let flows = cache.get(table_id).await.unwrap().unwrap();
            assert_eq!(*flows, expected);
        }

        let unrelated = cache.get(1026).await.unwrap().unwrap();
        assert_eq!(unrelated.len(), 0);
    }

    /// A `CreateFlow` ident populates the set of each source table.
    #[tokio::test]
    async fn test_create_flow() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let cache = new_table_flownode_set_cache(
            "test".to_string(),
            CacheBuilder::new(128).build(),
            kv_backend,
        );

        let create_flow = vec![CacheIdent::CreateFlow(CreateFlow {
            flow_id: 2001,
            source_table_ids: vec![1024, 1025],
            partition_to_peer_mapping: (1..=5).map(|i| (i as u32, Peer::empty(i + 1))).collect(),
        })];
        cache.invalidate(&create_flow).await.unwrap();

        for table_id in [1024, 1025] {
            let flows = cache.get(table_id).await.unwrap().unwrap();
            assert_eq!(flows.len(), 5);
        }
    }

    /// Dropping a flow and re-creating it on other source tables moves its partitions:
    /// the old tables end up empty and the new ones hold the new mapping.
    #[tokio::test]
    async fn test_replace_flow() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let cache = new_table_flownode_set_cache(
            "test".to_string(),
            CacheBuilder::new(128).build(),
            kv_backend,
        );

        let create_flow = vec![CacheIdent::CreateFlow(CreateFlow {
            flow_id: 2001,
            source_table_ids: vec![1024, 1025],
            partition_to_peer_mapping: (1..=5).map(|i| (i as u32, Peer::empty(i + 1))).collect(),
        })];
        cache.invalidate(&create_flow).await.unwrap();
        for table_id in [1024, 1025] {
            let flows = cache.get(table_id).await.unwrap().unwrap();
            assert_eq!(flows.len(), 5);
        }

        let drop_then_create_flow = vec![
            CacheIdent::DropFlow(DropFlow {
                flow_id: 2001,
                source_table_ids: vec![1024, 1025],
                flow_part2node_id: (1..=5).map(|i| (i as u32, i + 1)).collect(),
            }),
            CacheIdent::CreateFlow(CreateFlow {
                flow_id: 2001,
                source_table_ids: vec![1026, 1027],
                partition_to_peer_mapping: (11..=15)
                    .map(|i| (i as u32, Peer::empty(i + 1)))
                    .collect(),
            }),
            CacheIdent::FlowId(2001),
        ];
        cache.invalidate(&drop_then_create_flow).await.unwrap();

        let old_table_flows = cache.get(1024).await.unwrap().unwrap();
        assert!(old_table_flows.is_empty());

        let expected: HashMap<FlowIdent, Peer> = (11..=15)
            .map(|i| (FlowIdent::new(2001, i as u32), Peer::empty(i + 1)))
            .collect();
        for table_id in [1026, 1027] {
            let flows = cache.get(table_id).await.unwrap().unwrap();
            assert_eq!(*flows, expected);
        }
    }

    /// Dropping one of several flows removes only that flow's partitions.
    #[tokio::test]
    async fn test_drop_flow() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let cache = new_table_flownode_set_cache(
            "test".to_string(),
            CacheBuilder::new(128).build(),
            kv_backend,
        );

        let create_flows = vec![
            CacheIdent::CreateFlow(CreateFlow {
                flow_id: 2001,
                source_table_ids: vec![1024, 1025],
                partition_to_peer_mapping: (1..=5)
                    .map(|i| (i as u32, Peer::empty(i + 1)))
                    .collect(),
            }),
            CacheIdent::CreateFlow(CreateFlow {
                flow_id: 2002,
                source_table_ids: vec![1024, 1025],
                partition_to_peer_mapping: (11..=12)
                    .map(|i| (i as u32, Peer::empty(i + 1)))
                    .collect(),
            }),
            CacheIdent::CreateFlow(CreateFlow {
                flow_id: 2003,
                source_table_ids: vec![1024, 1025],
                partition_to_peer_mapping: (1..=5)
                    .map(|i| (i as u32, Peer::empty(i + 1)))
                    .collect(),
            }),
        ];
        cache.invalidate(&create_flows).await.unwrap();
        // 5 + 2 + 5 partitions across the three flows.
        for table_id in [1024, 1025] {
            let flows = cache.get(table_id).await.unwrap().unwrap();
            assert_eq!(flows.len(), 12);
        }

        let drop_flow = vec![CacheIdent::DropFlow(DropFlow {
            flow_id: 2001,
            source_table_ids: vec![1024, 1025],
            flow_part2node_id: (1..=5).map(|i| (i as u32, i + 1)).collect(),
        })];
        cache.invalidate(&drop_flow).await.unwrap();

        // Only flows 2002 and 2003 remain on both tables.
        let expected: HashMap<FlowIdent, Peer> = (11..=12)
            .map(|i| (FlowIdent::new(2002, i as u32), Peer::empty(i + 1)))
            .chain((1..=5).map(|i| (FlowIdent::new(2003, i as u32), Peer::empty(i + 1))))
            .collect();
        for table_id in [1024, 1025] {
            let flows = cache.get(table_id).await.unwrap().unwrap();
            assert_eq!(*flows, expected);
        }
    }
}