1use std::collections::HashMap;
16use std::sync::Arc;
17
18use common_telemetry::info;
19use futures::future::BoxFuture;
20use moka::future::Cache;
21use moka::ops::compute::Op;
22use table::metadata::TableId;
23
24use crate::cache::{CacheContainer, Initializer};
25use crate::error::Result;
26use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
27use crate::key::flow::{TableFlowManager, TableFlowManagerRef};
28use crate::key::{FlowId, FlowPartitionId};
29use crate::kv_backend::KvBackendRef;
30use crate::peer::Peer;
31
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct FlowIdent {
35 pub flow_id: FlowId,
36 pub partition_id: FlowPartitionId,
37}
38
39impl FlowIdent {
40 pub fn new(flow_id: FlowId, partition_id: FlowPartitionId) -> Self {
41 Self {
42 flow_id,
43 partition_id,
44 }
45 }
46}
47
48type FlownodeFlowSet = Arc<HashMap<FlowIdent, Peer>>;
51
52pub type TableFlownodeSetCacheRef = Arc<TableFlownodeSetCache>;
53
54pub type TableFlownodeSetCache = CacheContainer<TableId, FlownodeFlowSet, CacheIdent>;
56
57pub fn new_table_flownode_set_cache(
59 name: String,
60 cache: Cache<TableId, FlownodeFlowSet>,
61 kv_backend: KvBackendRef,
62) -> TableFlownodeSetCache {
63 let table_flow_manager = Arc::new(TableFlowManager::new(kv_backend));
64 let init = init_factory(table_flow_manager);
65
66 CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
67}
68
69fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId, FlownodeFlowSet> {
70 Arc::new(move |&table_id| {
71 let table_flow_manager = table_flow_manager.clone();
72 Box::pin(async move {
73 table_flow_manager
74 .flows(table_id)
75 .await
76 .map(|flows| {
77 flows
78 .into_iter()
79 .map(|(key, value)| {
80 (
81 FlowIdent::new(key.flow_id(), key.partition_id()),
82 value.peer,
83 )
84 })
85 .collect::<HashMap<_, _>>()
86 })
87 .map(Arc::new)
92 .map(Some)
93 .inspect(|set| {
94 if set.as_ref().map(|s| !s.is_empty()).unwrap_or(false) {
95 info!(
96 "Initialized table_flownode cache for table_id: {}, set: {:?}",
97 table_id, set
98 );
99 };
100 })
101 })
102 })
103}
104
105async fn handle_create_flow(
106 cache: &Cache<TableId, FlownodeFlowSet>,
107 CreateFlow {
108 flow_id,
109 source_table_ids,
110 partition_to_peer_mapping: flow_part2nodes,
111 }: &CreateFlow,
112) {
113 for table_id in source_table_ids {
114 let entry = cache.entry(*table_id);
115 entry
116 .and_compute_with(
117 async |entry: Option<moka::Entry<u32, FlownodeFlowSet>>| match entry {
118 Some(entry) => {
119 let mut map = entry.into_value().as_ref().clone();
120 map.extend(
121 flow_part2nodes.iter().map(|(part, peer)| {
122 (FlowIdent::new(*flow_id, *part), peer.clone())
123 }),
124 );
125
126 Op::Put(Arc::new(map))
127 }
128 None => {
129 Op::Put(Arc::new(HashMap::from_iter(flow_part2nodes.iter().map(
130 |(part, peer)| (FlowIdent::new(*flow_id, *part), peer.clone()),
131 ))))
132 }
133 },
134 )
135 .await;
136 }
137}
138
139async fn handle_drop_flow(
140 cache: &Cache<TableId, FlownodeFlowSet>,
141 DropFlow {
142 flow_id,
143 source_table_ids,
144 flow_part2node_id,
145 }: &DropFlow,
146) {
147 for table_id in source_table_ids {
148 let entry = cache.entry(*table_id);
149 entry
150 .and_compute_with(
151 async |entry: Option<moka::Entry<u32, FlownodeFlowSet>>| match entry {
152 Some(entry) => {
153 let mut set = entry.into_value().as_ref().clone();
154 for (part, _node) in flow_part2node_id {
155 let key = FlowIdent::new(*flow_id, *part);
156 set.remove(&key);
157 }
158
159 Op::Put(Arc::new(set))
160 }
161 None => {
162 Op::Nop
164 }
165 },
166 )
167 .await;
168 }
169}
170
171fn invalidator<'a>(
172 cache: &'a Cache<TableId, FlownodeFlowSet>,
173 ident: &'a CacheIdent,
174) -> BoxFuture<'a, Result<()>> {
175 Box::pin(async move {
176 match ident {
177 CacheIdent::CreateFlow(create_flow) => handle_create_flow(cache, create_flow).await,
178 CacheIdent::DropFlow(drop_flow) => handle_drop_flow(cache, drop_flow).await,
179 CacheIdent::FlowNodeAddressChange(node_id) => {
180 info!(
181 "Invalidate flow node cache for node_id in table_flownode: {}",
182 node_id
183 );
184 cache.invalidate_all();
185 }
186 _ => {}
187 }
188 Ok(())
189 })
190}
191
192fn filter(ident: &CacheIdent) -> bool {
193 matches!(
194 ident,
195 CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_) | CacheIdent::FlowNodeAddressChange(_)
196 )
197}
198
199#[cfg(test)]
200mod tests {
201 use std::collections::{BTreeMap, HashMap};
202 use std::sync::Arc;
203
204 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
205 use moka::future::CacheBuilder;
206 use table::table_name::TableName;
207
208 use crate::cache::flow::table_flownode::{FlowIdent, new_table_flownode_set_cache};
209 use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
210 use crate::key::flow::FlowMetadataManager;
211 use crate::key::flow::flow_info::FlowInfoValue;
212 use crate::key::flow::flow_route::FlowRouteValue;
213 use crate::kv_backend::memory::MemoryKvBackend;
214 use crate::peer::Peer;
215
216 #[tokio::test]
217 async fn test_cache_empty_set() {
218 let mem_kv = Arc::new(MemoryKvBackend::default());
219 let cache = CacheBuilder::new(128).build();
220 let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
221 let set = cache.get(1024).await.unwrap().unwrap();
222 assert!(set.is_empty());
223 }
224
225 #[tokio::test]
226 async fn test_get() {
227 let mem_kv = Arc::new(MemoryKvBackend::default());
228 let flownode_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
229 flownode_metadata_manager
230 .create_flow_metadata(
231 1024,
232 FlowInfoValue {
233 source_table_ids: vec![1024, 1025],
234 sink_table_name: TableName {
235 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
236 schema_name: DEFAULT_SCHEMA_NAME.to_string(),
237 table_name: "sink_table".to_string(),
238 },
239 flownode_ids: BTreeMap::from([(0, 1), (1, 2), (2, 3)]),
240 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
241 query_context: None,
242 flow_name: "my_flow".to_string(),
243 raw_sql: "sql".to_string(),
244 expire_after: Some(300),
245 eval_interval_secs: None,
246 comment: "comment".to_string(),
247 options: Default::default(),
248 created_time: chrono::Utc::now(),
249 updated_time: chrono::Utc::now(),
250 },
251 (1..=3)
252 .map(|i| {
253 (
254 (i - 1) as u32,
255 FlowRouteValue {
256 peer: Peer::empty(i),
257 },
258 )
259 })
260 .collect::<Vec<_>>(),
261 )
262 .await
263 .unwrap();
264 let cache = CacheBuilder::new(128).build();
265 let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
266 let set = cache.get(1024).await.unwrap().unwrap();
267 assert_eq!(
268 set.as_ref().clone(),
269 HashMap::from_iter(
270 (1..=3).map(|i| { (FlowIdent::new(1024, (i - 1) as u32), Peer::empty(i),) })
271 )
272 );
273 let set = cache.get(1025).await.unwrap().unwrap();
274 assert_eq!(
275 set.as_ref().clone(),
276 HashMap::from_iter(
277 (1..=3).map(|i| { (FlowIdent::new(1024, (i - 1) as u32), Peer::empty(i),) })
278 )
279 );
280 let result = cache.get(1026).await.unwrap().unwrap();
281 assert_eq!(result.len(), 0);
282 }
283
284 #[tokio::test]
285 async fn test_create_flow() {
286 let mem_kv = Arc::new(MemoryKvBackend::default());
287 let cache = CacheBuilder::new(128).build();
288 let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
289 let ident = vec![CacheIdent::CreateFlow(CreateFlow {
290 flow_id: 2001,
291 source_table_ids: vec![1024, 1025],
292 partition_to_peer_mapping: (1..=5).map(|i| (i as u32, Peer::empty(i + 1))).collect(),
293 })];
294 cache.invalidate(&ident).await.unwrap();
295 let set = cache.get(1024).await.unwrap().unwrap();
296 assert_eq!(set.len(), 5);
297 let set = cache.get(1025).await.unwrap().unwrap();
298 assert_eq!(set.len(), 5);
299 }
300
301 #[tokio::test]
302 async fn test_replace_flow() {
303 let mem_kv = Arc::new(MemoryKvBackend::default());
304 let cache = CacheBuilder::new(128).build();
305 let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
306 let ident = vec![CacheIdent::CreateFlow(CreateFlow {
307 flow_id: 2001,
308 source_table_ids: vec![1024, 1025],
309 partition_to_peer_mapping: (1..=5).map(|i| (i as u32, Peer::empty(i + 1))).collect(),
310 })];
311 cache.invalidate(&ident).await.unwrap();
312 let set = cache.get(1024).await.unwrap().unwrap();
313 assert_eq!(set.len(), 5);
314 let set = cache.get(1025).await.unwrap().unwrap();
315 assert_eq!(set.len(), 5);
316
317 let drop_then_create_flow = vec![
318 CacheIdent::DropFlow(DropFlow {
319 flow_id: 2001,
320 source_table_ids: vec![1024, 1025],
321 flow_part2node_id: (1..=5).map(|i| (i as u32, i + 1)).collect(),
322 }),
323 CacheIdent::CreateFlow(CreateFlow {
324 flow_id: 2001,
325 source_table_ids: vec![1026, 1027],
326 partition_to_peer_mapping: (11..=15)
327 .map(|i| (i as u32, Peer::empty(i + 1)))
328 .collect(),
329 }),
330 CacheIdent::FlowId(2001),
331 ];
332 cache.invalidate(&drop_then_create_flow).await.unwrap();
333
334 let set = cache.get(1024).await.unwrap().unwrap();
335 assert!(set.is_empty());
336
337 let expected = HashMap::from_iter(
338 (11..=15).map(|i| (FlowIdent::new(2001, i as u32), Peer::empty(i + 1))),
339 );
340 let set = cache.get(1026).await.unwrap().unwrap();
341
342 assert_eq!(set.as_ref().clone(), expected);
343
344 let set = cache.get(1027).await.unwrap().unwrap();
345
346 assert_eq!(set.as_ref().clone(), expected);
347 }
348
349 #[tokio::test]
350 async fn test_drop_flow() {
351 let mem_kv = Arc::new(MemoryKvBackend::default());
352 let cache = CacheBuilder::new(128).build();
353 let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
354 let ident = vec![
355 CacheIdent::CreateFlow(CreateFlow {
356 flow_id: 2001,
357 source_table_ids: vec![1024, 1025],
358 partition_to_peer_mapping: (1..=5)
359 .map(|i| (i as u32, Peer::empty(i + 1)))
360 .collect(),
361 }),
362 CacheIdent::CreateFlow(CreateFlow {
363 flow_id: 2002,
364 source_table_ids: vec![1024, 1025],
365 partition_to_peer_mapping: (11..=12)
366 .map(|i| (i as u32, Peer::empty(i + 1)))
367 .collect(),
368 }),
369 CacheIdent::CreateFlow(CreateFlow {
371 flow_id: 2003,
372 source_table_ids: vec![1024, 1025],
373 partition_to_peer_mapping: (1..=5)
374 .map(|i| (i as u32, Peer::empty(i + 1)))
375 .collect(),
376 }),
377 ];
378 cache.invalidate(&ident).await.unwrap();
379 let set = cache.get(1024).await.unwrap().unwrap();
380 assert_eq!(set.len(), 12);
381 let set = cache.get(1025).await.unwrap().unwrap();
382 assert_eq!(set.len(), 12);
383
384 let ident = vec![CacheIdent::DropFlow(DropFlow {
385 flow_id: 2001,
386 source_table_ids: vec![1024, 1025],
387 flow_part2node_id: (1..=5).map(|i| (i as u32, i + 1)).collect(),
388 })];
389 cache.invalidate(&ident).await.unwrap();
390 let set = cache.get(1024).await.unwrap().unwrap();
391 assert_eq!(
392 set.as_ref().clone(),
393 HashMap::from_iter(
394 (11..=12)
395 .map(|i| (FlowIdent::new(2002, i as u32), Peer::empty(i + 1)))
396 .chain((1..=5).map(|i| (FlowIdent::new(2003, i as u32), Peer::empty(i + 1))))
397 )
398 );
399 let set = cache.get(1025).await.unwrap().unwrap();
400 assert_eq!(
401 set.as_ref().clone(),
402 HashMap::from_iter(
403 (11..=12)
404 .map(|i| (FlowIdent::new(2002, i as u32), Peer::empty(i + 1)))
405 .chain((1..=5).map(|i| (FlowIdent::new(2003, i as u32), Peer::empty(i + 1))))
406 )
407 );
408 }
409}