common_meta/key/
flow.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod flow_info;
16pub(crate) mod flow_name;
17pub mod flow_route;
18pub mod flow_state;
19mod flownode_addr_helper;
20pub(crate) mod flownode_flow;
21pub(crate) mod table_flow;
22use std::ops::Deref;
23use std::sync::Arc;
24
25use common_telemetry::info;
26use flow_route::{FlowRouteKey, FlowRouteManager, FlowRouteValue};
27use snafu::{OptionExt, ensure};
28use table_flow::TableFlowValue;
29
30use self::flow_info::{FlowInfoKey, FlowInfoValue};
31use self::flow_name::FlowNameKey;
32use self::flownode_flow::FlownodeFlowKey;
33use self::table_flow::TableFlowKey;
34use crate::ensure_values;
35use crate::error::{self, Result};
36use crate::key::flow::flow_info::FlowInfoManager;
37use crate::key::flow::flow_name::FlowNameManager;
38use crate::key::flow::flow_state::FlowStateManager;
39use crate::key::flow::flownode_flow::FlownodeFlowManager;
40pub use crate::key::flow::table_flow::{TableFlowManager, TableFlowManagerRef};
41use crate::key::txn_helper::TxnOpGetResponseSet;
42use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey};
43use crate::kv_backend::KvBackendRef;
44use crate::kv_backend::txn::Txn;
45use crate::rpc::store::BatchDeleteRequest;
46
/// A key wrapper that places the wrapped key under the `__flow/` scope.
///
/// When `T` implements [`MetadataKey`], the serialized form is the `__flow/`
/// prefix followed by the serialized inner key.
#[derive(Debug, Clone, PartialEq)]
pub struct FlowScoped<T> {
    /// The wrapped key.
    inner: T,
}
52
53impl<T> Deref for FlowScoped<T> {
54    type Target = T;
55
56    fn deref(&self) -> &Self::Target {
57        &self.inner
58    }
59}
60
61impl<T> FlowScoped<T> {
62    const PREFIX: &'static str = "__flow/";
63
64    /// Returns a new [FlowScoped] key.
65    pub fn new(inner: T) -> FlowScoped<T> {
66        Self { inner }
67    }
68}
69
70impl<'a, T: MetadataKey<'a, T>> MetadataKey<'a, FlowScoped<T>> for FlowScoped<T> {
71    fn to_bytes(&self) -> Vec<u8> {
72        let prefix = FlowScoped::<T>::PREFIX.as_bytes();
73        let inner = self.inner.to_bytes();
74        let mut bytes = Vec::with_capacity(prefix.len() + inner.len());
75        bytes.extend(prefix);
76        bytes.extend(inner);
77        bytes
78    }
79
80    fn from_bytes(bytes: &'a [u8]) -> Result<FlowScoped<T>> {
81        let prefix = FlowScoped::<T>::PREFIX.as_bytes();
82        ensure!(
83            bytes.starts_with(prefix),
84            error::MismatchPrefixSnafu {
85                prefix: String::from_utf8_lossy(prefix),
86                key: String::from_utf8_lossy(bytes),
87            }
88        );
89        let inner = T::from_bytes(&bytes[prefix.len()..])?;
90        Ok(FlowScoped { inner })
91    }
92}
93
/// A shared, reference-counted handle to a [`FlowMetadataManager`].
pub type FlowMetadataManagerRef = Arc<FlowMetadataManager>;
95
/// The manager of flow metadata. It provides the ability to:
/// - Create metadata of the flow.
/// - Update metadata of the flow.
/// - Retrieve metadata of the flow (through the per-key managers it exposes).
/// - Delete metadata of the flow.
pub struct FlowMetadataManager {
    /// Manager of the flow info entries.
    flow_info_manager: FlowInfoManager,
    /// Manager of the flow route entries.
    flow_route_manager: FlowRouteManager,
    /// Manager of the flownode-to-flow entries.
    flownode_flow_manager: FlownodeFlowManager,
    /// Manager of the table-to-flow entries.
    table_flow_manager: TableFlowManager,
    /// Manager of the flow name entries.
    flow_name_manager: FlowNameManager,
    /// Only the metasrv has access to its own memory backend, so in every other case
    /// this should be `None`.
    flow_state_manager: Option<FlowStateManager>,
    /// The backend used to run the merged transactions and the batch deletes.
    kv_backend: KvBackendRef,
}
110
111impl FlowMetadataManager {
112    /// Returns a new [`FlowMetadataManager`].
113    pub fn new(kv_backend: KvBackendRef) -> Self {
114        Self {
115            flow_info_manager: FlowInfoManager::new(kv_backend.clone()),
116            flow_route_manager: FlowRouteManager::new(kv_backend.clone()),
117            flow_name_manager: FlowNameManager::new(kv_backend.clone()),
118            flownode_flow_manager: FlownodeFlowManager::new(kv_backend.clone()),
119            table_flow_manager: TableFlowManager::new(kv_backend.clone()),
120            flow_state_manager: None,
121            kv_backend,
122        }
123    }
124
    /// Returns a reference to the [`FlowNameManager`] owned by this manager.
    pub fn flow_name_manager(&self) -> &FlowNameManager {
        &self.flow_name_manager
    }
129
    /// Returns the [`FlowStateManager`] if one is set, or `None` otherwise.
    pub fn flow_state_manager(&self) -> Option<&FlowStateManager> {
        self.flow_state_manager.as_ref()
    }
133
    /// Returns a reference to the [`FlowInfoManager`] owned by this manager.
    pub fn flow_info_manager(&self) -> &FlowInfoManager {
        &self.flow_info_manager
    }
138
    /// Returns a reference to the [`FlowRouteManager`] owned by this manager.
    pub fn flow_route_manager(&self) -> &FlowRouteManager {
        &self.flow_route_manager
    }
143
    /// Returns a reference to the [`FlownodeFlowManager`] owned by this manager.
    pub fn flownode_flow_manager(&self) -> &FlownodeFlowManager {
        &self.flownode_flow_manager
    }
148
    /// Returns a reference to the [`TableFlowManager`] owned by this manager.
    pub fn table_flow_manager(&self) -> &TableFlowManager {
        &self.table_flow_manager
    }
153
154    /// Creates metadata for flow and returns an error if different metadata exists.
155    pub async fn create_flow_metadata(
156        &self,
157        flow_id: FlowId,
158        flow_info: FlowInfoValue,
159        flow_routes: Vec<(FlowPartitionId, FlowRouteValue)>,
160    ) -> Result<()> {
161        let (create_flow_flow_name_txn, on_create_flow_flow_name_failure) = self
162            .flow_name_manager
163            .build_create_txn(&flow_info.catalog_name, &flow_info.flow_name, flow_id)?;
164
165        let (create_flow_txn, on_create_flow_failure) = self
166            .flow_info_manager
167            .build_create_txn(flow_id, &flow_info)?;
168
169        let create_flow_routes_txn = self
170            .flow_route_manager
171            .build_create_txn(flow_id, flow_routes.clone())?;
172
173        let create_flownode_flow_txn = self
174            .flownode_flow_manager
175            .build_create_txn(flow_id, flow_info.flownode_ids().clone());
176
177        let create_table_flow_txn = self.table_flow_manager.build_create_txn(
178            flow_id,
179            flow_routes
180                .into_iter()
181                .map(|(partition_id, route)| (partition_id, TableFlowValue { peer: route.peer }))
182                .collect(),
183            flow_info.source_table_ids(),
184        )?;
185
186        let txn = Txn::merge_all(vec![
187            create_flow_flow_name_txn,
188            create_flow_txn,
189            create_flow_routes_txn,
190            create_flownode_flow_txn,
191            create_table_flow_txn,
192        ]);
193        info!(
194            "Creating flow {}.{}({}), with {} txn operations",
195            flow_info.catalog_name,
196            flow_info.flow_name,
197            flow_id,
198            txn.max_operations()
199        );
200
201        let mut resp = self.kv_backend.txn(txn).await?;
202        if !resp.succeeded {
203            let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
204            let remote_flow_flow_name =
205                on_create_flow_flow_name_failure(&mut set)?.with_context(|| {
206                    error::UnexpectedSnafu {
207                        err_msg: format!(
208                    "Reads the empty flow name in comparing operation of the creating flow, flow_id: {flow_id}"
209                ),
210                    }
211                })?;
212
213            if remote_flow_flow_name.flow_id() != flow_id {
214                info!(
215                    "Trying to create flow {}.{}({}), but flow({}) already exists",
216                    flow_info.catalog_name,
217                    flow_info.flow_name,
218                    flow_id,
219                    remote_flow_flow_name.flow_id()
220                );
221
222                return error::FlowAlreadyExistsSnafu {
223                    flow_name: format!("{}.{}", flow_info.catalog_name, flow_info.flow_name),
224                }
225                .fail();
226            }
227
228            let remote_flow =
229                on_create_flow_failure(&mut set)?.with_context(|| error::UnexpectedSnafu {
230                    err_msg: format!(
231                        "Reads the empty flow in comparing operation of creating flow, flow_id: {flow_id}"
232                    ),
233                })?;
234            let op_name = "creating flow";
235            ensure_values!(*remote_flow, flow_info, op_name);
236        }
237
238        Ok(())
239    }
240
241    /// Update metadata for flow and returns an error if old metadata IS NOT exists.
242    pub async fn update_flow_metadata(
243        &self,
244        flow_id: FlowId,
245        current_flow_info: &DeserializedValueWithBytes<FlowInfoValue>,
246        new_flow_info: &FlowInfoValue,
247        flow_routes: Vec<(FlowPartitionId, FlowRouteValue)>,
248    ) -> Result<()> {
249        let (update_flow_flow_name_txn, on_create_flow_flow_name_failure) =
250            self.flow_name_manager.build_update_txn(
251                &new_flow_info.catalog_name,
252                &new_flow_info.flow_name,
253                flow_id,
254            )?;
255
256        let (update_flow_txn, on_create_flow_failure) =
257            self.flow_info_manager
258                .build_update_txn(flow_id, current_flow_info, new_flow_info)?;
259
260        let update_flow_routes_txn = self.flow_route_manager.build_update_txn(
261            flow_id,
262            current_flow_info,
263            flow_routes.clone(),
264        )?;
265
266        let update_flownode_flow_txn = self.flownode_flow_manager.build_update_txn(
267            flow_id,
268            current_flow_info,
269            new_flow_info.flownode_ids().clone(),
270        );
271
272        let update_table_flow_txn = self.table_flow_manager.build_update_txn(
273            flow_id,
274            current_flow_info,
275            flow_routes
276                .into_iter()
277                .map(|(partition_id, route)| (partition_id, TableFlowValue { peer: route.peer }))
278                .collect(),
279            new_flow_info.source_table_ids(),
280        )?;
281
282        let txn = Txn::merge_all(vec![
283            update_flow_flow_name_txn,
284            update_flow_txn,
285            update_flow_routes_txn,
286            update_flownode_flow_txn,
287            update_table_flow_txn,
288        ]);
289        info!(
290            "Creating flow {}.{}({}), with {} txn operations",
291            new_flow_info.catalog_name,
292            new_flow_info.flow_name,
293            flow_id,
294            txn.max_operations()
295        );
296
297        let mut resp = self.kv_backend.txn(txn).await?;
298        if !resp.succeeded {
299            let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
300            let remote_flow_flow_name =
301                on_create_flow_flow_name_failure(&mut set)?.with_context(|| {
302                    error::UnexpectedSnafu {
303                        err_msg: format!(
304                        "Reads the empty flow name in comparing operation of the updating flow, flow_id: {flow_id}"
305                    ),
306                    }
307                })?;
308
309            if remote_flow_flow_name.flow_id() != flow_id {
310                info!(
311                    "Trying to updating flow {}.{}({}), but flow({}) already exists with a different flow id",
312                    new_flow_info.catalog_name,
313                    new_flow_info.flow_name,
314                    flow_id,
315                    remote_flow_flow_name.flow_id()
316                );
317
318                return error::UnexpectedSnafu {
319                    err_msg: format!(
320                        "Reads different flow id when updating flow({2}.{3}), prev flow id = {0}, updating with flow id = {1}",
321                        remote_flow_flow_name.flow_id(),
322                        flow_id,
323                        new_flow_info.catalog_name,
324                        new_flow_info.flow_name,
325                    ),
326                }.fail();
327            }
328
329            let remote_flow =
330                on_create_flow_failure(&mut set)?.with_context(|| error::UnexpectedSnafu {
331                    err_msg: format!(
332                        "Reads the empty flow in comparing operation of the updating flow, flow_id: {flow_id}"
333                    ),
334                })?;
335            let op_name = "updating flow";
336            ensure_values!(*remote_flow, new_flow_info.clone(), op_name);
337        }
338
339        Ok(())
340    }
341
342    fn flow_metadata_keys(&self, flow_id: FlowId, flow_value: &FlowInfoValue) -> Vec<Vec<u8>> {
343        let source_table_ids = flow_value.source_table_ids();
344        let mut keys =
345            Vec::with_capacity(2 + flow_value.flownode_ids.len() * (source_table_ids.len() + 2));
346        // Builds flow name key
347        let flow_name = FlowNameKey::new(&flow_value.catalog_name, &flow_value.flow_name);
348        keys.push(flow_name.to_bytes());
349
350        // Builds flow value key
351        let flow_info_key = FlowInfoKey::new(flow_id);
352        keys.push(flow_info_key.to_bytes());
353
354        // Builds flownode flow keys & table flow keys
355        flow_value
356            .flownode_ids
357            .iter()
358            .for_each(|(&partition_id, &flownode_id)| {
359                keys.push(FlownodeFlowKey::new(flownode_id, flow_id, partition_id).to_bytes());
360                keys.push(FlowRouteKey::new(flow_id, partition_id).to_bytes());
361                source_table_ids.iter().for_each(|&table_id| {
362                    keys.push(
363                        TableFlowKey::new(table_id, flownode_id, flow_id, partition_id).to_bytes(),
364                    );
365                })
366            });
367        keys
368    }
369
370    /// Deletes metadata for table **permanently**.
371    pub async fn destroy_flow_metadata(
372        &self,
373        flow_id: FlowId,
374        flow_value: &FlowInfoValue,
375    ) -> Result<()> {
376        let keys = self.flow_metadata_keys(flow_id, flow_value);
377        let _ = self
378            .kv_backend
379            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
380            .await?;
381        Ok(())
382    }
383}
384
385impl std::fmt::Debug for FlowMetadataManager {
386    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
387        f.debug_struct("FlowMetadataManager").finish()
388    }
389}
390
391#[cfg(test)]
392mod tests {
393    use std::assert_matches::assert_matches;
394    use std::collections::BTreeMap;
395    use std::sync::Arc;
396
397    use futures::TryStreamExt;
398    use table::metadata::TableId;
399    use table::table_name::TableName;
400
401    use super::*;
402    use crate::FlownodeId;
403    use crate::key::FlowPartitionId;
404    use crate::key::flow::table_flow::TableFlowKey;
405    use crate::kv_backend::memory::MemoryKvBackend;
406    use crate::peer::Peer;
407
    /// A minimal [`MetadataKey`] used by the tests; it stores the key bytes verbatim.
    #[derive(Debug)]
    struct MockKey {
        /// The raw bytes of the key.
        inner: Vec<u8>,
    }
412
413    impl<'a> MetadataKey<'a, MockKey> for MockKey {
414        fn to_bytes(&self) -> Vec<u8> {
415            self.inner.clone()
416        }
417
418        fn from_bytes(bytes: &'a [u8]) -> Result<MockKey> {
419            Ok(MockKey {
420                inner: bytes.to_vec(),
421            })
422        }
423    }
424
425    #[test]
426    fn test_flow_scoped_to_bytes() {
427        let key = FlowScoped::new(MockKey {
428            inner: b"hi".to_vec(),
429        });
430        assert_eq!(b"__flow/hi".to_vec(), key.to_bytes());
431    }
432
433    #[test]
434    fn test_flow_scoped_from_bytes() {
435        let bytes = b"__flow/hi";
436        let key = FlowScoped::<MockKey>::from_bytes(bytes).unwrap();
437        assert_eq!(key.inner.inner, b"hi".to_vec());
438    }
439
440    #[test]
441    fn test_flow_scoped_from_bytes_mismatch() {
442        let bytes = b"__table/hi";
443        let err = FlowScoped::<MockKey>::from_bytes(bytes).unwrap_err();
444        assert_matches!(err, error::Error::MismatchPrefix { .. });
445    }
446
447    fn test_flow_info_value(
448        flow_name: &str,
449        flownode_ids: BTreeMap<FlowPartitionId, FlownodeId>,
450        source_table_ids: Vec<TableId>,
451    ) -> FlowInfoValue {
452        let catalog_name = "greptime";
453        let sink_table_name = TableName {
454            catalog_name: catalog_name.to_string(),
455            schema_name: "my_schema".to_string(),
456            table_name: "sink_table".to_string(),
457        };
458        FlowInfoValue {
459            catalog_name: catalog_name.to_string(),
460            query_context: None,
461            flow_name: flow_name.to_string(),
462            source_table_ids,
463            sink_table_name,
464            flownode_ids,
465            raw_sql: "raw".to_string(),
466            expire_after: Some(300),
467            eval_interval_secs: None,
468            comment: "hi".to_string(),
469            options: Default::default(),
470            created_time: chrono::Utc::now(),
471            updated_time: chrono::Utc::now(),
472        }
473    }
474
475    #[tokio::test]
476    async fn test_create_flow_metadata() {
477        let mem_kv = Arc::new(MemoryKvBackend::default());
478        let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
479        let flow_id = 10;
480        let flow_value = test_flow_info_value(
481            "flow",
482            [(0, 1u64), (1, 2u64)].into(),
483            vec![1024, 1025, 1026],
484        );
485        let flow_routes = vec![
486            (
487                1u32,
488                FlowRouteValue {
489                    peer: Peer::empty(1),
490                },
491            ),
492            (
493                2,
494                FlowRouteValue {
495                    peer: Peer::empty(2),
496                },
497            ),
498        ];
499        flow_metadata_manager
500            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
501            .await
502            .unwrap();
503        // Creates again.
504        flow_metadata_manager
505            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
506            .await
507            .unwrap();
508        let got = flow_metadata_manager
509            .flow_info_manager()
510            .get(flow_id)
511            .await
512            .unwrap()
513            .unwrap();
514        let routes = flow_metadata_manager
515            .flow_route_manager()
516            .routes(flow_id)
517            .await
518            .unwrap();
519        assert_eq!(
520            routes,
521            vec![
522                (
523                    FlowRouteKey::new(flow_id, 1),
524                    FlowRouteValue {
525                        peer: Peer::empty(1),
526                    },
527                ),
528                (
529                    FlowRouteKey::new(flow_id, 2),
530                    FlowRouteValue {
531                        peer: Peer::empty(2),
532                    },
533                ),
534            ]
535        );
536        assert_eq!(got, flow_value);
537        let flows = flow_metadata_manager
538            .flownode_flow_manager()
539            .flows(1)
540            .try_collect::<Vec<_>>()
541            .await
542            .unwrap();
543        assert_eq!(flows, vec![(flow_id, 0)]);
544        for table_id in [1024, 1025, 1026] {
545            let nodes = flow_metadata_manager
546                .table_flow_manager()
547                .flows(table_id)
548                .await
549                .unwrap();
550            assert_eq!(
551                nodes,
552                vec![
553                    (
554                        TableFlowKey::new(table_id, 1, flow_id, 1),
555                        TableFlowValue {
556                            peer: Peer::empty(1)
557                        }
558                    ),
559                    (
560                        TableFlowKey::new(table_id, 2, flow_id, 2),
561                        TableFlowValue {
562                            peer: Peer::empty(2)
563                        }
564                    )
565                ]
566            );
567        }
568    }
569
570    #[tokio::test]
571    async fn test_create_flow_metadata_flow_exists_err() {
572        let mem_kv = Arc::new(MemoryKvBackend::default());
573        let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
574        let flow_id = 10;
575        let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
576        let flow_routes = vec![
577            (
578                1u32,
579                FlowRouteValue {
580                    peer: Peer::empty(1),
581                },
582            ),
583            (
584                2,
585                FlowRouteValue {
586                    peer: Peer::empty(2),
587                },
588            ),
589        ];
590        flow_metadata_manager
591            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
592            .await
593            .unwrap();
594        // Creates again
595        let err = flow_metadata_manager
596            .create_flow_metadata(flow_id + 1, flow_value, flow_routes.clone())
597            .await
598            .unwrap_err();
599        assert_matches!(err, error::Error::FlowAlreadyExists { .. });
600    }
601
602    #[tokio::test]
603    async fn test_create_flow_metadata_unexpected_err() {
604        let mem_kv = Arc::new(MemoryKvBackend::default());
605        let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
606        let flow_id = 10;
607        let catalog_name = "greptime";
608        let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
609        let flow_routes = vec![
610            (
611                1u32,
612                FlowRouteValue {
613                    peer: Peer::empty(1),
614                },
615            ),
616            (
617                2,
618                FlowRouteValue {
619                    peer: Peer::empty(2),
620                },
621            ),
622        ];
623        flow_metadata_manager
624            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
625            .await
626            .unwrap();
627        // Creates again.
628        let another_sink_table_name = TableName {
629            catalog_name: catalog_name.to_string(),
630            schema_name: "my_schema".to_string(),
631            table_name: "another_sink_table".to_string(),
632        };
633        let flow_value = FlowInfoValue {
634            catalog_name: "greptime".to_string(),
635            query_context: None,
636            flow_name: "flow".to_string(),
637            source_table_ids: vec![1024, 1025, 1026],
638            sink_table_name: another_sink_table_name,
639            flownode_ids: [(0, 1u64)].into(),
640            raw_sql: "raw".to_string(),
641            expire_after: Some(300),
642            eval_interval_secs: None,
643            comment: "hi".to_string(),
644            options: Default::default(),
645            created_time: chrono::Utc::now(),
646            updated_time: chrono::Utc::now(),
647        };
648        let err = flow_metadata_manager
649            .create_flow_metadata(flow_id, flow_value, flow_routes.clone())
650            .await
651            .unwrap_err();
652        assert!(err.to_string().contains("Reads the different value"));
653    }
654
655    #[tokio::test]
656    async fn test_destroy_flow_metadata() {
657        let mem_kv = Arc::new(MemoryKvBackend::default());
658        let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
659        let flow_id = 10;
660        let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
661        let flow_routes = vec![(
662            0u32,
663            FlowRouteValue {
664                peer: Peer::empty(1),
665            },
666        )];
667        flow_metadata_manager
668            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
669            .await
670            .unwrap();
671
672        flow_metadata_manager
673            .destroy_flow_metadata(flow_id, &flow_value)
674            .await
675            .unwrap();
676        // Destroys again
677        flow_metadata_manager
678            .destroy_flow_metadata(flow_id, &flow_value)
679            .await
680            .unwrap();
681        // Ensures all keys are deleted
682        assert!(mem_kv.is_empty())
683    }
684
685    #[tokio::test]
686    async fn test_update_flow_metadata() {
687        let mem_kv = Arc::new(MemoryKvBackend::default());
688        let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
689        let flow_id = 10;
690        let flow_value = test_flow_info_value(
691            "flow",
692            [(0, 1u64), (1, 2u64)].into(),
693            vec![1024, 1025, 1026],
694        );
695        let flow_routes = vec![
696            (
697                1u32,
698                FlowRouteValue {
699                    peer: Peer::empty(1),
700                },
701            ),
702            (
703                2,
704                FlowRouteValue {
705                    peer: Peer::empty(2),
706                },
707            ),
708        ];
709        flow_metadata_manager
710            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
711            .await
712            .unwrap();
713
714        let new_flow_value = {
715            let mut tmp = flow_value.clone();
716            tmp.raw_sql = "new".to_string();
717            tmp
718        };
719
720        // Update flow instead
721        flow_metadata_manager
722            .update_flow_metadata(
723                flow_id,
724                &DeserializedValueWithBytes::from_inner(flow_value.clone()),
725                &new_flow_value,
726                flow_routes.clone(),
727            )
728            .await
729            .unwrap();
730
731        let got = flow_metadata_manager
732            .flow_info_manager()
733            .get(flow_id)
734            .await
735            .unwrap()
736            .unwrap();
737        let routes = flow_metadata_manager
738            .flow_route_manager()
739            .routes(flow_id)
740            .await
741            .unwrap();
742        assert_eq!(
743            routes,
744            vec![
745                (
746                    FlowRouteKey::new(flow_id, 1),
747                    FlowRouteValue {
748                        peer: Peer::empty(1),
749                    },
750                ),
751                (
752                    FlowRouteKey::new(flow_id, 2),
753                    FlowRouteValue {
754                        peer: Peer::empty(2),
755                    },
756                ),
757            ]
758        );
759        assert_eq!(got, new_flow_value);
760        let flows = flow_metadata_manager
761            .flownode_flow_manager()
762            .flows(1)
763            .try_collect::<Vec<_>>()
764            .await
765            .unwrap();
766        assert_eq!(flows, vec![(flow_id, 0)]);
767        for table_id in [1024, 1025, 1026] {
768            let nodes = flow_metadata_manager
769                .table_flow_manager()
770                .flows(table_id)
771                .await
772                .unwrap();
773            assert_eq!(
774                nodes,
775                vec![
776                    (
777                        TableFlowKey::new(table_id, 1, flow_id, 1),
778                        TableFlowValue {
779                            peer: Peer::empty(1)
780                        }
781                    ),
782                    (
783                        TableFlowKey::new(table_id, 2, flow_id, 2),
784                        TableFlowValue {
785                            peer: Peer::empty(2)
786                        }
787                    )
788                ]
789            );
790        }
791    }
792
793    #[tokio::test]
794    async fn test_update_flow_metadata_diff_flownode() {
795        let mem_kv = Arc::new(MemoryKvBackend::default());
796        let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
797        let flow_id = 10;
798        let flow_value = test_flow_info_value(
799            "flow",
800            [(0u32, 1u64), (1u32, 2u64)].into(),
801            vec![1024, 1025, 1026],
802        );
803        let flow_routes = vec![
804            (
805                0u32,
806                FlowRouteValue {
807                    peer: Peer::empty(1),
808                },
809            ),
810            (
811                1,
812                FlowRouteValue {
813                    peer: Peer::empty(2),
814                },
815            ),
816        ];
817        flow_metadata_manager
818            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
819            .await
820            .unwrap();
821
822        let new_flow_value = {
823            let mut tmp = flow_value.clone();
824            tmp.raw_sql = "new".to_string();
825            // move to different flownodes
826            tmp.flownode_ids = [(0, 3u64), (1, 4u64)].into();
827            tmp
828        };
829        let new_flow_routes = vec![
830            (
831                0u32,
832                FlowRouteValue {
833                    peer: Peer::empty(3),
834                },
835            ),
836            (
837                1,
838                FlowRouteValue {
839                    peer: Peer::empty(4),
840                },
841            ),
842        ];
843
844        // Update flow instead
845        flow_metadata_manager
846            .update_flow_metadata(
847                flow_id,
848                &DeserializedValueWithBytes::from_inner(flow_value.clone()),
849                &new_flow_value,
850                new_flow_routes.clone(),
851            )
852            .await
853            .unwrap();
854
855        let got = flow_metadata_manager
856            .flow_info_manager()
857            .get(flow_id)
858            .await
859            .unwrap()
860            .unwrap();
861        let routes = flow_metadata_manager
862            .flow_route_manager()
863            .routes(flow_id)
864            .await
865            .unwrap();
866        assert_eq!(
867            routes,
868            vec![
869                (
870                    FlowRouteKey::new(flow_id, 0),
871                    FlowRouteValue {
872                        peer: Peer::empty(3),
873                    },
874                ),
875                (
876                    FlowRouteKey::new(flow_id, 1),
877                    FlowRouteValue {
878                        peer: Peer::empty(4),
879                    },
880                ),
881            ]
882        );
883        assert_eq!(got, new_flow_value);
884
885        let flows = flow_metadata_manager
886            .flownode_flow_manager()
887            .flows(1)
888            .try_collect::<Vec<_>>()
889            .await
890            .unwrap();
891        // should moved to different flownode
892        assert_eq!(flows, vec![]);
893
894        let flows = flow_metadata_manager
895            .flownode_flow_manager()
896            .flows(3)
897            .try_collect::<Vec<_>>()
898            .await
899            .unwrap();
900        assert_eq!(flows, vec![(flow_id, 0)]);
901
902        for table_id in [1024, 1025, 1026] {
903            let nodes = flow_metadata_manager
904                .table_flow_manager()
905                .flows(table_id)
906                .await
907                .unwrap();
908            assert_eq!(
909                nodes,
910                vec![
911                    (
912                        TableFlowKey::new(table_id, 3, flow_id, 0),
913                        TableFlowValue {
914                            peer: Peer::empty(3)
915                        }
916                    ),
917                    (
918                        TableFlowKey::new(table_id, 4, flow_id, 1),
919                        TableFlowValue {
920                            peer: Peer::empty(4)
921                        }
922                    )
923                ]
924            );
925        }
926    }
927
928    #[tokio::test]
929    async fn test_update_flow_metadata_flow_replace_diff_id_err() {
930        let mem_kv = Arc::new(MemoryKvBackend::default());
931        let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
932        let flow_id = 10;
933        let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
934        let flow_routes = vec![
935            (
936                1u32,
937                FlowRouteValue {
938                    peer: Peer::empty(1),
939                },
940            ),
941            (
942                2,
943                FlowRouteValue {
944                    peer: Peer::empty(2),
945                },
946            ),
947        ];
948        flow_metadata_manager
949            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
950            .await
951            .unwrap();
952        // update again with same flow id
953        flow_metadata_manager
954            .update_flow_metadata(
955                flow_id,
956                &DeserializedValueWithBytes::from_inner(flow_value.clone()),
957                &flow_value,
958                flow_routes.clone(),
959            )
960            .await
961            .unwrap();
962        // update again with wrong flow id, expected error
963        let err = flow_metadata_manager
964            .update_flow_metadata(
965                flow_id + 1,
966                &DeserializedValueWithBytes::from_inner(flow_value.clone()),
967                &flow_value,
968                flow_routes,
969            )
970            .await
971            .unwrap_err();
972        assert_matches!(err, error::Error::Unexpected { .. });
973        assert!(
974            err.to_string()
975                .contains("Reads different flow id when updating flow")
976        );
977    }
978
979    #[tokio::test]
980    async fn test_update_flow_metadata_unexpected_err_prev_value_diff() {
981        let mem_kv = Arc::new(MemoryKvBackend::default());
982        let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
983        let flow_id = 10;
984        let catalog_name = "greptime";
985        let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
986        let flow_routes = vec![
987            (
988                1u32,
989                FlowRouteValue {
990                    peer: Peer::empty(1),
991                },
992            ),
993            (
994                2,
995                FlowRouteValue {
996                    peer: Peer::empty(2),
997                },
998            ),
999        ];
1000        flow_metadata_manager
1001            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
1002            .await
1003            .unwrap();
1004        // Creates again.
1005        let another_sink_table_name = TableName {
1006            catalog_name: catalog_name.to_string(),
1007            schema_name: "my_schema".to_string(),
1008            table_name: "another_sink_table".to_string(),
1009        };
1010        let flow_value = FlowInfoValue {
1011            catalog_name: "greptime".to_string(),
1012            query_context: None,
1013            flow_name: "flow".to_string(),
1014            source_table_ids: vec![1024, 1025, 1026],
1015            sink_table_name: another_sink_table_name,
1016            flownode_ids: [(0, 1u64)].into(),
1017            raw_sql: "raw".to_string(),
1018            expire_after: Some(300),
1019            eval_interval_secs: None,
1020            comment: "hi".to_string(),
1021            options: Default::default(),
1022            created_time: chrono::Utc::now(),
1023            updated_time: chrono::Utc::now(),
1024        };
1025        let err = flow_metadata_manager
1026            .update_flow_metadata(
1027                flow_id,
1028                &DeserializedValueWithBytes::from_inner(flow_value.clone()),
1029                &flow_value,
1030                flow_routes.clone(),
1031            )
1032            .await
1033            .unwrap_err();
1034        assert!(
1035            err.to_string().contains("Reads the different value"),
1036            "error: {:?}",
1037            err
1038        );
1039    }
1040}