common_meta/cache/flow/
table_flownode.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use futures::future::BoxFuture;
19use moka::future::Cache;
20use moka::ops::compute::Op;
21use table::metadata::TableId;
22
23use crate::cache::{CacheContainer, Initializer};
24use crate::error::Result;
25use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
26use crate::key::flow::{TableFlowManager, TableFlowManagerRef};
27use crate::kv_backend::KvBackendRef;
28use crate::peer::Peer;
29use crate::FlownodeId;
30
31type FlownodeSet = Arc<HashMap<FlownodeId, Peer>>;
32
33pub type TableFlownodeSetCacheRef = Arc<TableFlownodeSetCache>;
34
35/// [TableFlownodeSetCache] caches the [TableId] to [FlownodeSet] mapping.
36pub type TableFlownodeSetCache = CacheContainer<TableId, FlownodeSet, CacheIdent>;
37
38/// Constructs a [TableFlownodeSetCache].
39pub fn new_table_flownode_set_cache(
40    name: String,
41    cache: Cache<TableId, FlownodeSet>,
42    kv_backend: KvBackendRef,
43) -> TableFlownodeSetCache {
44    let table_flow_manager = Arc::new(TableFlowManager::new(kv_backend));
45    let init = init_factory(table_flow_manager);
46
47    CacheContainer::new(name, cache, Box::new(invalidator), init, filter)
48}
49
50fn init_factory(table_flow_manager: TableFlowManagerRef) -> Initializer<TableId, FlownodeSet> {
51    Arc::new(move |&table_id| {
52        let table_flow_manager = table_flow_manager.clone();
53        Box::pin(async move {
54            table_flow_manager
55                .flows(table_id)
56                .await
57                .map(|flows| {
58                    flows
59                        .into_iter()
60                        .map(|(key, value)| (key.flownode_id(), value.peer))
61                        .collect::<HashMap<_, _>>()
62                })
63                // We must cache the `HashSet` even if it's empty,
64                // to avoid future requests to the remote storage next time;
65                // If the value is added to the remote storage,
66                // we have a corresponding cache invalidation mechanism to invalidate `(Key, EmptyHashSet)`.
67                .map(Arc::new)
68                .map(Some)
69        })
70    })
71}
72
73async fn handle_create_flow(
74    cache: &Cache<TableId, FlownodeSet>,
75    CreateFlow {
76        source_table_ids,
77        flownodes: flownode_peers,
78    }: &CreateFlow,
79) {
80    for table_id in source_table_ids {
81        let entry = cache.entry(*table_id);
82        entry
83            .and_compute_with(
84                async |entry: Option<moka::Entry<u32, Arc<HashMap<u64, _>>>>| match entry {
85                    Some(entry) => {
86                        let mut map = entry.into_value().as_ref().clone();
87                        map.extend(flownode_peers.iter().map(|peer| (peer.id, peer.clone())));
88
89                        Op::Put(Arc::new(map))
90                    }
91                    None => Op::Put(Arc::new(HashMap::from_iter(
92                        flownode_peers.iter().map(|peer| (peer.id, peer.clone())),
93                    ))),
94                },
95            )
96            .await;
97    }
98}
99
100async fn handle_drop_flow(
101    cache: &Cache<TableId, FlownodeSet>,
102    DropFlow {
103        source_table_ids,
104        flownode_ids,
105    }: &DropFlow,
106) {
107    for table_id in source_table_ids {
108        let entry = cache.entry(*table_id);
109        entry
110            .and_compute_with(
111                async |entry: Option<moka::Entry<u32, Arc<HashMap<u64, _>>>>| match entry {
112                    Some(entry) => {
113                        let mut set = entry.into_value().as_ref().clone();
114                        for flownode_id in flownode_ids {
115                            set.remove(flownode_id);
116                        }
117
118                        Op::Put(Arc::new(set))
119                    }
120                    None => {
121                        // Do nothing
122                        Op::Nop
123                    }
124                },
125            )
126            .await;
127    }
128}
129
130fn invalidator<'a>(
131    cache: &'a Cache<TableId, FlownodeSet>,
132    ident: &'a CacheIdent,
133) -> BoxFuture<'a, Result<()>> {
134    Box::pin(async move {
135        match ident {
136            CacheIdent::CreateFlow(create_flow) => handle_create_flow(cache, create_flow).await,
137            CacheIdent::DropFlow(drop_flow) => handle_drop_flow(cache, drop_flow).await,
138            _ => {}
139        }
140        Ok(())
141    })
142}
143
144fn filter(ident: &CacheIdent) -> bool {
145    matches!(ident, CacheIdent::CreateFlow(_) | CacheIdent::DropFlow(_))
146}
147
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap};
    use std::sync::Arc;

    use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
    use moka::future::CacheBuilder;
    use table::table_name::TableName;

    use crate::cache::flow::table_flownode::new_table_flownode_set_cache;
    use crate::instruction::{CacheIdent, CreateFlow, DropFlow};
    use crate::key::flow::flow_info::FlowInfoValue;
    use crate::key::flow::flow_route::FlowRouteValue;
    use crate::key::flow::FlowMetadataManager;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::peer::Peer;

    /// A table with no flows still yields a (empty) cached set.
    #[tokio::test]
    async fn test_cache_empty_set() {
        let kv = Arc::new(MemoryKvBackend::default());
        let cache =
            new_table_flownode_set_cache("test".to_string(), CacheBuilder::new(128).build(), kv);

        let flownodes = cache.get(1024).await.unwrap().unwrap();
        assert!(flownodes.is_empty());
    }

    /// Flow metadata stored in the kv backend is loaded for each of the flow's source tables.
    #[tokio::test]
    async fn test_get() {
        let kv = Arc::new(MemoryKvBackend::default());
        let metadata_manager = FlowMetadataManager::new(kv.clone());
        let flow_info = FlowInfoValue {
            source_table_ids: vec![1024, 1025],
            sink_table_name: TableName {
                catalog_name: DEFAULT_CATALOG_NAME.to_string(),
                schema_name: DEFAULT_SCHEMA_NAME.to_string(),
                table_name: "sink_table".to_string(),
            },
            flownode_ids: BTreeMap::from([(0, 1), (1, 2), (2, 3)]),
            catalog_name: DEFAULT_CATALOG_NAME.to_string(),
            query_context: None,
            flow_name: "my_flow".to_string(),
            raw_sql: "sql".to_string(),
            expire_after: Some(300),
            comment: "comment".to_string(),
            options: Default::default(),
            created_time: chrono::Utc::now(),
            updated_time: chrono::Utc::now(),
        };
        let routes: Vec<_> = (1..=3)
            .map(|i| ((i - 1) as u32, FlowRouteValue { peer: Peer::empty(i) }))
            .collect();
        metadata_manager
            .create_flow_metadata(1024, flow_info, routes)
            .await
            .unwrap();

        let cache =
            new_table_flownode_set_cache("test".to_string(), CacheBuilder::new(128).build(), kv);
        let expected: HashMap<_, _> = (1..=3).map(|i| (i, Peer::empty(i))).collect();
        for table_id in [1024, 1025] {
            let flownodes = cache.get(table_id).await.unwrap().unwrap();
            assert_eq!(*flownodes, expected);
        }

        let unrelated = cache.get(1026).await.unwrap().unwrap();
        assert_eq!(unrelated.len(), 0);
    }

    /// A `CreateFlow` ident adds the flow's flownodes to every source table.
    #[tokio::test]
    async fn test_create_flow() {
        let kv = Arc::new(MemoryKvBackend::default());
        let cache =
            new_table_flownode_set_cache("test".to_string(), CacheBuilder::new(128).build(), kv);

        let idents = vec![CacheIdent::CreateFlow(CreateFlow {
            source_table_ids: vec![1024, 1025],
            flownodes: (1..=5).map(Peer::empty).collect(),
        })];
        cache.invalidate(&idents).await.unwrap();

        for table_id in [1024, 1025] {
            let flownodes = cache.get(table_id).await.unwrap().unwrap();
            assert_eq!(flownodes.len(), 5);
        }
    }

    /// A `DropFlow` ident removes only the listed flownodes from every source table.
    #[tokio::test]
    async fn test_drop_flow() {
        let kv = Arc::new(MemoryKvBackend::default());
        let cache =
            new_table_flownode_set_cache("test".to_string(), CacheBuilder::new(128).build(), kv);

        let creates = vec![
            CacheIdent::CreateFlow(CreateFlow {
                source_table_ids: vec![1024, 1025],
                flownodes: (1..=5).map(Peer::empty).collect(),
            }),
            CacheIdent::CreateFlow(CreateFlow {
                source_table_ids: vec![1024, 1025],
                flownodes: (11..=12).map(Peer::empty).collect(),
            }),
        ];
        cache.invalidate(&creates).await.unwrap();
        for table_id in [1024, 1025] {
            let flownodes = cache.get(table_id).await.unwrap().unwrap();
            assert_eq!(flownodes.len(), 7);
        }

        let drops = vec![CacheIdent::DropFlow(DropFlow {
            source_table_ids: vec![1024, 1025],
            flownode_ids: vec![1, 2, 3, 4, 5],
        })];
        cache.invalidate(&drops).await.unwrap();

        let remaining: HashMap<_, _> = (11..=12).map(|i| (i, Peer::empty(i))).collect();
        for table_id in [1024, 1025] {
            let flownodes = cache.get(table_id).await.unwrap().unwrap();
            assert_eq!(*flownodes, remaining);
        }
    }
}