Skip to main content

common_meta/cache/flow/
table_flownode.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use common_telemetry::info;
19use futures::future::BoxFuture;
20use moka::future::Cache;
21use moka::ops::compute::Op;
22use table::metadata::TableId;
23
24use crate::cache::{CacheContainer, Initializer};
25use crate::error::Result;
26use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
27use crate::key::flow::{TableFlowManager, TableFlowManagerRef};
28use crate::key::{FlowId, FlowPartitionId};
29use crate::kv_backend::KvBackendRef;
30use crate::peer::Peer;
31
32/// Flow id&flow partition key
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct FlowIdent {
35    pub flow_id: FlowId,
36    pub partition_id: FlowPartitionId,
37}
38
39impl FlowIdent {
40    pub fn new(flow_id: FlowId, partition_id: FlowPartitionId) -> Self {
41        Self {
42            flow_id,
43            partition_id,
44        }
45    }
46}
47
48/// cache for TableFlowManager, the table_id part is in the outer cache
49/// include flownode_id, flow_id, partition_id mapping to Peer
50type FlownodeFlowSet = Arc<HashMap<FlowIdent, Peer>>;
51
52pub type TableFlownodeSetCacheRef = Arc<TableFlownodeSetCache>;
53
54/// [TableFlownodeSetCache] caches the [TableId] to [FlownodeSet] mapping.
55pub type TableFlownodeSetCache = CacheContainer<TableId, FlownodeFlowSet, CacheIdent>;
56
57/// Constructs a [TableFlownodeSetCache].
58pub fn new_table_flownode_set_cache(
59    name: String,
60    cache: Cache<TableId, FlownodeFlowSet>,
61    kv_backend: KvBackendRef,
62) -> TableFlownodeSetCache {
63    let table_flow_manager = Arc::new(TableFlowManager::new(kv_backend));
64    let init = init_factory(table_flow_manager);
65
66    CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
67}
68
69fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId, FlownodeFlowSet> {
70    Arc::new(move |&table_id| {
71        let table_flow_manager = table_flow_manager.clone();
72        Box::pin(async move {
73            table_flow_manager
74                .flows(table_id)
75                .await
76                .map(|flows| {
77                    flows
78                        .into_iter()
79                        .map(|(key, value)| {
80                            (
81                                FlowIdent::new(key.flow_id(), key.partition_id()),
82                                value.peer,
83                            )
84                        })
85                        .collect::<HashMap<_, _>>()
86                })
87                // We must cache the `HashSet` even if it's empty,
88                // to avoid future requests to the remote storage next time;
89                // If the value is added to the remote storage,
90                // we have a corresponding cache invalidation mechanism to invalidate `(Key, EmptyHashSet)`.
91                .map(Arc::new)
92                .map(Some)
93                .inspect(|set| {
94                    if set.as_ref().map(|s| !s.is_empty()).unwrap_or(false) {
95                        info!(
96                            "Initialized table_flownode cache for table_id: {}, set: {:?}",
97                            table_id, set
98                        );
99                    };
100                })
101        })
102    })
103}
104
105async fn handle_create_flow(
106    cache: &Cache<TableId, FlownodeFlowSet>,
107    CreateFlow {
108        flow_id,
109        source_table_ids,
110        partition_to_peer_mapping: flow_part2nodes,
111    }: &CreateFlow,
112) {
113    for table_id in source_table_ids {
114        let entry = cache.entry(*table_id);
115        entry
116            .and_compute_with(
117                async |entry: Option<moka::Entry<u32, FlownodeFlowSet>>| match entry {
118                    Some(entry) => {
119                        let mut map = entry.into_value().as_ref().clone();
120                        map.extend(
121                            flow_part2nodes.iter().map(|(part, peer)| {
122                                (FlowIdent::new(*flow_id, *part), peer.clone())
123                            }),
124                        );
125
126                        Op::Put(Arc::new(map))
127                    }
128                    None => {
129                        Op::Put(Arc::new(HashMap::from_iter(flow_part2nodes.iter().map(
130                            |(part, peer)| (FlowIdent::new(*flow_id, *part), peer.clone()),
131                        ))))
132                    }
133                },
134            )
135            .await;
136    }
137}
138
139async fn handle_drop_flow(
140    cache: &Cache<TableId, FlownodeFlowSet>,
141    DropFlow {
142        flow_id,
143        source_table_ids,
144        flow_part2node_id,
145    }: &DropFlow,
146) {
147    for table_id in source_table_ids {
148        let entry = cache.entry(*table_id);
149        entry
150            .and_compute_with(
151                async |entry: Option<moka::Entry<u32, FlownodeFlowSet>>| match entry {
152                    Some(entry) => {
153                        let mut set = entry.into_value().as_ref().clone();
154                        for (part, _node) in flow_part2node_id {
155                            let key = FlowIdent::new(*flow_id, *part);
156                            set.remove(&key);
157                        }
158
159                        Op::Put(Arc::new(set))
160                    }
161                    None => {
162                        // Do nothing
163                        Op::Nop
164                    }
165                },
166            )
167            .await;
168    }
169}
170
171fn invalidator<'a>(
172    cache: &'a Cache<TableId, FlownodeFlowSet>,
173    idents: &'a [&CacheIdent],
174) -> BoxFuture<'a, Result<()>> {
175    Box::pin(async move {
176        for ident in idents {
177            match ident {
178                CacheIdent::CreateFlow(create_flow) => handle_create_flow(cache, create_flow).await,
179                CacheIdent::DropFlow(drop_flow) => handle_drop_flow(cache, drop_flow).await,
180                CacheIdent::FlowNodeAddressChange(node_id) => {
181                    info!(
182                        "Invalidate flow node cache for node_id in table_flownode: {}",
183                        node_id
184                    );
185                    cache.invalidate_all();
186                }
187                _ => {}
188            }
189        }
190        Ok(())
191    })
192}
193
194fn filter(ident: &CacheIdent) -> bool {
195    matches!(
196        ident,
197        CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_) | CacheIdent::FlowNodeAddressChange(_)
198    )
199}
200
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap};
    use std::sync::Arc;

    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use moka::future::CacheBuilder;
    use table::table_name::TableName;

    use crate::cache::flow::table_flownode::{FlowIdent, new_table_flownode_set_cache};
    use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
    use crate::key::flow::FlowMetadataManager;
    use crate::key::flow::flow_info::FlowInfoValue;
    use crate::key::flow::flow_route::FlowRouteValue;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::peer::Peer;

    /// A table without any flow in the backend still resolves to an (empty) set.
    #[tokio::test]
    async fn test_cache_empty_set() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let inner = CacheBuilder::new(128).build();
        let cache = new_table_flownode_set_cache("test".to_string(), inner, kv_backend);

        let set = cache.get(1024).await.unwrap().unwrap();
        assert!(set.is_empty());
    }

    /// Flow metadata written to the backend is visible through the cache for
    /// each source table of the flow.
    #[tokio::test]
    async fn test_get() {
        let kv_backend = Arc::new(MemoryKvBackend::default());

        // Flow 1024 reads from tables 1024 and 1025 and has three partitions.
        let flow_info = FlowInfoValue {
            source_table_ids: vec![1024, 1025],
            sink_table_name: TableName {
                catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                schema_name: DEFAULT_SCHEMA_NAME.to_string(),
                table_name: "sink_table".to_string(),
            },
            flownode_ids: BTreeMap::from([(0, 1), (1, 2), (2, 3)]),
            catalog_name: DEFAULT_CATALOG_NAME.to_string(),
            query_context: None,
            flow_name: "my_flow".to_string(),
            raw_sql: "sql".to_string(),
            expire_after: Some(300),
            eval_interval_secs: None,
            comment: "comment".to_string(),
            options: Default::default(),
            created_time: chrono::Utc::now(),
            updated_time: chrono::Utc::now(),
        };
        let flow_routes = (1..=3)
            .map(|i| {
                (
                    (i - 1) as u32,
                    FlowRouteValue {
                        peer: Peer::empty(i),
                    },
                )
            })
            .collect::<Vec<_>>();
        FlowMetadataManager::new(kv_backend.clone())
            .create_flow_metadata(1024, flow_info, flow_routes)
            .await
            .unwrap();

        let inner = CacheBuilder::new(128).build();
        let cache = new_table_flownode_set_cache("test".to_string(), inner, kv_backend);

        // Both source tables resolve to the same three partitions of flow 1024.
        let expected: HashMap<_, _> = (1..=3)
            .map(|i| (FlowIdent::new(1024, (i - 1) as u32), Peer::empty(i)))
            .collect();
        for table_id in [1024, 1025] {
            let set = cache.get(table_id).await.unwrap().unwrap();
            assert_eq!(*set, expected);
        }

        // A table that is not a source of any flow yields an empty set.
        let set = cache.get(1026).await.unwrap().unwrap();
        assert_eq!(set.len(), 0);
    }

    /// A `CreateFlow` ident populates the entries of all its source tables.
    #[tokio::test]
    async fn test_create_flow() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let inner = CacheBuilder::new(128).build();
        let cache = new_table_flownode_set_cache("test".to_string(), inner, kv_backend);

        let create_flow = vec![CacheIdent::CreateFlow(CreateFlow {
            flow_id: 2001,
            source_table_ids: vec![1024, 1025],
            partition_to_peer_mapping: (1..=5).map(|i| (i as u32, Peer::empty(i + 1))).collect(),
        })];
        cache.invalidate(&create_flow).await.unwrap();

        for table_id in [1024, 1025] {
            let set = cache.get(table_id).await.unwrap().unwrap();
            assert_eq!(set.len(), 5);
        }
    }

    /// Dropping a flow and re-creating it on other source tables moves its
    /// partitions from the old tables to the new ones.
    #[tokio::test]
    async fn test_replace_flow() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let inner = CacheBuilder::new(128).build();
        let cache = new_table_flownode_set_cache("test".to_string(), inner, kv_backend);

        let create_flow = vec![CacheIdent::CreateFlow(CreateFlow {
            flow_id: 2001,
            source_table_ids: vec![1024, 1025],
            partition_to_peer_mapping: (1..=5).map(|i| (i as u32, Peer::empty(i + 1))).collect(),
        })];
        cache.invalidate(&create_flow).await.unwrap();
        for table_id in [1024, 1025] {
            let set = cache.get(table_id).await.unwrap().unwrap();
            assert_eq!(set.len(), 5);
        }

        let drop_then_create_flow = vec![
            CacheIdent::DropFlow(DropFlow {
                flow_id: 2001,
                source_table_ids: vec![1024, 1025],
                flow_part2node_id: (1..=5).map(|i| (i as u32, i + 1)).collect(),
            }),
            CacheIdent::CreateFlow(CreateFlow {
                flow_id: 2001,
                source_table_ids: vec![1026, 1027],
                partition_to_peer_mapping: (11..=15)
                    .map(|i| (i as u32, Peer::empty(i + 1)))
                    .collect(),
            }),
            CacheIdent::FlowId(2001),
        ];
        cache.invalidate(&drop_then_create_flow).await.unwrap();

        // The old source table no longer holds any partition of the flow.
        let set = cache.get(1024).await.unwrap().unwrap();
        assert!(set.is_empty());

        // The new source tables hold exactly the re-created partitions.
        let expected: HashMap<_, _> = (11..=15)
            .map(|i| (FlowIdent::new(2001, i as u32), Peer::empty(i + 1)))
            .collect();
        for table_id in [1026, 1027] {
            let set = cache.get(table_id).await.unwrap().unwrap();
            assert_eq!(*set, expected);
        }
    }

    /// Dropping one flow removes only its own partitions and leaves the other
    /// flows of the same tables in place.
    #[tokio::test]
    async fn test_drop_flow() {
        let kv_backend = Arc::new(MemoryKvBackend::default());
        let inner = CacheBuilder::new(128).build();
        let cache = new_table_flownode_set_cache("test".to_string(), inner, kv_backend);

        let create_flows = vec![
            CacheIdent::CreateFlow(CreateFlow {
                flow_id: 2001,
                source_table_ids: vec![1024, 1025],
                partition_to_peer_mapping: (1..=5)
                    .map(|i| (i as u32, Peer::empty(i + 1)))
                    .collect(),
            }),
            CacheIdent::CreateFlow(CreateFlow {
                flow_id: 2002,
                source_table_ids: vec![1024, 1025],
                partition_to_peer_mapping: (11..=12)
                    .map(|i| (i as u32, Peer::empty(i + 1)))
                    .collect(),
            }),
            // Same partitions and peers as flow 2001: one flownode holding multiple flows.
            CacheIdent::CreateFlow(CreateFlow {
                flow_id: 2003,
                source_table_ids: vec![1024, 1025],
                partition_to_peer_mapping: (1..=5)
                    .map(|i| (i as u32, Peer::empty(i + 1)))
                    .collect(),
            }),
        ];
        cache.invalidate(&create_flows).await.unwrap();
        for table_id in [1024, 1025] {
            let set = cache.get(table_id).await.unwrap().unwrap();
            assert_eq!(set.len(), 12);
        }

        let drop_flow = vec![CacheIdent::DropFlow(DropFlow {
            flow_id: 2001,
            source_table_ids: vec![1024, 1025],
            flow_part2node_id: (1..=5).map(|i| (i as u32, i + 1)).collect(),
        })];
        cache.invalidate(&drop_flow).await.unwrap();

        // Only the partitions of flows 2002 and 2003 remain.
        let expected: HashMap<_, _> = (11..=12)
            .map(|i| (FlowIdent::new(2002, i as u32), Peer::empty(i + 1)))
            .chain((1..=5).map(|i| (FlowIdent::new(2003, i as u32), Peer::empty(i + 1))))
            .collect();
        for table_id in [1024, 1025] {
            let set = cache.get(table_id).await.unwrap().unwrap();
            assert_eq!(*set, expected);
        }
    }
}