1use std::collections::HashMap;
16use std::sync::Arc;
17
18use common_telemetry::info;
19use futures::future::BoxFuture;
20use moka::future::Cache;
21use moka::ops::compute::Op;
22use table::metadata::TableId;
23
24use crate::cache::{CacheContainer, Initializer};
25use crate::error::Result;
26use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
27use crate::key::flow::{TableFlowManager, TableFlowManagerRef};
28use crate::key::{FlowId, FlowPartitionId};
29use crate::kv_backend::KvBackendRef;
30use crate::peer::Peer;
31
/// Identifies one partition of a flow: a flow id paired with a partition id.
///
/// Derives `Eq` and `Hash`, so it can be used as a hash-map key; this file uses it as the
/// key of the per-table `HashMap<FlowIdent, Peer>`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FlowIdent {
    /// Id of the flow.
    pub flow_id: FlowId,
    /// Id of the partition within that flow.
    pub partition_id: FlowPartitionId,
}
38
39impl FlowIdent {
40 pub fn new(flow_id: FlowId, partition_id: FlowPartitionId) -> Self {
41 Self {
42 flow_id,
43 partition_id,
44 }
45 }
46}
47
/// Shared map from a flow partition ([`FlowIdent`]) to the [`Peer`] recorded for it.
///
/// One such map is cached per table; the table id itself is the key of the outer cache.
type FlownodeFlowSet = Arc<HashMap<FlowIdent, Peer>>;

/// Reference-counted handle to a [`TableFlownodeSetCache`].
pub type TableFlownodeSetCacheRef = Arc<TableFlownodeSetCache>;

/// Cache container keyed by [`TableId`] whose values are `FlownodeFlowSet`s and which is
/// invalidated with [`CacheIdent`]s.
pub type TableFlownodeSetCache = CacheContainer<TableId, FlownodeFlowSet, CacheIdent>;
56
57pub fn new_table_flownode_set_cache(
59 name: String,
60 cache: Cache<TableId, FlownodeFlowSet>,
61 kv_backend: KvBackendRef,
62) -> TableFlownodeSetCache {
63 let table_flow_manager = Arc::new(TableFlowManager::new(kv_backend));
64 let init = init_factory(table_flow_manager);
65
66 CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
67}
68
69fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId, FlownodeFlowSet> {
70 Arc::new(move |&table_id| {
71 let table_flow_manager = table_flow_manager.clone();
72 Box::pin(async move {
73 table_flow_manager
74 .flows(table_id)
75 .await
76 .map(|flows| {
77 flows
78 .into_iter()
79 .map(|(key, value)| {
80 (
81 FlowIdent::new(key.flow_id(), key.partition_id()),
82 value.peer,
83 )
84 })
85 .collect::<HashMap<_, _>>()
86 })
87 .map(Arc::new)
92 .map(Some)
93 .inspect(|set| {
94 if set.as_ref().map(|s| !s.is_empty()).unwrap_or(false) {
95 info!(
96 "Initialized table_flownode cache for table_id: {}, set: {:?}",
97 table_id, set
98 );
99 };
100 })
101 })
102 })
103}
104
105async fn handle_create_flow(
106 cache: &Cache<TableId, FlownodeFlowSet>,
107 CreateFlow {
108 flow_id,
109 source_table_ids,
110 partition_to_peer_mapping: flow_part2nodes,
111 }: &CreateFlow,
112) {
113 for table_id in source_table_ids {
114 let entry = cache.entry(*table_id);
115 entry
116 .and_compute_with(
117 async |entry: Option<moka::Entry<u32, FlownodeFlowSet>>| match entry {
118 Some(entry) => {
119 let mut map = entry.into_value().as_ref().clone();
120 map.extend(
121 flow_part2nodes.iter().map(|(part, peer)| {
122 (FlowIdent::new(*flow_id, *part), peer.clone())
123 }),
124 );
125
126 Op::Put(Arc::new(map))
127 }
128 None => {
129 Op::Put(Arc::new(HashMap::from_iter(flow_part2nodes.iter().map(
130 |(part, peer)| (FlowIdent::new(*flow_id, *part), peer.clone()),
131 ))))
132 }
133 },
134 )
135 .await;
136 }
137}
138
139async fn handle_drop_flow(
140 cache: &Cache<TableId, FlownodeFlowSet>,
141 DropFlow {
142 flow_id,
143 source_table_ids,
144 flow_part2node_id,
145 }: &DropFlow,
146) {
147 for table_id in source_table_ids {
148 let entry = cache.entry(*table_id);
149 entry
150 .and_compute_with(
151 async |entry: Option<moka::Entry<u32, FlownodeFlowSet>>| match entry {
152 Some(entry) => {
153 let mut set = entry.into_value().as_ref().clone();
154 for (part, _node) in flow_part2node_id {
155 let key = FlowIdent::new(*flow_id, *part);
156 set.remove(&key);
157 }
158
159 Op::Put(Arc::new(set))
160 }
161 None => {
162 Op::Nop
164 }
165 },
166 )
167 .await;
168 }
169}
170
171fn invalidator<'a>(
172 cache: &'a Cache<TableId, FlownodeFlowSet>,
173 idents: &'a [&CacheIdent],
174) -> BoxFuture<'a, Result<()>> {
175 Box::pin(async move {
176 for ident in idents {
177 match ident {
178 CacheIdent::CreateFlow(create_flow) => handle_create_flow(cache, create_flow).await,
179 CacheIdent::DropFlow(drop_flow) => handle_drop_flow(cache, drop_flow).await,
180 CacheIdent::FlowNodeAddressChange(node_id) => {
181 info!(
182 "Invalidate flow node cache for node_id in table_flownode: {}",
183 node_id
184 );
185 cache.invalidate_all();
186 }
187 _ => {}
188 }
189 }
190 Ok(())
191 })
192}
193
194fn filter(ident: &CacheIdent) -> bool {
195 matches!(
196 ident,
197 CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_) | CacheIdent::FlowNodeAddressChange(_)
198 )
199}
200
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap};
    use std::sync::Arc;

    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use moka::future::CacheBuilder;
    use table::table_name::TableName;

    use crate::cache::flow::table_flownode::{
        FlowIdent, TableFlownodeSetCache, new_table_flownode_set_cache,
    };
    use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
    use crate::key::flow::FlowMetadataManager;
    use crate::key::flow::flow_info::FlowInfoValue;
    use crate::key::flow::flow_route::FlowRouteValue;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::peer::Peer;

    /// Builds a cache named "test" (capacity 128) over a fresh, empty in-memory kv backend.
    fn empty_backend_cache() -> TableFlownodeSetCache {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let cache = CacheBuilder::new(128).build();
        new_table_flownode_set_cache("test".to_string(), cache, mem_kv)
    }

    /// A table with no flows yields an empty set rather than `None`.
    #[tokio::test]
    async fn test_cache_empty_set() {
        let cache = empty_backend_cache();

        let set = cache.get(1024).await.unwrap().unwrap();
        assert!(set.is_empty());
    }

    /// Flow metadata written to the backend is visible through the cache for every source
    /// table of the flow, and an unrelated table yields an empty set.
    #[tokio::test]
    async fn test_get() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let flownode_metadata_manager = FlowMetadataManager::new(mem_kv.clone());

        let flow_info = FlowInfoValue {
            source_table_ids: vec![1024, 1025],
            sink_table_name: TableName {
                catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                schema_name: DEFAULT_SCHEMA_NAME.to_string(),
                table_name: "sink_table".to_string(),
            },
            flownode_ids: BTreeMap::from([(0, 1), (1, 2), (2, 3)]),
            catalog_name: DEFAULT_CATALOG_NAME.to_string(),
            query_context: None,
            flow_name: "my_flow".to_string(),
            raw_sql: "sql".to_string(),
            expire_after: Some(300),
            eval_interval_secs: None,
            comment: "comment".to_string(),
            options: Default::default(),
            created_time: chrono::Utc::now(),
            updated_time: chrono::Utc::now(),
        };
        // Partition `i - 1` is routed to the empty peer with id `i`.
        let routes = (1..=3)
            .map(|i| {
                (
                    (i - 1) as u32,
                    FlowRouteValue {
                        peer: Peer::empty(i),
                    },
                )
            })
            .collect::<Vec<_>>();
        flownode_metadata_manager
            .create_flow_metadata(1024, flow_info, routes)
            .await
            .unwrap();

        let cache = CacheBuilder::new(128).build();
        let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);

        let expected: HashMap<_, _> = (1..=3)
            .map(|i| (FlowIdent::new(1024, (i - 1) as u32), Peer::empty(i)))
            .collect();
        // Both source tables of the flow map to the same set.
        for table_id in [1024, 1025] {
            let set = cache.get(table_id).await.unwrap().unwrap();
            assert_eq!(*set, expected);
        }

        let result = cache.get(1026).await.unwrap().unwrap();
        assert_eq!(result.len(), 0);
    }

    /// A `CreateFlow` ident populates the set of every source table.
    #[tokio::test]
    async fn test_create_flow() {
        let cache = empty_backend_cache();

        let idents = vec![CacheIdent::CreateFlow(CreateFlow {
            flow_id: 2001,
            source_table_ids: vec![1024, 1025],
            partition_to_peer_mapping: (1..=5).map(|i| (i as u32, Peer::empty(i + 1))).collect(),
        })];
        cache.invalidate(&idents).await.unwrap();

        for table_id in [1024, 1025] {
            let set = cache.get(table_id).await.unwrap().unwrap();
            assert_eq!(set.len(), 5);
        }
    }

    /// Dropping a flow and re-creating it on other tables moves its partitions there.
    #[tokio::test]
    async fn test_replace_flow() {
        let cache = empty_backend_cache();

        let create = vec![CacheIdent::CreateFlow(CreateFlow {
            flow_id: 2001,
            source_table_ids: vec![1024, 1025],
            partition_to_peer_mapping: (1..=5).map(|i| (i as u32, Peer::empty(i + 1))).collect(),
        })];
        cache.invalidate(&create).await.unwrap();
        for table_id in [1024, 1025] {
            let set = cache.get(table_id).await.unwrap().unwrap();
            assert_eq!(set.len(), 5);
        }

        let drop_then_create_flow = vec![
            CacheIdent::DropFlow(DropFlow {
                flow_id: 2001,
                source_table_ids: vec![1024, 1025],
                flow_part2node_id: (1..=5).map(|i| (i as u32, i + 1)).collect(),
            }),
            CacheIdent::CreateFlow(CreateFlow {
                flow_id: 2001,
                source_table_ids: vec![1026, 1027],
                partition_to_peer_mapping: (11..=15)
                    .map(|i| (i as u32, Peer::empty(i + 1)))
                    .collect(),
            }),
            CacheIdent::FlowId(2001),
        ];
        cache.invalidate(&drop_then_create_flow).await.unwrap();

        // The old source table no longer lists any partition of the flow.
        let set = cache.get(1024).await.unwrap().unwrap();
        assert!(set.is_empty());

        let expected: HashMap<_, _> = (11..=15)
            .map(|i| (FlowIdent::new(2001, i as u32), Peer::empty(i + 1)))
            .collect();
        for table_id in [1026, 1027] {
            let set = cache.get(table_id).await.unwrap().unwrap();
            assert_eq!(*set, expected);
        }
    }

    /// Dropping one flow leaves the partitions of the other flows on the same tables intact.
    #[tokio::test]
    async fn test_drop_flow() {
        let cache = empty_backend_cache();

        let create = vec![
            CacheIdent::CreateFlow(CreateFlow {
                flow_id: 2001,
                source_table_ids: vec![1024, 1025],
                partition_to_peer_mapping: (1..=5)
                    .map(|i| (i as u32, Peer::empty(i + 1)))
                    .collect(),
            }),
            CacheIdent::CreateFlow(CreateFlow {
                flow_id: 2002,
                source_table_ids: vec![1024, 1025],
                partition_to_peer_mapping: (11..=12)
                    .map(|i| (i as u32, Peer::empty(i + 1)))
                    .collect(),
            }),
            // Same partition ids as flow 2001, but under a different flow id.
            CacheIdent::CreateFlow(CreateFlow {
                flow_id: 2003,
                source_table_ids: vec![1024, 1025],
                partition_to_peer_mapping: (1..=5)
                    .map(|i| (i as u32, Peer::empty(i + 1)))
                    .collect(),
            }),
        ];
        cache.invalidate(&create).await.unwrap();
        for table_id in [1024, 1025] {
            let set = cache.get(table_id).await.unwrap().unwrap();
            assert_eq!(set.len(), 12);
        }

        let drop = vec![CacheIdent::DropFlow(DropFlow {
            flow_id: 2001,
            source_table_ids: vec![1024, 1025],
            flow_part2node_id: (1..=5).map(|i| (i as u32, i + 1)).collect(),
        })];
        cache.invalidate(&drop).await.unwrap();

        // Only the partitions of flows 2002 and 2003 remain.
        let expected: HashMap<_, _> = (11..=12)
            .map(|i| (FlowIdent::new(2002, i as u32), Peer::empty(i + 1)))
            .chain((1..=5).map(|i| (FlowIdent::new(2003, i as u32), Peer::empty(i + 1))))
            .collect();
        for table_id in [1024, 1025] {
            let set = cache.get(table_id).await.unwrap().unwrap();
            assert_eq!(*set, expected);
        }
    }
}