1use std::collections::HashMap;
16use std::sync::Arc;
17
18use common_telemetry::info;
19use futures::future::BoxFuture;
20use moka::future::Cache;
21use moka::ops::compute::Op;
22use table::metadata::TableId;
23
24use crate::cache::{CacheContainer, Initializer};
25use crate::error::Result;
26use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
27use crate::key::flow::{TableFlowManager, TableFlowManagerRef};
28use crate::key::{FlowId, FlowPartitionId};
29use crate::kv_backend::KvBackendRef;
30use crate::peer::Peer;
31
/// Identifies a single flow partition by pairing a flow id with a partition id.
///
/// Used as the key of the per-table flownode map (`FlownodeFlowSet`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FlowIdent {
    /// The id of the flow.
    pub flow_id: FlowId,
    /// The id of the flow partition.
    pub partition_id: FlowPartitionId,
}
38
39impl FlowIdent {
40 pub fn new(flow_id: FlowId, partition_id: FlowPartitionId) -> Self {
41 Self {
42 flow_id,
43 partition_id,
44 }
45 }
46}
47
/// The cached value for one table: a shared map from [`FlowIdent`] to the
/// [`Peer`] associated with that flow partition.
type FlownodeFlowSet = Arc<HashMap<FlowIdent, Peer>>;

/// Shared handle to a [`TableFlownodeSetCache`].
pub type TableFlownodeSetCacheRef = Arc<TableFlownodeSetCache>;

/// A [`CacheContainer`] keyed by [`TableId`] whose values are the table's
/// flow-partition-to-peer map, and which is invalidated through [`CacheIdent`]s.
pub type TableFlownodeSetCache = CacheContainer<TableId, FlownodeFlowSet, CacheIdent>;
56
57pub fn new_table_flownode_set_cache(
59 name: String,
60 cache: Cache<TableId, FlownodeFlowSet>,
61 kv_backend: KvBackendRef,
62) -> TableFlownodeSetCache {
63 let table_flow_manager = Arc::new(TableFlowManager::new(kv_backend));
64 let init = init_factory(table_flow_manager);
65
66 CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
67}
68
69fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId, FlownodeFlowSet> {
70 Arc::new(move |&table_id| {
71 let table_flow_manager = table_flow_manager.clone();
72 Box::pin(async move {
73 table_flow_manager
74 .flows(table_id)
75 .await
76 .map(|flows| {
77 flows
78 .into_iter()
79 .map(|(key, value)| {
80 (
81 FlowIdent::new(key.flow_id(), key.partition_id()),
82 value.peer,
83 )
84 })
85 .collect::<HashMap<_, _>>()
86 })
87 .map(Arc::new)
92 .map(Some)
93 .inspect(|set| {
94 if set.as_ref().map(|s| !s.is_empty()).unwrap_or(false) {
95 info!(
96 "Initialized table_flownode cache for table_id: {}, set: {:?}",
97 table_id, set
98 );
99 };
100 })
101 })
102 })
103}
104
105async fn handle_create_flow(
106 cache: &Cache<TableId, FlownodeFlowSet>,
107 CreateFlow {
108 flow_id,
109 source_table_ids,
110 partition_to_peer_mapping: flow_part2nodes,
111 }: &CreateFlow,
112) {
113 for table_id in source_table_ids {
114 let entry = cache.entry(*table_id);
115 entry
116 .and_compute_with(
117 async |entry: Option<moka::Entry<u32, FlownodeFlowSet>>| match entry {
118 Some(entry) => {
119 let mut map = entry.into_value().as_ref().clone();
120 map.extend(
121 flow_part2nodes.iter().map(|(part, peer)| {
122 (FlowIdent::new(*flow_id, *part), peer.clone())
123 }),
124 );
125
126 Op::Put(Arc::new(map))
127 }
128 None => {
129 Op::Put(Arc::new(HashMap::from_iter(flow_part2nodes.iter().map(
130 |(part, peer)| (FlowIdent::new(*flow_id, *part), peer.clone()),
131 ))))
132 }
133 },
134 )
135 .await;
136 }
137}
138
139async fn handle_drop_flow(
140 cache: &Cache<TableId, FlownodeFlowSet>,
141 DropFlow {
142 flow_id,
143 source_table_ids,
144 flow_part2node_id,
145 }: &DropFlow,
146) {
147 for table_id in source_table_ids {
148 let entry = cache.entry(*table_id);
149 entry
150 .and_compute_with(
151 async |entry: Option<moka::Entry<u32, FlownodeFlowSet>>| match entry {
152 Some(entry) => {
153 let mut set = entry.into_value().as_ref().clone();
154 for (part, _node) in flow_part2node_id {
155 let key = FlowIdent::new(*flow_id, *part);
156 set.remove(&key);
157 }
158
159 Op::Put(Arc::new(set))
160 }
161 None => {
162 Op::Nop
164 }
165 },
166 )
167 .await;
168 }
169}
170
171fn invalidator<'a>(
172 cache: &'a Cache<TableId, FlownodeFlowSet>,
173 ident: &'a CacheIdent,
174) -> BoxFuture<'a, Result<()>> {
175 Box::pin(async move {
176 match ident {
177 CacheIdent::CreateFlow(create_flow) => handle_create_flow(cache, create_flow).await,
178 CacheIdent::DropFlow(drop_flow) => handle_drop_flow(cache, drop_flow).await,
179 CacheIdent::FlowNodeAddressChange(node_id) => {
180 info!(
181 "Invalidate flow node cache for node_id in table_flownode: {}",
182 node_id
183 );
184 cache.invalidate_all();
185 }
186 _ => {}
187 }
188 Ok(())
189 })
190}
191
/// Returns `true` for the [`CacheIdent`] variants this cache reacts to:
/// `CreateFlow`, `DropFlow` and `FlowNodeAddressChange`.
fn filter(ident: &CacheIdent) -> bool {
    matches!(
        ident,
        CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_) | CacheIdent::FlowNodeAddressChange(_)
    )
}
198
199#[cfg(test)]
200mod tests {
201 use std::collections::{BTreeMap, HashMap};
202 use std::sync::Arc;
203
204 use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
205 use moka::future::CacheBuilder;
206 use table::table_name::TableName;
207
208 use crate::cache::flow::table_flownode::{new_table_flownode_set_cache, FlowIdent};
209 use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
210 use crate::key::flow::flow_info::FlowInfoValue;
211 use crate::key::flow::flow_route::FlowRouteValue;
212 use crate::key::flow::FlowMetadataManager;
213 use crate::kv_backend::memory::MemoryKvBackend;
214 use crate::peer::Peer;
215
216 #[tokio::test]
217 async fn test_cache_empty_set() {
218 let mem_kv = Arc::new(MemoryKvBackend::default());
219 let cache = CacheBuilder::new(128).build();
220 let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
221 let set = cache.get(1024).await.unwrap().unwrap();
222 assert!(set.is_empty());
223 }
224
225 #[tokio::test]
226 async fn test_get() {
227 let mem_kv = Arc::new(MemoryKvBackend::default());
228 let flownode_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
229 flownode_metadata_manager
230 .create_flow_metadata(
231 1024,
232 FlowInfoValue {
233 source_table_ids: vec![1024, 1025],
234 sink_table_name: TableName {
235 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
236 schema_name: DEFAULT_SCHEMA_NAME.to_string(),
237 table_name: "sink_table".to_string(),
238 },
239 flownode_ids: BTreeMap::from([(0, 1), (1, 2), (2, 3)]),
240 catalog_name: DEFAULT_CATALOG_NAME.to_string(),
241 query_context: None,
242 flow_name: "my_flow".to_string(),
243 raw_sql: "sql".to_string(),
244 expire_after: Some(300),
245 comment: "comment".to_string(),
246 options: Default::default(),
247 created_time: chrono::Utc::now(),
248 updated_time: chrono::Utc::now(),
249 },
250 (1..=3)
251 .map(|i| {
252 (
253 (i - 1) as u32,
254 FlowRouteValue {
255 peer: Peer::empty(i),
256 },
257 )
258 })
259 .collect::<Vec<_>>(),
260 )
261 .await
262 .unwrap();
263 let cache = CacheBuilder::new(128).build();
264 let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
265 let set = cache.get(1024).await.unwrap().unwrap();
266 assert_eq!(
267 set.as_ref().clone(),
268 HashMap::from_iter(
269 (1..=3).map(|i| { (FlowIdent::new(1024, (i - 1) as u32), Peer::empty(i),) })
270 )
271 );
272 let set = cache.get(1025).await.unwrap().unwrap();
273 assert_eq!(
274 set.as_ref().clone(),
275 HashMap::from_iter(
276 (1..=3).map(|i| { (FlowIdent::new(1024, (i - 1) as u32), Peer::empty(i),) })
277 )
278 );
279 let result = cache.get(1026).await.unwrap().unwrap();
280 assert_eq!(result.len(), 0);
281 }
282
283 #[tokio::test]
284 async fn test_create_flow() {
285 let mem_kv = Arc::new(MemoryKvBackend::default());
286 let cache = CacheBuilder::new(128).build();
287 let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
288 let ident = vec![CacheIdent::CreateFlow(CreateFlow {
289 flow_id: 2001,
290 source_table_ids: vec![1024, 1025],
291 partition_to_peer_mapping: (1..=5).map(|i| (i as u32, Peer::empty(i + 1))).collect(),
292 })];
293 cache.invalidate(&ident).await.unwrap();
294 let set = cache.get(1024).await.unwrap().unwrap();
295 assert_eq!(set.len(), 5);
296 let set = cache.get(1025).await.unwrap().unwrap();
297 assert_eq!(set.len(), 5);
298 }
299
300 #[tokio::test]
301 async fn test_replace_flow() {
302 let mem_kv = Arc::new(MemoryKvBackend::default());
303 let cache = CacheBuilder::new(128).build();
304 let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
305 let ident = vec![CacheIdent::CreateFlow(CreateFlow {
306 flow_id: 2001,
307 source_table_ids: vec![1024, 1025],
308 partition_to_peer_mapping: (1..=5).map(|i| (i as u32, Peer::empty(i + 1))).collect(),
309 })];
310 cache.invalidate(&ident).await.unwrap();
311 let set = cache.get(1024).await.unwrap().unwrap();
312 assert_eq!(set.len(), 5);
313 let set = cache.get(1025).await.unwrap().unwrap();
314 assert_eq!(set.len(), 5);
315
316 let drop_then_create_flow = vec![
317 CacheIdent::DropFlow(DropFlow {
318 flow_id: 2001,
319 source_table_ids: vec![1024, 1025],
320 flow_part2node_id: (1..=5).map(|i| (i as u32, i + 1)).collect(),
321 }),
322 CacheIdent::CreateFlow(CreateFlow {
323 flow_id: 2001,
324 source_table_ids: vec![1026, 1027],
325 partition_to_peer_mapping: (11..=15)
326 .map(|i| (i as u32, Peer::empty(i + 1)))
327 .collect(),
328 }),
329 CacheIdent::FlowId(2001),
330 ];
331 cache.invalidate(&drop_then_create_flow).await.unwrap();
332
333 let set = cache.get(1024).await.unwrap().unwrap();
334 assert!(set.is_empty());
335
336 let expected = HashMap::from_iter(
337 (11..=15).map(|i| (FlowIdent::new(2001, i as u32), Peer::empty(i + 1))),
338 );
339 let set = cache.get(1026).await.unwrap().unwrap();
340
341 assert_eq!(set.as_ref().clone(), expected);
342
343 let set = cache.get(1027).await.unwrap().unwrap();
344
345 assert_eq!(set.as_ref().clone(), expected);
346 }
347
348 #[tokio::test]
349 async fn test_drop_flow() {
350 let mem_kv = Arc::new(MemoryKvBackend::default());
351 let cache = CacheBuilder::new(128).build();
352 let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
353 let ident = vec![
354 CacheIdent::CreateFlow(CreateFlow {
355 flow_id: 2001,
356 source_table_ids: vec![1024, 1025],
357 partition_to_peer_mapping: (1..=5)
358 .map(|i| (i as u32, Peer::empty(i + 1)))
359 .collect(),
360 }),
361 CacheIdent::CreateFlow(CreateFlow {
362 flow_id: 2002,
363 source_table_ids: vec![1024, 1025],
364 partition_to_peer_mapping: (11..=12)
365 .map(|i| (i as u32, Peer::empty(i + 1)))
366 .collect(),
367 }),
368 CacheIdent::CreateFlow(CreateFlow {
370 flow_id: 2003,
371 source_table_ids: vec![1024, 1025],
372 partition_to_peer_mapping: (1..=5)
373 .map(|i| (i as u32, Peer::empty(i + 1)))
374 .collect(),
375 }),
376 ];
377 cache.invalidate(&ident).await.unwrap();
378 let set = cache.get(1024).await.unwrap().unwrap();
379 assert_eq!(set.len(), 12);
380 let set = cache.get(1025).await.unwrap().unwrap();
381 assert_eq!(set.len(), 12);
382
383 let ident = vec![CacheIdent::DropFlow(DropFlow {
384 flow_id: 2001,
385 source_table_ids: vec![1024, 1025],
386 flow_part2node_id: (1..=5).map(|i| (i as u32, i + 1)).collect(),
387 })];
388 cache.invalidate(&ident).await.unwrap();
389 let set = cache.get(1024).await.unwrap().unwrap();
390 assert_eq!(
391 set.as_ref().clone(),
392 HashMap::from_iter(
393 (11..=12)
394 .map(|i| (FlowIdent::new(2002, i as u32), Peer::empty(i + 1)))
395 .chain((1..=5).map(|i| (FlowIdent::new(2003, i as u32), Peer::empty(i + 1))))
396 )
397 );
398 let set = cache.get(1025).await.unwrap().unwrap();
399 assert_eq!(
400 set.as_ref().clone(),
401 HashMap::from_iter(
402 (11..=12)
403 .map(|i| (FlowIdent::new(2002, i as u32), Peer::empty(i + 1)))
404 .chain((1..=5).map(|i| (FlowIdent::new(2003, i as u32), Peer::empty(i + 1))))
405 )
406 );
407 }
408}