common_meta/cache/flow/table_flownode.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use common_telemetry::info;
use futures::future::BoxFuture;
use moka::future::Cache;
use moka::ops::compute::Op;
use table::metadata::TableId;

use crate::cache::{CacheContainer, Initializer};
use crate::error::Result;
use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
use crate::key::flow::{TableFlowManager, TableFlowManagerRef};
use crate::key::{FlowId, FlowPartitionId};
use crate::kv_backend::KvBackendRef;
use crate::peer::Peer;

/// Cache key combining a flow id and a flow partition id.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FlowIdent {
    pub flow_id: FlowId,
    pub partition_id: FlowPartitionId,
}

impl FlowIdent {
    pub fn new(flow_id: FlowId, partition_id: FlowPartitionId) -> Self {
        Self {
            flow_id,
            partition_id,
        }
    }
}

/// Cached value for [TableFlowManager]; the table_id part is the key of the outer cache.
/// Maps each (flow_id, partition_id) pair to the [Peer] of the flownode hosting that partition.
type FlownodeFlowSet = Arc<HashMap<FlowIdent, Peer>>;

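/// A shared, reference-counted handle to a [TableFlownodeSetCache].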
pub type TableFlownodeSetCacheRef = Arc<TableFlownodeSetCache>;

/// [TableFlownodeSetCache] caches the [TableId] to [FlownodeFlowSet] mapping.
pub type TableFlownodeSetCache = CacheContainer<TableId, FlownodeFlowSet, CacheIdent>;

/// Constructs a [TableFlownodeSetCache].
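///
/// A minimal construction sketch, mirroring the unit tests below; the in-memory
/// kv backend and the cache capacity of 128 are illustrative assumptions:
///
/// ```ignore
/// use std::sync::Arc;
/// use moka::future::CacheBuilder;
/// use crate::kv_backend::memory::MemoryKvBackend;
///
/// // Build the inner moka cache, then wrap it with the metadata-backed container.
/// let kv_backend = Arc::new(MemoryKvBackend::default());
/// let moka_cache = CacheBuilder::new(128).build();
/// let cache = new_table_flownode_set_cache("table_flownode".to_string(), moka_cache, kv_backend);
/// ```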
pub fn new_table_flownode_set_cache(
    name: String,
    cache: Cache<TableId, FlownodeFlowSet>,
    kv_backend: KvBackendRef,
) -> TableFlownodeSetCache {
    let table_flow_manager = Arc::new(TableFlowManager::new(kv_backend));
    let init = init_factory(table_flow_manager);

    CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
}

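/// Builds the cache initializer: on a cache miss it loads the table's flows from the
/// [TableFlowManager] and collects them into a [FlowIdent] to [Peer] map.
///
/// The map is cached even when it is empty, so that repeated lookups for a table without
/// flows don't go back to the remote storage (see the comment below).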
fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId, FlownodeFlowSet> {
    Arc::new(move |&table_id| {
        let table_flow_manager = table_flow_manager.clone();
        Box::pin(async move {
            table_flow_manager
                .flows(table_id)
                .await
                .map(|flows| {
                    flows
                        .into_iter()
                        .map(|(key, value)| {
                            (
                                FlowIdent::new(key.flow_id(), key.partition_id()),
                                value.peer,
                            )
                        })
                        .collect::<HashMap<_, _>>()
                })
                // We must cache the map even if it's empty,
                // so that subsequent requests don't hit the remote storage again;
                // if a value is later added to the remote storage,
                // the corresponding cache invalidation mechanism will invalidate the `(Key, EmptyMap)` entry.
                .map(Arc::new)
                .map(Some)
                .inspect(|set| {
                    if set.as_ref().map(|s| !s.is_empty()).unwrap_or(false) {
                        info!(
                            "Initialized table_flownode cache for table_id: {}, set: {:?}",
                            table_id, set
                        );
                    };
                })
        })
    })
}

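/// Applies a [CreateFlow] instruction to the cache: for every source table of the new
/// flow, merges the flow's (partition id, peer) entries into that table's cached map,
/// creating the map if the table has no cached entry yet.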
async fn handle_create_flow(
    cache: &Cache<TableId, FlownodeFlowSet>,
    CreateFlow {
        flow_id,
        source_table_ids,
        partition_to_peer_mapping: flow_part2nodes,
    }: &CreateFlow,
) {
    for table_id in source_table_ids {
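        // Merge this flow's partition -> peer entries into the table's cached map,
        // inserting a fresh map if the table has no cached entry yet.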
        let entry = cache.entry(*table_id);
        entry
            .and_compute_with(
                async |entry: Option<moka::Entry<u32, FlownodeFlowSet>>| match entry {
                    Some(entry) => {
                        let mut map = entry.into_value().as_ref().clone();
                        map.extend(
                            flow_part2nodes.iter().map(|(part, peer)| {
                                (FlowIdent::new(*flow_id, *part), peer.clone())
                            }),
                        );

                        Op::Put(Arc::new(map))
                    }
                    None => {
                        Op::Put(Arc::new(HashMap::from_iter(flow_part2nodes.iter().map(
                            |(part, peer)| (FlowIdent::new(*flow_id, *part), peer.clone()),
                        ))))
                    }
                },
            )
            .await;
    }
}

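/// Applies a [DropFlow] instruction to the cache: for every source table of the dropped
/// flow, removes the flow's partition entries from that table's cached map. Tables
/// without a cached entry are left untouched.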
async fn handle_drop_flow(
    cache: &Cache<TableId, FlownodeFlowSet>,
    DropFlow {
        flow_id,
        source_table_ids,
        flow_part2node_id,
    }: &DropFlow,
) {
    for table_id in source_table_ids {
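        // Remove this flow's partitions from the table's cached map; if the table has no
        // cached entry, there is nothing to update.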
        let entry = cache.entry(*table_id);
        entry
            .and_compute_with(
                async |entry: Option<moka::Entry<u32, FlownodeFlowSet>>| match entry {
                    Some(entry) => {
                        let mut set = entry.into_value().as_ref().clone();
                        for (part, _node) in flow_part2node_id {
                            let key = FlowIdent::new(*flow_id, *part);
                            set.remove(&key);
                        }

                        Op::Put(Arc::new(set))
                    }
                    None => {
                        // Do nothing
                        Op::Nop
                    }
                },
            )
            .await;
    }
}

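/// Cache invalidation entry point: applies `CreateFlow` and `DropFlow` instructions to
/// the cached mappings, and clears the whole cache on `FlowNodeAddressChange`, since any
/// cached [Peer] address may be stale afterwards. Other [CacheIdent] variants are ignored.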
fn invalidator<'a>(
    cache: &'a Cache<TableId, FlownodeFlowSet>,
    ident: &'a CacheIdent,
) -> BoxFuture<'a, Result<()>> {
    Box::pin(async move {
        match ident {
            CacheIdent::CreateFlow(create_flow) => handle_create_flow(cache, create_flow).await,
            CacheIdent::DropFlow(drop_flow) => handle_drop_flow(cache, drop_flow).await,
            CacheIdent::FlowNodeAddressChange(node_id) => {
                info!(
                    "Invalidate flow node cache for node_id in table_flownode: {}",
                    node_id
                );
                cache.invalidate_all();
            }
            _ => {}
        }
        Ok(())
    })
}

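/// Returns whether a [CacheIdent] is relevant to this cache, so that unrelated
/// invalidation events can be skipped without touching the cache.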
fn filter(ident: &CacheIdent) -> bool {
    matches!(
        ident,
        CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_) | CacheIdent::FlowNodeAddressChange(_)
    )
}

#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap};
    use std::sync::Arc;

    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use moka::future::CacheBuilder;
    use table::table_name::TableName;

    use crate::cache::flow::table_flownode::{FlowIdent, new_table_flownode_set_cache};
    use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
    use crate::key::flow::FlowMetadataManager;
    use crate::key::flow::flow_info::FlowInfoValue;
    use crate::key::flow::flow_route::FlowRouteValue;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::peer::Peer;

    #[tokio::test]
    async fn test_cache_empty_set() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let cache = CacheBuilder::new(128).build();
        let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
        let set = cache.get(1024).await.unwrap().unwrap();
        assert!(set.is_empty());
    }

    #[tokio::test]
    async fn test_get() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let flownode_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
        flownode_metadata_manager
            .create_flow_metadata(
                1024,
                FlowInfoValue {
                    source_table_ids: vec![1024, 1025],
                    sink_table_name: TableName {
                        catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                        schema_name: DEFAULT_SCHEMA_NAME.to_string(),
                        table_name: "sink_table".to_string(),
                    },
                    flownode_ids: BTreeMap::from([(0, 1), (1, 2), (2, 3)]),
                    catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                    query_context: None,
                    flow_name: "my_flow".to_string(),
                    raw_sql: "sql".to_string(),
                    expire_after: Some(300),
                    eval_interval_secs: None,
                    comment: "comment".to_string(),
                    options: Default::default(),
                    created_time: chrono::Utc::now(),
                    updated_time: chrono::Utc::now(),
                },
                (1..=3)
                    .map(|i| {
                        (
                            (i - 1) as u32,
                            FlowRouteValue {
                                peer: Peer::empty(i),
                            },
                        )
                    })
                    .collect::<Vec<_>>(),
            )
            .await
            .unwrap();
        let cache = CacheBuilder::new(128).build();
        let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
        let set = cache.get(1024).await.unwrap().unwrap();
        assert_eq!(
            set.as_ref().clone(),
            HashMap::from_iter(
                (1..=3).map(|i| { (FlowIdent::new(1024, (i - 1) as u32), Peer::empty(i),) })
            )
        );
        let set = cache.get(1025).await.unwrap().unwrap();
        assert_eq!(
            set.as_ref().clone(),
            HashMap::from_iter(
                (1..=3).map(|i| { (FlowIdent::new(1024, (i - 1) as u32), Peer::empty(i),) })
            )
        );
        let result = cache.get(1026).await.unwrap().unwrap();
        assert_eq!(result.len(), 0);
    }

    #[tokio::test]
    async fn test_create_flow() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let cache = CacheBuilder::new(128).build();
        let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
        let ident = vec![CacheIdent::CreateFlow(CreateFlow {
            flow_id: 2001,
            source_table_ids: vec![1024, 1025],
            partition_to_peer_mapping: (1..=5).map(|i| (i as u32, Peer::empty(i + 1))).collect(),
        })];
        cache.invalidate(&ident).await.unwrap();
        let set = cache.get(1024).await.unwrap().unwrap();
        assert_eq!(set.len(), 5);
        let set = cache.get(1025).await.unwrap().unwrap();
        assert_eq!(set.len(), 5);
    }

    #[tokio::test]
    async fn test_replace_flow() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let cache = CacheBuilder::new(128).build();
        let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
        let ident = vec![CacheIdent::CreateFlow(CreateFlow {
            flow_id: 2001,
            source_table_ids: vec![1024, 1025],
            partition_to_peer_mapping: (1..=5).map(|i| (i as u32, Peer::empty(i + 1))).collect(),
        })];
        cache.invalidate(&ident).await.unwrap();
        let set = cache.get(1024).await.unwrap().unwrap();
        assert_eq!(set.len(), 5);
        let set = cache.get(1025).await.unwrap().unwrap();
        assert_eq!(set.len(), 5);

        let drop_then_create_flow = vec![
            CacheIdent::DropFlow(DropFlow {
                flow_id: 2001,
                source_table_ids: vec![1024, 1025],
                flow_part2node_id: (1..=5).map(|i| (i as u32, i + 1)).collect(),
            }),
            CacheIdent::CreateFlow(CreateFlow {
                flow_id: 2001,
                source_table_ids: vec![1026, 1027],
                partition_to_peer_mapping: (11..=15)
                    .map(|i| (i as u32, Peer::empty(i + 1)))
                    .collect(),
            }),
            CacheIdent::FlowId(2001),
        ];
        cache.invalidate(&drop_then_create_flow).await.unwrap();

        let set = cache.get(1024).await.unwrap().unwrap();
        assert!(set.is_empty());

        let expected = HashMap::from_iter(
            (11..=15).map(|i| (FlowIdent::new(2001, i as u32), Peer::empty(i + 1))),
        );
        let set = cache.get(1026).await.unwrap().unwrap();

        assert_eq!(set.as_ref().clone(), expected);

        let set = cache.get(1027).await.unwrap().unwrap();

        assert_eq!(set.as_ref().clone(), expected);
    }

    #[tokio::test]
    async fn test_drop_flow() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let cache = CacheBuilder::new(128).build();
        let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
        let ident = vec![
            CacheIdent::CreateFlow(CreateFlow {
                flow_id: 2001,
                source_table_ids: vec![1024, 1025],
                partition_to_peer_mapping: (1..=5)
                    .map(|i| (i as u32, Peer::empty(i + 1)))
                    .collect(),
            }),
            CacheIdent::CreateFlow(CreateFlow {
                flow_id: 2002,
                source_table_ids: vec![1024, 1025],
                partition_to_peer_mapping: (11..=12)
                    .map(|i| (i as u32, Peer::empty(i + 1)))
                    .collect(),
            }),
            // the same flownodes hold multiple flows
            CacheIdent::CreateFlow(CreateFlow {
                flow_id: 2003,
                source_table_ids: vec![1024, 1025],
                partition_to_peer_mapping: (1..=5)
                    .map(|i| (i as u32, Peer::empty(i + 1)))
                    .collect(),
            }),
        ];
        cache.invalidate(&ident).await.unwrap();
        let set = cache.get(1024).await.unwrap().unwrap();
        assert_eq!(set.len(), 12);
        let set = cache.get(1025).await.unwrap().unwrap();
        assert_eq!(set.len(), 12);

        let ident = vec![CacheIdent::DropFlow(DropFlow {
            flow_id: 2001,
            source_table_ids: vec![1024, 1025],
            flow_part2node_id: (1..=5).map(|i| (i as u32, i + 1)).collect(),
        })];
        cache.invalidate(&ident).await.unwrap();
        let set = cache.get(1024).await.unwrap().unwrap();
        assert_eq!(
            set.as_ref().clone(),
            HashMap::from_iter(
                (11..=12)
                    .map(|i| (FlowIdent::new(2002, i as u32), Peer::empty(i + 1)))
                    .chain((1..=5).map(|i| (FlowIdent::new(2003, i as u32), Peer::empty(i + 1))))
            )
        );
        let set = cache.get(1025).await.unwrap().unwrap();
        assert_eq!(
            set.as_ref().clone(),
            HashMap::from_iter(
                (11..=12)
                    .map(|i| (FlowIdent::new(2002, i as u32), Peer::empty(i + 1)))
                    .chain((1..=5).map(|i| (FlowIdent::new(2003, i as u32), Peer::empty(i + 1))))
            )
        );
    }
}