common_meta/cache/flow/table_flownode.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use common_telemetry::info;
19use futures::future::BoxFuture;
20use moka::future::Cache;
21use moka::ops::compute::Op;
22use table::metadata::TableId;
23
24use crate::cache::{CacheContainer, Initializer};
25use crate::error::Result;
26use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
27use crate::key::flow::{TableFlowManager, TableFlowManagerRef};
28use crate::key::{FlowId, FlowPartitionId};
29use crate::kv_backend::KvBackendRef;
30use crate::peer::Peer;
31
32/// Flow id&flow partition key
33#[derive(Debug, Clone, PartialEq, Eq, Hash)]
34pub struct FlowIdent {
35    pub flow_id: FlowId,
36    pub partition_id: FlowPartitionId,
37}
38
39impl FlowIdent {
40    pub fn new(flow_id: FlowId, partition_id: FlowPartitionId) -> Self {
41        Self {
42            flow_id,
43            partition_id,
44        }
45    }
46}
47
48/// cache for TableFlowManager, the table_id part is in the outer cache
49/// include flownode_id, flow_id, partition_id mapping to Peer
50type FlownodeFlowSet = Arc<HashMap<FlowIdent, Peer>>;
51
52pub type TableFlownodeSetCacheRef = Arc<TableFlownodeSetCache>;
53
54/// [TableFlownodeSetCache] caches the [TableId] to [FlownodeSet] mapping.
55pub type TableFlownodeSetCache = CacheContainer<TableId, FlownodeFlowSet, CacheIdent>;
56
57/// Constructs a [TableFlownodeSetCache].
58pub fn new_table_flownode_set_cache(
59    name: String,
60    cache: Cache<TableId, FlownodeFlowSet>,
61    kv_backend: KvBackendRef,
62) -> TableFlownodeSetCache {
63    let table_flow_manager = Arc::new(TableFlowManager::new(kv_backend));
64    let init = init_factory(table_flow_manager);
65
66    CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
67}
68
69fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId, FlownodeFlowSet> {
70    Arc::new(move |&table_id| {
71        let table_flow_manager = table_flow_manager.clone();
72        Box::pin(async move {
73            table_flow_manager
74                .flows(table_id)
75                .await
76                .map(|flows| {
77                    flows
78                        .into_iter()
79                        .map(|(key, value)| {
80                            (
81                                FlowIdent::new(key.flow_id(), key.partition_id()),
82                                value.peer,
83                            )
84                        })
85                        .collect::<HashMap<_, _>>()
86                })
87                // We must cache the `HashSet` even if it's empty,
88                // to avoid future requests to the remote storage next time;
89                // If the value is added to the remote storage,
90                // we have a corresponding cache invalidation mechanism to invalidate `(Key, EmptyHashSet)`.
91                .map(Arc::new)
92                .map(Some)
93                .inspect(|set| {
94                    if set.as_ref().map(|s| !s.is_empty()).unwrap_or(false) {
95                        info!(
96                            "Initialized table_flownode cache for table_id: {}, set: {:?}",
97                            table_id, set
98                        );
99                    };
100                })
101        })
102    })
103}
104
105async fn handle_create_flow(
106    cache: &Cache<TableId, FlownodeFlowSet>,
107    CreateFlow {
108        flow_id,
109        source_table_ids,
110        partition_to_peer_mapping: flow_part2nodes,
111    }: &CreateFlow,
112) {
113    for table_id in source_table_ids {
114        let entry = cache.entry(*table_id);
115        entry
116            .and_compute_with(
117                async |entry: Option<moka::Entry<u32, FlownodeFlowSet>>| match entry {
118                    Some(entry) => {
119                        let mut map = entry.into_value().as_ref().clone();
120                        map.extend(
121                            flow_part2nodes.iter().map(|(part, peer)| {
122                                (FlowIdent::new(*flow_id, *part), peer.clone())
123                            }),
124                        );
125
126                        Op::Put(Arc::new(map))
127                    }
128                    None => {
129                        Op::Put(Arc::new(HashMap::from_iter(flow_part2nodes.iter().map(
130                            |(part, peer)| (FlowIdent::new(*flow_id, *part), peer.clone()),
131                        ))))
132                    }
133                },
134            )
135            .await;
136    }
137}
138
139async fn handle_drop_flow(
140    cache: &Cache<TableId, FlownodeFlowSet>,
141    DropFlow {
142        flow_id,
143        source_table_ids,
144        flow_part2node_id,
145    }: &DropFlow,
146) {
147    for table_id in source_table_ids {
148        let entry = cache.entry(*table_id);
149        entry
150            .and_compute_with(
151                async |entry: Option<moka::Entry<u32, FlownodeFlowSet>>| match entry {
152                    Some(entry) => {
153                        let mut set = entry.into_value().as_ref().clone();
154                        for (part, _node) in flow_part2node_id {
155                            let key = FlowIdent::new(*flow_id, *part);
156                            set.remove(&key);
157                        }
158
159                        Op::Put(Arc::new(set))
160                    }
161                    None => {
162                        // Do nothing
163                        Op::Nop
164                    }
165                },
166            )
167            .await;
168    }
169}
170
171fn invalidator<'a>(
172    cache: &'a Cache<TableId, FlownodeFlowSet>,
173    ident: &'a CacheIdent,
174) -> BoxFuture<'a, Result<()>> {
175    Box::pin(async move {
176        match ident {
177            CacheIdent::CreateFlow(create_flow) => handle_create_flow(cache, create_flow).await,
178            CacheIdent::DropFlow(drop_flow) => handle_drop_flow(cache, drop_flow).await,
179            CacheIdent::FlowNodeAddressChange(node_id) => {
180                info!(
181                    "Invalidate flow node cache for node_id in table_flownode: {}",
182                    node_id
183                );
184                cache.invalidate_all();
185            }
186            _ => {}
187        }
188        Ok(())
189    })
190}
191
192fn filter(ident: &CacheIdent) -> bool {
193    matches!(
194        ident,
195        CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_) | CacheIdent::FlowNodeAddressChange(_)
196    )
197}
198
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap};
    use std::sync::Arc;

    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use moka::future::CacheBuilder;
    use table::table_name::TableName;

    use crate::cache::flow::table_flownode::{new_table_flownode_set_cache, FlowIdent};
    use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
    use crate::key::flow::flow_info::FlowInfoValue;
    use crate::key::flow::flow_route::FlowRouteValue;
    use crate::key::flow::FlowMetadataManager;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::peer::Peer;

    /// A table with no flows in an empty metadata store still yields `Some(empty set)`,
    /// i.e. the empty result is cached rather than reported as missing.
    #[tokio::test]
    async fn test_cache_empty_set() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let cache = CacheBuilder::new(128).build();
        let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
        let set = cache.get(1024).await.unwrap().unwrap();
        assert!(set.is_empty());
    }

    /// Loads through the initializer. Flow 1024 is written to the metadata store with source
    /// tables 1024 and 1025 and partitions 0..=2 routed to peers 1..=3.
    /// Both source tables must see that mapping, and an unrelated table (1026) gets an empty set.
    #[tokio::test]
    async fn test_get() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let flownode_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
        flownode_metadata_manager
            .create_flow_metadata(
                1024,
                FlowInfoValue {
                    source_table_ids: vec![1024, 1025],
                    sink_table_name: TableName {
                        catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                        schema_name: DEFAULT_SCHEMA_NAME.to_string(),
                        table_name: "sink_table".to_string(),
                    },
                    flownode_ids: BTreeMap::from([(0, 1), (1, 2), (2, 3)]),
                    catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                    query_context: None,
                    flow_name: "my_flow".to_string(),
                    raw_sql: "sql".to_string(),
                    expire_after: Some(300),
                    comment: "comment".to_string(),
                    options: Default::default(),
                    created_time: chrono::Utc::now(),
                    updated_time: chrono::Utc::now(),
                },
                // Partition `i - 1` is routed to `Peer::empty(i)` for i in 1..=3.
                (1..=3)
                    .map(|i| {
                        (
                            (i - 1) as u32,
                            FlowRouteValue {
                                peer: Peer::empty(i),
                            },
                        )
                    })
                    .collect::<Vec<_>>(),
            )
            .await
            .unwrap();
        let cache = CacheBuilder::new(128).build();
        let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
        let set = cache.get(1024).await.unwrap().unwrap();
        assert_eq!(
            set.as_ref().clone(),
            HashMap::from_iter(
                (1..=3).map(|i| { (FlowIdent::new(1024, (i - 1) as u32), Peer::empty(i),) })
            )
        );
        let set = cache.get(1025).await.unwrap().unwrap();
        assert_eq!(
            set.as_ref().clone(),
            HashMap::from_iter(
                (1..=3).map(|i| { (FlowIdent::new(1024, (i - 1) as u32), Peer::empty(i),) })
            )
        );
        // Table not referenced by any flow: cached as an empty set.
        let result = cache.get(1026).await.unwrap().unwrap();
        assert_eq!(result.len(), 0);
    }

    /// With an empty metadata store, a `CreateFlow` ident alone populates the cache entry of
    /// each source table with the flow's five partitions.
    #[tokio::test]
    async fn test_create_flow() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let cache = CacheBuilder::new(128).build();
        let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
        let ident = vec![CacheIdent::CreateFlow(CreateFlow {
            flow_id: 2001,
            source_table_ids: vec![1024, 1025],
            partition_to_peer_mapping: (1..=5).map(|i| (i as u32, Peer::empty(i + 1))).collect(),
        })];
        cache.invalidate(&ident).await.unwrap();
        let set = cache.get(1024).await.unwrap().unwrap();
        assert_eq!(set.len(), 5);
        let set = cache.get(1025).await.unwrap().unwrap();
        assert_eq!(set.len(), 5);
    }

    /// A flow is dropped and re-created under the same id (2001) with different source tables
    /// (1026, 1027) and partitions (11..=15). The old source table ends up empty, and the new
    /// source tables hold exactly the new partitions.
    #[tokio::test]
    async fn test_replace_flow() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let cache = CacheBuilder::new(128).build();
        let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
        let ident = vec![CacheIdent::CreateFlow(CreateFlow {
            flow_id: 2001,
            source_table_ids: vec![1024, 1025],
            partition_to_peer_mapping: (1..=5).map(|i| (i as u32, Peer::empty(i + 1))).collect(),
        })];
        cache.invalidate(&ident).await.unwrap();
        let set = cache.get(1024).await.unwrap().unwrap();
        assert_eq!(set.len(), 5);
        let set = cache.get(1025).await.unwrap().unwrap();
        assert_eq!(set.len(), 5);

        // Idents are applied as one batch. This cache's invalidator has no arm for
        // `CacheIdent::FlowId`, so that ident is a no-op here.
        let drop_then_create_flow = vec![
            CacheIdent::DropFlow(DropFlow {
                flow_id: 2001,
                source_table_ids: vec![1024, 1025],
                flow_part2node_id: (1..=5).map(|i| (i as u32, i + 1)).collect(),
            }),
            CacheIdent::CreateFlow(CreateFlow {
                flow_id: 2001,
                source_table_ids: vec![1026, 1027],
                partition_to_peer_mapping: (11..=15)
                    .map(|i| (i as u32, Peer::empty(i + 1)))
                    .collect(),
            }),
            CacheIdent::FlowId(2001),
        ];
        cache.invalidate(&drop_then_create_flow).await.unwrap();

        let set = cache.get(1024).await.unwrap().unwrap();
        assert!(set.is_empty());

        let expected = HashMap::from_iter(
            (11..=15).map(|i| (FlowIdent::new(2001, i as u32), Peer::empty(i + 1))),
        );
        let set = cache.get(1026).await.unwrap().unwrap();

        assert_eq!(set.as_ref().clone(), expected);

        let set = cache.get(1027).await.unwrap().unwrap();

        assert_eq!(set.as_ref().clone(), expected);
    }

    /// Three flows share the same source tables. Flows 2001 and 2003 have identical
    /// partition → peer mappings, so the same flownode holds several flows. That gives
    /// 5 + 2 + 5 = 12 entries per table. Dropping 2001 must remove only its own keys,
    /// leaving the 2 partitions of flow 2002 and the 5 of flow 2003.
    #[tokio::test]
    async fn test_drop_flow() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let cache = CacheBuilder::new(128).build();
        let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
        let ident = vec![
            CacheIdent::CreateFlow(CreateFlow {
                flow_id: 2001,
                source_table_ids: vec![1024, 1025],
                partition_to_peer_mapping: (1..=5)
                    .map(|i| (i as u32, Peer::empty(i + 1)))
                    .collect(),
            }),
            CacheIdent::CreateFlow(CreateFlow {
                flow_id: 2002,
                source_table_ids: vec![1024, 1025],
                partition_to_peer_mapping: (11..=12)
                    .map(|i| (i as u32, Peer::empty(i + 1)))
                    .collect(),
            }),
            // same flownode that hold multiple flows
            CacheIdent::CreateFlow(CreateFlow {
                flow_id: 2003,
                source_table_ids: vec![1024, 1025],
                partition_to_peer_mapping: (1..=5)
                    .map(|i| (i as u32, Peer::empty(i + 1)))
                    .collect(),
            }),
        ];
        cache.invalidate(&ident).await.unwrap();
        let set = cache.get(1024).await.unwrap().unwrap();
        assert_eq!(set.len(), 12);
        let set = cache.get(1025).await.unwrap().unwrap();
        assert_eq!(set.len(), 12);

        let ident = vec![CacheIdent::DropFlow(DropFlow {
            flow_id: 2001,
            source_table_ids: vec![1024, 1025],
            flow_part2node_id: (1..=5).map(|i| (i as u32, i + 1)).collect(),
        })];
        cache.invalidate(&ident).await.unwrap();
        let set = cache.get(1024).await.unwrap().unwrap();
        assert_eq!(
            set.as_ref().clone(),
            HashMap::from_iter(
                (11..=12)
                    .map(|i| (FlowIdent::new(2002, i as u32), Peer::empty(i + 1)))
                    .chain((1..=5).map(|i| (FlowIdent::new(2003, i as u32), Peer::empty(i + 1))))
            )
        );
        let set = cache.get(1025).await.unwrap().unwrap();
        assert_eq!(
            set.as_ref().clone(),
            HashMap::from_iter(
                (11..=12)
                    .map(|i| (FlowIdent::new(2002, i as u32), Peer::empty(i + 1)))
                    .chain((1..=5).map(|i| (FlowIdent::new(2003, i as u32), Peer::empty(i + 1))))
            )
        );
    }
}