common_meta/cache/flow/table_flownode.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use futures::future::BoxFuture;
19use moka::future::Cache;
20use moka::ops::compute::Op;
21use table::metadata::TableId;
22
23use crate::cache::{CacheContainer, Initializer};
24use crate::error::Result;
25use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
26use crate::key::flow::{TableFlowManager, TableFlowManagerRef};
27use crate::key::{FlowId, FlowPartitionId};
28use crate::kv_backend::KvBackendRef;
29use crate::peer::Peer;
30
31/// Flow id&flow partition key
32#[derive(Debug, Clone, PartialEq, Eq, Hash)]
33pub struct FlowIdent {
34    pub flow_id: FlowId,
35    pub partition_id: FlowPartitionId,
36}
37
38impl FlowIdent {
39    pub fn new(flow_id: FlowId, partition_id: FlowPartitionId) -> Self {
40        Self {
41            flow_id,
42            partition_id,
43        }
44    }
45}
46
47/// cache for TableFlowManager, the table_id part is in the outer cache
48/// include flownode_id, flow_id, partition_id mapping to Peer
49type FlownodeFlowSet = Arc<HashMap<FlowIdent, Peer>>;
50
51pub type TableFlownodeSetCacheRef = Arc<TableFlownodeSetCache>;
52
53/// [TableFlownodeSetCache] caches the [TableId] to [FlownodeSet] mapping.
54pub type TableFlownodeSetCache = CacheContainer<TableId, FlownodeFlowSet, CacheIdent>;
55
56/// Constructs a [TableFlownodeSetCache].
57pub fn new_table_flownode_set_cache(
58    name: String,
59    cache: Cache<TableId, FlownodeFlowSet>,
60    kv_backend: KvBackendRef,
61) -> TableFlownodeSetCache {
62    let table_flow_manager = Arc::new(TableFlowManager::new(kv_backend));
63    let init = init_factory(table_flow_manager);
64
65    CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
66}
67
68fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId, FlownodeFlowSet> {
69    Arc::new(move |&table_id| {
70        let table_flow_manager = table_flow_manager.clone();
71        Box::pin(async move {
72            table_flow_manager
73                .flows(table_id)
74                .await
75                .map(|flows| {
76                    flows
77                        .into_iter()
78                        .map(|(key, value)| {
79                            (
80                                FlowIdent::new(key.flow_id(), key.partition_id()),
81                                value.peer,
82                            )
83                        })
84                        .collect::<HashMap<_, _>>()
85                })
86                // We must cache the `HashSet` even if it's empty,
87                // to avoid future requests to the remote storage next time;
88                // If the value is added to the remote storage,
89                // we have a corresponding cache invalidation mechanism to invalidate `(Key, EmptyHashSet)`.
90                .map(Arc::new)
91                .map(Some)
92        })
93    })
94}
95
96async fn handle_create_flow(
97    cache: &Cache<TableId, FlownodeFlowSet>,
98    CreateFlow {
99        flow_id,
100        source_table_ids,
101        partition_to_peer_mapping: flow_part2nodes,
102    }: &CreateFlow,
103) {
104    for table_id in source_table_ids {
105        let entry = cache.entry(*table_id);
106        entry
107            .and_compute_with(
108                async |entry: Option<moka::Entry<u32, FlownodeFlowSet>>| match entry {
109                    Some(entry) => {
110                        let mut map = entry.into_value().as_ref().clone();
111                        map.extend(
112                            flow_part2nodes.iter().map(|(part, peer)| {
113                                (FlowIdent::new(*flow_id, *part), peer.clone())
114                            }),
115                        );
116
117                        Op::Put(Arc::new(map))
118                    }
119                    None => {
120                        Op::Put(Arc::new(HashMap::from_iter(flow_part2nodes.iter().map(
121                            |(part, peer)| (FlowIdent::new(*flow_id, *part), peer.clone()),
122                        ))))
123                    }
124                },
125            )
126            .await;
127    }
128}
129
130async fn handle_drop_flow(
131    cache: &Cache<TableId, FlownodeFlowSet>,
132    DropFlow {
133        flow_id,
134        source_table_ids,
135        flow_part2node_id,
136    }: &DropFlow,
137) {
138    for table_id in source_table_ids {
139        let entry = cache.entry(*table_id);
140        entry
141            .and_compute_with(
142                async |entry: Option<moka::Entry<u32, FlownodeFlowSet>>| match entry {
143                    Some(entry) => {
144                        let mut set = entry.into_value().as_ref().clone();
145                        for (part, _node) in flow_part2node_id {
146                            let key = FlowIdent::new(*flow_id, *part);
147                            set.remove(&key);
148                        }
149
150                        Op::Put(Arc::new(set))
151                    }
152                    None => {
153                        // Do nothing
154                        Op::Nop
155                    }
156                },
157            )
158            .await;
159    }
160}
161
162fn invalidator<'a>(
163    cache: &'a Cache<TableId, FlownodeFlowSet>,
164    ident: &'a CacheIdent,
165) -> BoxFuture<'a, Result<()>> {
166    Box::pin(async move {
167        match ident {
168            CacheIdent::CreateFlow(create_flow) => handle_create_flow(cache, create_flow).await,
169            CacheIdent::DropFlow(drop_flow) => handle_drop_flow(cache, drop_flow).await,
170            _ => {}
171        }
172        Ok(())
173    })
174}
175
176fn filter(ident: &CacheIdent) -> bool {
177    matches!(ident, CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_))
178}
179
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap};
    use std::sync::Arc;

    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use moka::future::CacheBuilder;
    use table::table_name::TableName;

    use crate::cache::flow::table_flownode::{new_table_flownode_set_cache, FlowIdent};
    use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
    use crate::key::flow::flow_info::FlowInfoValue;
    use crate::key::flow::flow_route::FlowRouteValue;
    use crate::key::flow::FlowMetadataManager;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::peer::Peer;

    /// A table with no flows in storage still resolves to `Some(empty set)`, not `None`.
    #[tokio::test]
    async fn test_cache_empty_set() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let cache = CacheBuilder::new(128).build();
        let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
        let set = cache.get(1024).await.unwrap().unwrap();
        assert!(set.is_empty());
    }

    /// Flow metadata persisted via `FlowMetadataManager` is loaded on a cache miss, for
    /// every source table of the flow. An unrelated table resolves to an empty set.
    #[tokio::test]
    async fn test_get() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let flownode_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
        // Flow 1024 reads from tables 1024 and 1025. Its partitions 0..=2 are routed to
        // peers 1..=3.
        flownode_metadata_manager
            .create_flow_metadata(
                1024,
                FlowInfoValue {
                    source_table_ids: vec![1024, 1025],
                    sink_table_name: TableName {
                        catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                        schema_name: DEFAULT_SCHEMA_NAME.to_string(),
                        table_name: "sink_table".to_string(),
                    },
                    flownode_ids: BTreeMap::from([(0, 1), (1, 2), (2, 3)]),
                    catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                    query_context: None,
                    flow_name: "my_flow".to_string(),
                    raw_sql: "sql".to_string(),
                    expire_after: Some(300),
                    comment: "comment".to_string(),
                    options: Default::default(),
                    created_time: chrono::Utc::now(),
                    updated_time: chrono::Utc::now(),
                },
                (1..=3)
                    .map(|i| {
                        (
                            (i - 1) as u32,
                            FlowRouteValue {
                                peer: Peer::empty(i),
                            },
                        )
                    })
                    .collect::<Vec<_>>(),
            )
            .await
            .unwrap();
        let cache = CacheBuilder::new(128).build();
        let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
        let set = cache.get(1024).await.unwrap().unwrap();
        assert_eq!(
            set.as_ref().clone(),
            HashMap::from_iter(
                (1..=3).map(|i| { (FlowIdent::new(1024, (i - 1) as u32), Peer::empty(i),) })
            )
        );
        // The second source table maps to the same flow partitions.
        let set = cache.get(1025).await.unwrap().unwrap();
        assert_eq!(
            set.as_ref().clone(),
            HashMap::from_iter(
                (1..=3).map(|i| { (FlowIdent::new(1024, (i - 1) as u32), Peer::empty(i),) })
            )
        );
        // Table 1026 is not a source of any flow.
        let result = cache.get(1026).await.unwrap().unwrap();
        assert_eq!(result.len(), 0);
    }

    /// A `CreateFlow` ident fills the cache for each source table, even though nothing
    /// was cached or persisted for those tables beforehand.
    #[tokio::test]
    async fn test_create_flow() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let cache = CacheBuilder::new(128).build();
        let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
        let ident = vec![CacheIdent::CreateFlow(CreateFlow {
            flow_id: 2001,
            source_table_ids: vec![1024, 1025],
            partition_to_peer_mapping: (1..=5).map(|i| (i as u32, Peer::empty(i + 1))).collect(),
        })];
        cache.invalidate(&ident).await.unwrap();
        let set = cache.get(1024).await.unwrap().unwrap();
        assert_eq!(set.len(), 5);
        let set = cache.get(1025).await.unwrap().unwrap();
        assert_eq!(set.len(), 5);
    }

    /// Dropping a flow and re-creating it with different source tables moves its
    /// partitions: the old table ends up empty and the new tables hold exactly the new
    /// partitions. The trailing `FlowId` ident is not handled by this cache.
    #[tokio::test]
    async fn test_replace_flow() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let cache = CacheBuilder::new(128).build();
        let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
        let ident = vec![CacheIdent::CreateFlow(CreateFlow {
            flow_id: 2001,
            source_table_ids: vec![1024, 1025],
            partition_to_peer_mapping: (1..=5).map(|i| (i as u32, Peer::empty(i + 1))).collect(),
        })];
        cache.invalidate(&ident).await.unwrap();
        let set = cache.get(1024).await.unwrap().unwrap();
        assert_eq!(set.len(), 5);
        let set = cache.get(1025).await.unwrap().unwrap();
        assert_eq!(set.len(), 5);

        // The idents are applied in order: drop on the old tables, then create on the new ones.
        let drop_then_create_flow = vec![
            CacheIdent::DropFlow(DropFlow {
                flow_id: 2001,
                source_table_ids: vec![1024, 1025],
                flow_part2node_id: (1..=5).map(|i| (i as u32, i + 1)).collect(),
            }),
            CacheIdent::CreateFlow(CreateFlow {
                flow_id: 2001,
                source_table_ids: vec![1026, 1027],
                partition_to_peer_mapping: (11..=15)
                    .map(|i| (i as u32, Peer::empty(i + 1)))
                    .collect(),
            }),
            CacheIdent::FlowId(2001),
        ];
        cache.invalidate(&drop_then_create_flow).await.unwrap();

        let set = cache.get(1024).await.unwrap().unwrap();
        assert!(set.is_empty());

        let expected = HashMap::from_iter(
            (11..=15).map(|i| (FlowIdent::new(2001, i as u32), Peer::empty(i + 1))),
        );
        let set = cache.get(1026).await.unwrap().unwrap();

        assert_eq!(set.as_ref().clone(), expected);

        let set = cache.get(1027).await.unwrap().unwrap();

        assert_eq!(set.as_ref().clone(), expected);
    }

    /// Dropping one flow removes only that flow's partitions. Other flows on the same
    /// tables are kept, including a flow whose partitions map to the same peers.
    #[tokio::test]
    async fn test_drop_flow() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let cache = CacheBuilder::new(128).build();
        let cache = new_table_flownode_set_cache("test".to_string(), cache, mem_kv);
        let ident = vec![
            CacheIdent::CreateFlow(CreateFlow {
                flow_id: 2001,
                source_table_ids: vec![1024, 1025],
                partition_to_peer_mapping: (1..=5)
                    .map(|i| (i as u32, Peer::empty(i + 1)))
                    .collect(),
            }),
            CacheIdent::CreateFlow(CreateFlow {
                flow_id: 2002,
                source_table_ids: vec![1024, 1025],
                partition_to_peer_mapping: (11..=12)
                    .map(|i| (i as u32, Peer::empty(i + 1)))
                    .collect(),
            }),
            // same flownode that hold multiple flows
            CacheIdent::CreateFlow(CreateFlow {
                flow_id: 2003,
                source_table_ids: vec![1024, 1025],
                partition_to_peer_mapping: (1..=5)
                    .map(|i| (i as u32, Peer::empty(i + 1)))
                    .collect(),
            }),
        ];
        cache.invalidate(&ident).await.unwrap();
        // 5 partitions of flow 2001 + 2 of flow 2002 + 5 of flow 2003.
        let set = cache.get(1024).await.unwrap().unwrap();
        assert_eq!(set.len(), 12);
        let set = cache.get(1025).await.unwrap().unwrap();
        assert_eq!(set.len(), 12);

        let ident = vec![CacheIdent::DropFlow(DropFlow {
            flow_id: 2001,
            source_table_ids: vec![1024, 1025],
            flow_part2node_id: (1..=5).map(|i| (i as u32, i + 1)).collect(),
        })];
        cache.invalidate(&ident).await.unwrap();
        // Only flows 2002 and 2003 remain.
        let set = cache.get(1024).await.unwrap().unwrap();
        assert_eq!(
            set.as_ref().clone(),
            HashMap::from_iter(
                (11..=12)
                    .map(|i| (FlowIdent::new(2002, i as u32), Peer::empty(i + 1)))
                    .chain((1..=5).map(|i| (FlowIdent::new(2003, i as u32), Peer::empty(i + 1))))
            )
        );
        let set = cache.get(1025).await.unwrap().unwrap();
        assert_eq!(
            set.as_ref().clone(),
            HashMap::from_iter(
                (11..=12)
                    .map(|i| (FlowIdent::new(2002, i as u32), Peer::empty(i + 1)))
                    .chain((1..=5).map(|i| (FlowIdent::new(2003, i as u32), Peer::empty(i + 1))))
            )
        );
    }
}