Skip to main content

common_meta/key/
flow.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod flow_info;
16pub(crate) mod flow_name;
17pub mod flow_route;
18pub mod flow_state;
19mod flownode_addr_helper;
20pub(crate) mod flownode_flow;
21pub(crate) mod table_flow;
22use std::collections::BTreeMap;
23use std::ops::Deref;
24use std::sync::Arc;
25
26use common_telemetry::info;
27use flow_route::{FlowRouteKey, FlowRouteManager, FlowRouteValue};
28use snafu::{OptionExt, ensure};
29use table_flow::TableFlowValue;
30
31use self::flow_info::{FlowInfoKey, FlowInfoValue};
32use self::flow_name::FlowNameKey;
33use self::flownode_flow::FlownodeFlowKey;
34use self::table_flow::TableFlowKey;
35use crate::ensure_values;
36use crate::error::{self, Result};
37use crate::key::flow::flow_info::FlowInfoManager;
38use crate::key::flow::flow_name::FlowNameManager;
39use crate::key::flow::flow_state::FlowStateManager;
40use crate::key::flow::flownode_flow::FlownodeFlowManager;
41pub use crate::key::flow::table_flow::{TableFlowManager, TableFlowManagerRef};
42use crate::key::txn_helper::TxnOpGetResponseSet;
43use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey};
44use crate::kv_backend::KvBackendRef;
45use crate::kv_backend::txn::Txn;
46use crate::rpc::store::BatchDeleteRequest;
47
48pub const FLOW_KEY_PREFIX: &str = "__flow";
49
50/// Returns a flow metadata key under the `__flow` scope.
51pub fn scoped_flow_key(inner_key: &str) -> String {
52    format!("{FLOW_KEY_PREFIX}/{inner_key}")
53}
54
55/// Returns a flow metadata key prefix under the `__flow` scope.
56pub fn scoped_flow_key_prefix(inner_prefix: &str) -> String {
57    scoped_flow_key(inner_prefix)
58}
59
60/// Returns the full prefix of flow info keys.
61pub fn flow_info_key_prefix() -> String {
62    scoped_flow_key_prefix(flow_info::FLOW_INFO_KEY_PREFIX)
63}
64
65/// Returns the full prefix of flow name keys.
66pub fn flow_name_key_prefix() -> String {
67    scoped_flow_key_prefix(flow_name::FLOW_NAME_KEY_PREFIX)
68}
69
70/// Returns the full prefix of flow route keys.
71pub fn flow_route_key_prefix() -> String {
72    scoped_flow_key_prefix(flow_route::FLOW_ROUTE_KEY_PREFIX)
73}
74
75/// Returns the full prefix of table flow keys.
76pub fn table_flow_key_prefix() -> String {
77    scoped_flow_key_prefix(table_flow::TABLE_FLOW_KEY_PREFIX)
78}
79
80/// Returns the full prefix of flownode flow keys.
81pub fn flownode_flow_key_prefix() -> String {
82    scoped_flow_key_prefix(flownode_flow::FLOWNODE_FLOW_KEY_PREFIX)
83}
84
85/// Returns the full key of flow state.
86pub fn flow_state_full_key() -> String {
87    scoped_flow_key(flow_state::FLOW_STATE_KEY)
88}
89
90/// The key of `__flow/` scope.
91#[derive(Debug, Clone, PartialEq)]
92pub struct FlowScoped<T> {
93    inner: T,
94}
95
96impl<T> Deref for FlowScoped<T> {
97    type Target = T;
98
99    fn deref(&self) -> &Self::Target {
100        &self.inner
101    }
102}
103
104impl<T> FlowScoped<T> {
105    const PREFIX: &'static str = "__flow/";
106
107    /// Returns a new [FlowScoped] key.
108    pub fn new(inner: T) -> FlowScoped<T> {
109        Self { inner }
110    }
111}
112
113impl<'a, T: MetadataKey<'a, T>> MetadataKey<'a, FlowScoped<T>> for FlowScoped<T> {
114    fn to_bytes(&self) -> Vec<u8> {
115        let prefix = FlowScoped::<T>::PREFIX.as_bytes();
116        let inner = self.inner.to_bytes();
117        let mut bytes = Vec::with_capacity(prefix.len() + inner.len());
118        bytes.extend(prefix);
119        bytes.extend(inner);
120        bytes
121    }
122
123    fn from_bytes(bytes: &'a [u8]) -> Result<FlowScoped<T>> {
124        let prefix = FlowScoped::<T>::PREFIX.as_bytes();
125        ensure!(
126            bytes.starts_with(prefix),
127            error::MismatchPrefixSnafu {
128                prefix: String::from_utf8_lossy(prefix),
129                key: String::from_utf8_lossy(bytes),
130            }
131        );
132        let inner = T::from_bytes(&bytes[prefix.len()..])?;
133        Ok(FlowScoped { inner })
134    }
135}
136
137pub type FlowMetadataManagerRef = Arc<FlowMetadataManager>;
138
139/// The manager of metadata, provides ability to:
140/// - Create metadata of the flow.
141/// - Retrieve metadata of the flow.
142/// - Delete metadata of the flow.
143pub struct FlowMetadataManager {
144    flow_info_manager: FlowInfoManager,
145    flow_route_manager: FlowRouteManager,
146    flownode_flow_manager: FlownodeFlowManager,
147    table_flow_manager: TableFlowManager,
148    flow_name_manager: FlowNameManager,
149    /// only metasrv have access to itself's memory backend, so for other case it should be None
150    flow_state_manager: Option<FlowStateManager>,
151    kv_backend: KvBackendRef,
152}
153
154impl FlowMetadataManager {
155    /// Returns a new [`FlowMetadataManager`].
156    pub fn new(kv_backend: KvBackendRef) -> Self {
157        Self {
158            flow_info_manager: FlowInfoManager::new(kv_backend.clone()),
159            flow_route_manager: FlowRouteManager::new(kv_backend.clone()),
160            flow_name_manager: FlowNameManager::new(kv_backend.clone()),
161            flownode_flow_manager: FlownodeFlowManager::new(kv_backend.clone()),
162            table_flow_manager: TableFlowManager::new(kv_backend.clone()),
163            flow_state_manager: None,
164            kv_backend,
165        }
166    }
167
168    /// Returns the [`FlowNameManager`].
169    pub fn flow_name_manager(&self) -> &FlowNameManager {
170        &self.flow_name_manager
171    }
172
173    pub fn flow_state_manager(&self) -> Option<&FlowStateManager> {
174        self.flow_state_manager.as_ref()
175    }
176
177    /// Returns the [`FlowInfoManager`].
178    pub fn flow_info_manager(&self) -> &FlowInfoManager {
179        &self.flow_info_manager
180    }
181
182    /// Returns the [`FlowRouteManager`].
183    pub fn flow_route_manager(&self) -> &FlowRouteManager {
184        &self.flow_route_manager
185    }
186
187    /// Returns the [`FlownodeFlowManager`].
188    pub fn flownode_flow_manager(&self) -> &FlownodeFlowManager {
189        &self.flownode_flow_manager
190    }
191
192    /// Returns the [`TableFlowManager`].
193    pub fn table_flow_manager(&self) -> &TableFlowManager {
194        &self.table_flow_manager
195    }
196
197    /// Returns a best-effort mapping from flow partition to flownode address.
198    pub async fn flownode_addrs(
199        &self,
200        flow_id: FlowId,
201    ) -> Result<BTreeMap<FlowPartitionId, String>> {
202        let routes = self.flow_route_manager.routes(flow_id).await?;
203
204        Ok(routes
205            .into_iter()
206            .filter_map(|(key, route)| {
207                let addr = route.peer.addr;
208                (!addr.is_empty()).then_some((key.partition_id(), addr))
209            })
210            .collect())
211    }
212
213    /// Creates metadata for flow and returns an error if different metadata exists.
214    pub async fn create_flow_metadata(
215        &self,
216        flow_id: FlowId,
217        flow_info: FlowInfoValue,
218        flow_routes: Vec<(FlowPartitionId, FlowRouteValue)>,
219    ) -> Result<()> {
220        let (create_flow_flow_name_txn, on_create_flow_flow_name_failure) = self
221            .flow_name_manager
222            .build_create_txn(&flow_info.catalog_name, &flow_info.flow_name, flow_id)?;
223
224        let (create_flow_txn, on_create_flow_failure) = self
225            .flow_info_manager
226            .build_create_txn(flow_id, &flow_info)?;
227
228        let create_flow_routes_txn = self
229            .flow_route_manager
230            .build_create_txn(flow_id, flow_routes.clone())?;
231
232        let create_flownode_flow_txn = self
233            .flownode_flow_manager
234            .build_create_txn(flow_id, flow_info.flownode_ids().clone());
235
236        let create_table_flow_txn = self.table_flow_manager.build_create_txn(
237            flow_id,
238            flow_routes
239                .into_iter()
240                .map(|(partition_id, route)| (partition_id, TableFlowValue { peer: route.peer }))
241                .collect(),
242            flow_info.source_table_ids(),
243        )?;
244
245        let txn = Txn::merge_all(vec![
246            create_flow_flow_name_txn,
247            create_flow_txn,
248            create_flow_routes_txn,
249            create_flownode_flow_txn,
250            create_table_flow_txn,
251        ]);
252        info!(
253            "Creating flow {}.{}({}), with {} txn operations",
254            flow_info.catalog_name,
255            flow_info.flow_name,
256            flow_id,
257            txn.max_operations()
258        );
259
260        let mut resp = self.kv_backend.txn(txn).await?;
261        if !resp.succeeded {
262            let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
263            let remote_flow_flow_name =
264                on_create_flow_flow_name_failure(&mut set)?.with_context(|| {
265                    error::UnexpectedSnafu {
266                        err_msg: format!(
267                    "Reads the empty flow name in comparing operation of the creating flow, flow_id: {flow_id}"
268                ),
269                    }
270                })?;
271
272            if remote_flow_flow_name.flow_id() != flow_id {
273                info!(
274                    "Trying to create flow {}.{}({}), but flow({}) already exists",
275                    flow_info.catalog_name,
276                    flow_info.flow_name,
277                    flow_id,
278                    remote_flow_flow_name.flow_id()
279                );
280
281                return error::FlowAlreadyExistsSnafu {
282                    flow_name: format!("{}.{}", flow_info.catalog_name, flow_info.flow_name),
283                }
284                .fail();
285            }
286
287            let remote_flow =
288                on_create_flow_failure(&mut set)?.with_context(|| error::UnexpectedSnafu {
289                    err_msg: format!(
290                        "Reads the empty flow in comparing operation of creating flow, flow_id: {flow_id}"
291                    ),
292                })?;
293            let op_name = "creating flow";
294            ensure_values!(*remote_flow, flow_info, op_name);
295        }
296
297        Ok(())
298    }
299
300    /// Update metadata for flow and returns an error if old metadata IS NOT exists.
301    pub async fn update_flow_metadata(
302        &self,
303        flow_id: FlowId,
304        current_flow_info: &DeserializedValueWithBytes<FlowInfoValue>,
305        new_flow_info: &FlowInfoValue,
306        flow_routes: Vec<(FlowPartitionId, FlowRouteValue)>,
307    ) -> Result<()> {
308        let (update_flow_flow_name_txn, on_create_flow_flow_name_failure) =
309            self.flow_name_manager.build_update_txn(
310                &new_flow_info.catalog_name,
311                &new_flow_info.flow_name,
312                flow_id,
313            )?;
314
315        let (update_flow_txn, on_create_flow_failure) =
316            self.flow_info_manager
317                .build_update_txn(flow_id, current_flow_info, new_flow_info)?;
318
319        let update_flow_routes_txn = self.flow_route_manager.build_update_txn(
320            flow_id,
321            current_flow_info,
322            flow_routes.clone(),
323        )?;
324
325        let update_flownode_flow_txn = self.flownode_flow_manager.build_update_txn(
326            flow_id,
327            current_flow_info,
328            new_flow_info.flownode_ids().clone(),
329        );
330
331        let update_table_flow_txn = self.table_flow_manager.build_update_txn(
332            flow_id,
333            current_flow_info,
334            flow_routes
335                .into_iter()
336                .map(|(partition_id, route)| (partition_id, TableFlowValue { peer: route.peer }))
337                .collect(),
338            new_flow_info.source_table_ids(),
339        )?;
340
341        let txn = Txn::merge_all(vec![
342            update_flow_flow_name_txn,
343            update_flow_txn,
344            update_flow_routes_txn,
345            update_flownode_flow_txn,
346            update_table_flow_txn,
347        ]);
348        info!(
349            "Creating flow {}.{}({}), with {} txn operations",
350            new_flow_info.catalog_name,
351            new_flow_info.flow_name,
352            flow_id,
353            txn.max_operations()
354        );
355
356        let mut resp = self.kv_backend.txn(txn).await?;
357        if !resp.succeeded {
358            let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
359            let remote_flow_flow_name =
360                on_create_flow_flow_name_failure(&mut set)?.with_context(|| {
361                    error::UnexpectedSnafu {
362                        err_msg: format!(
363                        "Reads the empty flow name in comparing operation of the updating flow, flow_id: {flow_id}"
364                    ),
365                    }
366                })?;
367
368            if remote_flow_flow_name.flow_id() != flow_id {
369                info!(
370                    "Trying to updating flow {}.{}({}), but flow({}) already exists with a different flow id",
371                    new_flow_info.catalog_name,
372                    new_flow_info.flow_name,
373                    flow_id,
374                    remote_flow_flow_name.flow_id()
375                );
376
377                return error::UnexpectedSnafu {
378                    err_msg: format!(
379                        "Reads different flow id when updating flow({2}.{3}), prev flow id = {0}, updating with flow id = {1}",
380                        remote_flow_flow_name.flow_id(),
381                        flow_id,
382                        new_flow_info.catalog_name,
383                        new_flow_info.flow_name,
384                    ),
385                }.fail();
386            }
387
388            let remote_flow =
389                on_create_flow_failure(&mut set)?.with_context(|| error::UnexpectedSnafu {
390                    err_msg: format!(
391                        "Reads the empty flow in comparing operation of the updating flow, flow_id: {flow_id}"
392                    ),
393                })?;
394            let op_name = "updating flow";
395            ensure_values!(*remote_flow, new_flow_info.clone(), op_name);
396        }
397
398        Ok(())
399    }
400
401    fn flow_metadata_keys(&self, flow_id: FlowId, flow_value: &FlowInfoValue) -> Vec<Vec<u8>> {
402        let source_table_ids = flow_value.source_table_ids();
403        let mut keys =
404            Vec::with_capacity(2 + flow_value.flownode_ids.len() * (source_table_ids.len() + 2));
405        // Builds flow name key
406        let flow_name = FlowNameKey::new(&flow_value.catalog_name, &flow_value.flow_name);
407        keys.push(flow_name.to_bytes());
408
409        // Builds flow value key
410        let flow_info_key = FlowInfoKey::new(flow_id);
411        keys.push(flow_info_key.to_bytes());
412
413        // Builds flownode flow keys & table flow keys
414        flow_value
415            .flownode_ids
416            .iter()
417            .for_each(|(&partition_id, &flownode_id)| {
418                keys.push(FlownodeFlowKey::new(flownode_id, flow_id, partition_id).to_bytes());
419                keys.push(FlowRouteKey::new(flow_id, partition_id).to_bytes());
420                source_table_ids.iter().for_each(|&table_id| {
421                    keys.push(
422                        TableFlowKey::new(table_id, flownode_id, flow_id, partition_id).to_bytes(),
423                    );
424                })
425            });
426        keys
427    }
428
429    /// Deletes metadata for table **permanently**.
430    pub async fn destroy_flow_metadata(
431        &self,
432        flow_id: FlowId,
433        flow_value: &FlowInfoValue,
434    ) -> Result<()> {
435        let keys = self.flow_metadata_keys(flow_id, flow_value);
436        let _ = self
437            .kv_backend
438            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
439            .await?;
440        Ok(())
441    }
442}
443
444impl std::fmt::Debug for FlowMetadataManager {
445    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
446        f.debug_struct("FlowMetadataManager").finish()
447    }
448}
449
450#[cfg(test)]
451mod tests {
452    use std::assert_matches;
453    use std::collections::BTreeMap;
454    use std::sync::Arc;
455
456    use futures::TryStreamExt;
457    use table::metadata::TableId;
458    use table::table_name::TableName;
459
460    use super::*;
461    use crate::FlownodeId;
462    use crate::key::flow::table_flow::TableFlowKey;
463    use crate::key::node_address::{NodeAddressKey, NodeAddressValue};
464    use crate::key::{FlowPartitionId, MetadataValue};
465    use crate::kv_backend::KvBackend;
466    use crate::kv_backend::memory::MemoryKvBackend;
467    use crate::peer::Peer;
468    use crate::rpc::store::PutRequest;
469
470    #[derive(Debug)]
471    struct MockKey {
472        inner: Vec<u8>,
473    }
474
475    impl<'a> MetadataKey<'a, MockKey> for MockKey {
476        fn to_bytes(&self) -> Vec<u8> {
477            self.inner.clone()
478        }
479
480        fn from_bytes(bytes: &'a [u8]) -> Result<MockKey> {
481            Ok(MockKey {
482                inner: bytes.to_vec(),
483            })
484        }
485    }
486
487    #[test]
488    fn test_flow_scoped_to_bytes() {
489        let key = FlowScoped::new(MockKey {
490            inner: b"hi".to_vec(),
491        });
492        assert_eq!(b"__flow/hi".to_vec(), key.to_bytes());
493    }
494
495    #[test]
496    fn test_flow_scoped_from_bytes() {
497        let bytes = b"__flow/hi";
498        let key = FlowScoped::<MockKey>::from_bytes(bytes).unwrap();
499        assert_eq!(key.inner.inner, b"hi".to_vec());
500    }
501
502    #[test]
503    fn test_flow_scoped_from_bytes_mismatch() {
504        let bytes = b"__table/hi";
505        let err = FlowScoped::<MockKey>::from_bytes(bytes).unwrap_err();
506        assert_matches!(err, error::Error::MismatchPrefix { .. });
507    }
508
509    fn test_flow_info_value(
510        flow_name: &str,
511        flownode_ids: BTreeMap<FlowPartitionId, FlownodeId>,
512        source_table_ids: Vec<TableId>,
513    ) -> FlowInfoValue {
514        let catalog_name = "greptime";
515        let sink_table_name = TableName {
516            catalog_name: catalog_name.to_string(),
517            schema_name: "my_schema".to_string(),
518            table_name: "sink_table".to_string(),
519        };
520        FlowInfoValue {
521            catalog_name: catalog_name.to_string(),
522            query_context: None,
523            flow_name: flow_name.to_string(),
524            source_table_ids,
525            sink_table_name,
526            flownode_ids,
527            raw_sql: "raw".to_string(),
528            expire_after: Some(300),
529            eval_interval_secs: None,
530            comment: "hi".to_string(),
531            options: Default::default(),
532            created_time: chrono::Utc::now(),
533            updated_time: chrono::Utc::now(),
534        }
535    }
536
537    #[tokio::test]
538    async fn test_create_flow_metadata() {
539        let mem_kv = Arc::new(MemoryKvBackend::default());
540        let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
541        let flow_id = 10;
542        let flow_value = test_flow_info_value(
543            "flow",
544            [(0, 1u64), (1, 2u64)].into(),
545            vec![1024, 1025, 1026],
546        );
547        let flow_routes = vec![
548            (
549                1u32,
550                FlowRouteValue {
551                    peer: Peer::empty(1),
552                },
553            ),
554            (
555                2,
556                FlowRouteValue {
557                    peer: Peer::empty(2),
558                },
559            ),
560        ];
561        flow_metadata_manager
562            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
563            .await
564            .unwrap();
565        // Creates again.
566        flow_metadata_manager
567            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
568            .await
569            .unwrap();
570        let got = flow_metadata_manager
571            .flow_info_manager()
572            .get(flow_id)
573            .await
574            .unwrap()
575            .unwrap();
576        let routes = flow_metadata_manager
577            .flow_route_manager()
578            .routes(flow_id)
579            .await
580            .unwrap();
581        assert_eq!(
582            routes,
583            vec![
584                (
585                    FlowRouteKey::new(flow_id, 1),
586                    FlowRouteValue {
587                        peer: Peer::empty(1),
588                    },
589                ),
590                (
591                    FlowRouteKey::new(flow_id, 2),
592                    FlowRouteValue {
593                        peer: Peer::empty(2),
594                    },
595                ),
596            ]
597        );
598        assert_eq!(got, flow_value);
599        let flows = flow_metadata_manager
600            .flownode_flow_manager()
601            .flows(1)
602            .try_collect::<Vec<_>>()
603            .await
604            .unwrap();
605        assert_eq!(flows, vec![(flow_id, 0)]);
606        for table_id in [1024, 1025, 1026] {
607            let nodes = flow_metadata_manager
608                .table_flow_manager()
609                .flows(table_id)
610                .await
611                .unwrap();
612            assert_eq!(
613                nodes,
614                vec![
615                    (
616                        TableFlowKey::new(table_id, 1, flow_id, 1),
617                        TableFlowValue {
618                            peer: Peer::empty(1)
619                        }
620                    ),
621                    (
622                        TableFlowKey::new(table_id, 2, flow_id, 2),
623                        TableFlowValue {
624                            peer: Peer::empty(2)
625                        }
626                    )
627                ]
628            );
629        }
630    }
631
632    #[tokio::test]
633    async fn test_flownode_addrs_remaps_to_latest_address() {
634        let mem_kv = Arc::new(MemoryKvBackend::default());
635        let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
636        let flow_id = 10;
637        let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024]);
638        let flow_routes = vec![(
639            0u32,
640            FlowRouteValue {
641                peer: Peer::new(1, "old-addr"),
642            },
643        )];
644
645        flow_metadata_manager
646            .create_flow_metadata(flow_id, flow_value, flow_routes)
647            .await
648            .unwrap();
649
650        mem_kv
651            .put(PutRequest {
652                key: NodeAddressKey::with_flownode(1).to_bytes(),
653                value: NodeAddressValue::new(Peer::new(1, "new-addr"))
654                    .try_as_raw_value()
655                    .unwrap(),
656                ..Default::default()
657            })
658            .await
659            .unwrap();
660
661        let addrs = flow_metadata_manager.flownode_addrs(flow_id).await.unwrap();
662        assert_eq!(addrs, BTreeMap::from([(0, "new-addr".to_string())]));
663    }
664
665    #[tokio::test]
666    async fn test_flownode_addrs_falls_back_to_route_address() {
667        let mem_kv = Arc::new(MemoryKvBackend::default());
668        let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
669        let flow_id = 10;
670        let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024]);
671        let flow_routes = vec![(
672            0u32,
673            FlowRouteValue {
674                peer: Peer::new(1, "route-addr"),
675            },
676        )];
677
678        flow_metadata_manager
679            .create_flow_metadata(flow_id, flow_value, flow_routes)
680            .await
681            .unwrap();
682
683        let addrs = flow_metadata_manager.flownode_addrs(flow_id).await.unwrap();
684        assert_eq!(addrs, BTreeMap::from([(0, "route-addr".to_string())]));
685    }
686
687    #[tokio::test]
688    async fn test_flownode_addrs_skips_empty_addresses() {
689        let mem_kv = Arc::new(MemoryKvBackend::default());
690        let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
691        let flow_id = 10;
692        let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024]);
693        let flow_routes = vec![(
694            0u32,
695            FlowRouteValue {
696                peer: Peer::empty(1),
697            },
698        )];
699
700        flow_metadata_manager
701            .create_flow_metadata(flow_id, flow_value, flow_routes)
702            .await
703            .unwrap();
704
705        let addrs = flow_metadata_manager.flownode_addrs(flow_id).await.unwrap();
706        assert!(addrs.is_empty());
707    }
708
709    #[tokio::test]
710    async fn test_create_flow_metadata_flow_exists_err() {
711        let mem_kv = Arc::new(MemoryKvBackend::default());
712        let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
713        let flow_id = 10;
714        let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
715        let flow_routes = vec![
716            (
717                1u32,
718                FlowRouteValue {
719                    peer: Peer::empty(1),
720                },
721            ),
722            (
723                2,
724                FlowRouteValue {
725                    peer: Peer::empty(2),
726                },
727            ),
728        ];
729        flow_metadata_manager
730            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
731            .await
732            .unwrap();
733        // Creates again
734        let err = flow_metadata_manager
735            .create_flow_metadata(flow_id + 1, flow_value, flow_routes.clone())
736            .await
737            .unwrap_err();
738        assert_matches!(err, error::Error::FlowAlreadyExists { .. });
739    }
740
741    #[tokio::test]
742    async fn test_create_flow_metadata_unexpected_err() {
743        let mem_kv = Arc::new(MemoryKvBackend::default());
744        let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
745        let flow_id = 10;
746        let catalog_name = "greptime";
747        let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
748        let flow_routes = vec![
749            (
750                1u32,
751                FlowRouteValue {
752                    peer: Peer::empty(1),
753                },
754            ),
755            (
756                2,
757                FlowRouteValue {
758                    peer: Peer::empty(2),
759                },
760            ),
761        ];
762        flow_metadata_manager
763            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
764            .await
765            .unwrap();
766        // Creates again.
767        let another_sink_table_name = TableName {
768            catalog_name: catalog_name.to_string(),
769            schema_name: "my_schema".to_string(),
770            table_name: "another_sink_table".to_string(),
771        };
772        let flow_value = FlowInfoValue {
773            catalog_name: "greptime".to_string(),
774            query_context: None,
775            flow_name: "flow".to_string(),
776            source_table_ids: vec![1024, 1025, 1026],
777            sink_table_name: another_sink_table_name,
778            flownode_ids: [(0, 1u64)].into(),
779            raw_sql: "raw".to_string(),
780            expire_after: Some(300),
781            eval_interval_secs: None,
782            comment: "hi".to_string(),
783            options: Default::default(),
784            created_time: chrono::Utc::now(),
785            updated_time: chrono::Utc::now(),
786        };
787        let err = flow_metadata_manager
788            .create_flow_metadata(flow_id, flow_value, flow_routes.clone())
789            .await
790            .unwrap_err();
791        assert!(err.to_string().contains("Reads the different value"));
792    }
793
794    #[tokio::test]
795    async fn test_destroy_flow_metadata() {
796        let mem_kv = Arc::new(MemoryKvBackend::default());
797        let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
798        let flow_id = 10;
799        let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
800        let flow_routes = vec![(
801            0u32,
802            FlowRouteValue {
803                peer: Peer::empty(1),
804            },
805        )];
806        flow_metadata_manager
807            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
808            .await
809            .unwrap();
810
811        flow_metadata_manager
812            .destroy_flow_metadata(flow_id, &flow_value)
813            .await
814            .unwrap();
815        // Destroys again
816        flow_metadata_manager
817            .destroy_flow_metadata(flow_id, &flow_value)
818            .await
819            .unwrap();
820        // Ensures all keys are deleted
821        assert!(mem_kv.is_empty())
822    }
823
824    #[tokio::test]
825    async fn test_update_flow_metadata() {
826        let mem_kv = Arc::new(MemoryKvBackend::default());
827        let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
828        let flow_id = 10;
829        let flow_value = test_flow_info_value(
830            "flow",
831            [(0, 1u64), (1, 2u64)].into(),
832            vec![1024, 1025, 1026],
833        );
834        let flow_routes = vec![
835            (
836                1u32,
837                FlowRouteValue {
838                    peer: Peer::empty(1),
839                },
840            ),
841            (
842                2,
843                FlowRouteValue {
844                    peer: Peer::empty(2),
845                },
846            ),
847        ];
848        flow_metadata_manager
849            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
850            .await
851            .unwrap();
852
853        let new_flow_value = {
854            let mut tmp = flow_value.clone();
855            tmp.raw_sql = "new".to_string();
856            tmp
857        };
858
859        // Update flow instead
860        flow_metadata_manager
861            .update_flow_metadata(
862                flow_id,
863                &DeserializedValueWithBytes::from_inner(flow_value.clone()),
864                &new_flow_value,
865                flow_routes.clone(),
866            )
867            .await
868            .unwrap();
869
870        let got = flow_metadata_manager
871            .flow_info_manager()
872            .get(flow_id)
873            .await
874            .unwrap()
875            .unwrap();
876        let routes = flow_metadata_manager
877            .flow_route_manager()
878            .routes(flow_id)
879            .await
880            .unwrap();
881        assert_eq!(
882            routes,
883            vec![
884                (
885                    FlowRouteKey::new(flow_id, 1),
886                    FlowRouteValue {
887                        peer: Peer::empty(1),
888                    },
889                ),
890                (
891                    FlowRouteKey::new(flow_id, 2),
892                    FlowRouteValue {
893                        peer: Peer::empty(2),
894                    },
895                ),
896            ]
897        );
898        assert_eq!(got, new_flow_value);
899        let flows = flow_metadata_manager
900            .flownode_flow_manager()
901            .flows(1)
902            .try_collect::<Vec<_>>()
903            .await
904            .unwrap();
905        assert_eq!(flows, vec![(flow_id, 0)]);
906        for table_id in [1024, 1025, 1026] {
907            let nodes = flow_metadata_manager
908                .table_flow_manager()
909                .flows(table_id)
910                .await
911                .unwrap();
912            assert_eq!(
913                nodes,
914                vec![
915                    (
916                        TableFlowKey::new(table_id, 1, flow_id, 1),
917                        TableFlowValue {
918                            peer: Peer::empty(1)
919                        }
920                    ),
921                    (
922                        TableFlowKey::new(table_id, 2, flow_id, 2),
923                        TableFlowValue {
924                            peer: Peer::empty(2)
925                        }
926                    )
927                ]
928            );
929        }
930    }
931
932    #[tokio::test]
933    async fn test_update_flow_metadata_diff_flownode() {
934        let mem_kv = Arc::new(MemoryKvBackend::default());
935        let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
936        let flow_id = 10;
937        let flow_value = test_flow_info_value(
938            "flow",
939            [(0u32, 1u64), (1u32, 2u64)].into(),
940            vec![1024, 1025, 1026],
941        );
942        let flow_routes = vec![
943            (
944                0u32,
945                FlowRouteValue {
946                    peer: Peer::empty(1),
947                },
948            ),
949            (
950                1,
951                FlowRouteValue {
952                    peer: Peer::empty(2),
953                },
954            ),
955        ];
956        flow_metadata_manager
957            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
958            .await
959            .unwrap();
960
961        let new_flow_value = {
962            let mut tmp = flow_value.clone();
963            tmp.raw_sql = "new".to_string();
964            // move to different flownodes
965            tmp.flownode_ids = [(0, 3u64), (1, 4u64)].into();
966            tmp
967        };
968        let new_flow_routes = vec![
969            (
970                0u32,
971                FlowRouteValue {
972                    peer: Peer::empty(3),
973                },
974            ),
975            (
976                1,
977                FlowRouteValue {
978                    peer: Peer::empty(4),
979                },
980            ),
981        ];
982
983        // Update flow instead
984        flow_metadata_manager
985            .update_flow_metadata(
986                flow_id,
987                &DeserializedValueWithBytes::from_inner(flow_value.clone()),
988                &new_flow_value,
989                new_flow_routes.clone(),
990            )
991            .await
992            .unwrap();
993
994        let got = flow_metadata_manager
995            .flow_info_manager()
996            .get(flow_id)
997            .await
998            .unwrap()
999            .unwrap();
1000        let routes = flow_metadata_manager
1001            .flow_route_manager()
1002            .routes(flow_id)
1003            .await
1004            .unwrap();
1005        assert_eq!(
1006            routes,
1007            vec![
1008                (
1009                    FlowRouteKey::new(flow_id, 0),
1010                    FlowRouteValue {
1011                        peer: Peer::empty(3),
1012                    },
1013                ),
1014                (
1015                    FlowRouteKey::new(flow_id, 1),
1016                    FlowRouteValue {
1017                        peer: Peer::empty(4),
1018                    },
1019                ),
1020            ]
1021        );
1022        assert_eq!(got, new_flow_value);
1023
1024        let flows = flow_metadata_manager
1025            .flownode_flow_manager()
1026            .flows(1)
1027            .try_collect::<Vec<_>>()
1028            .await
1029            .unwrap();
1030        // should moved to different flownode
1031        assert_eq!(flows, vec![]);
1032
1033        let flows = flow_metadata_manager
1034            .flownode_flow_manager()
1035            .flows(3)
1036            .try_collect::<Vec<_>>()
1037            .await
1038            .unwrap();
1039        assert_eq!(flows, vec![(flow_id, 0)]);
1040
1041        for table_id in [1024, 1025, 1026] {
1042            let nodes = flow_metadata_manager
1043                .table_flow_manager()
1044                .flows(table_id)
1045                .await
1046                .unwrap();
1047            assert_eq!(
1048                nodes,
1049                vec![
1050                    (
1051                        TableFlowKey::new(table_id, 3, flow_id, 0),
1052                        TableFlowValue {
1053                            peer: Peer::empty(3)
1054                        }
1055                    ),
1056                    (
1057                        TableFlowKey::new(table_id, 4, flow_id, 1),
1058                        TableFlowValue {
1059                            peer: Peer::empty(4)
1060                        }
1061                    )
1062                ]
1063            );
1064        }
1065    }
1066
1067    #[tokio::test]
1068    async fn test_update_flow_metadata_flow_replace_diff_id_err() {
1069        let mem_kv = Arc::new(MemoryKvBackend::default());
1070        let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
1071        let flow_id = 10;
1072        let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
1073        let flow_routes = vec![
1074            (
1075                1u32,
1076                FlowRouteValue {
1077                    peer: Peer::empty(1),
1078                },
1079            ),
1080            (
1081                2,
1082                FlowRouteValue {
1083                    peer: Peer::empty(2),
1084                },
1085            ),
1086        ];
1087        flow_metadata_manager
1088            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
1089            .await
1090            .unwrap();
1091        // update again with same flow id
1092        flow_metadata_manager
1093            .update_flow_metadata(
1094                flow_id,
1095                &DeserializedValueWithBytes::from_inner(flow_value.clone()),
1096                &flow_value,
1097                flow_routes.clone(),
1098            )
1099            .await
1100            .unwrap();
1101        // update again with wrong flow id, expected error
1102        let err = flow_metadata_manager
1103            .update_flow_metadata(
1104                flow_id + 1,
1105                &DeserializedValueWithBytes::from_inner(flow_value.clone()),
1106                &flow_value,
1107                flow_routes,
1108            )
1109            .await
1110            .unwrap_err();
1111        assert_matches!(err, error::Error::Unexpected { .. });
1112        assert!(
1113            err.to_string()
1114                .contains("Reads different flow id when updating flow")
1115        );
1116    }
1117
1118    #[tokio::test]
1119    async fn test_update_flow_metadata_unexpected_err_prev_value_diff() {
1120        let mem_kv = Arc::new(MemoryKvBackend::default());
1121        let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
1122        let flow_id = 10;
1123        let catalog_name = "greptime";
1124        let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
1125        let flow_routes = vec![
1126            (
1127                1u32,
1128                FlowRouteValue {
1129                    peer: Peer::empty(1),
1130                },
1131            ),
1132            (
1133                2,
1134                FlowRouteValue {
1135                    peer: Peer::empty(2),
1136                },
1137            ),
1138        ];
1139        flow_metadata_manager
1140            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
1141            .await
1142            .unwrap();
1143        // Creates again.
1144        let another_sink_table_name = TableName {
1145            catalog_name: catalog_name.to_string(),
1146            schema_name: "my_schema".to_string(),
1147            table_name: "another_sink_table".to_string(),
1148        };
1149        let flow_value = FlowInfoValue {
1150            catalog_name: "greptime".to_string(),
1151            query_context: None,
1152            flow_name: "flow".to_string(),
1153            source_table_ids: vec![1024, 1025, 1026],
1154            sink_table_name: another_sink_table_name,
1155            flownode_ids: [(0, 1u64)].into(),
1156            raw_sql: "raw".to_string(),
1157            expire_after: Some(300),
1158            eval_interval_secs: None,
1159            comment: "hi".to_string(),
1160            options: Default::default(),
1161            created_time: chrono::Utc::now(),
1162            updated_time: chrono::Utc::now(),
1163        };
1164        let err = flow_metadata_manager
1165            .update_flow_metadata(
1166                flow_id,
1167                &DeserializedValueWithBytes::from_inner(flow_value.clone()),
1168                &flow_value,
1169                flow_routes.clone(),
1170            )
1171            .await
1172            .unwrap_err();
1173        assert!(
1174            err.to_string().contains("Reads the different value"),
1175            "error: {:?}",
1176            err
1177        );
1178    }
1179}