common_meta/key/
flow.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod flow_info;
16pub(crate) mod flow_name;
17pub mod flow_route;
18pub mod flow_state;
19mod flownode_addr_helper;
20pub(crate) mod flownode_flow;
21pub(crate) mod table_flow;
22use std::ops::Deref;
23use std::sync::Arc;
24
25use common_telemetry::info;
26use flow_route::{FlowRouteKey, FlowRouteManager, FlowRouteValue};
27use snafu::{ensure, OptionExt};
28use table_flow::TableFlowValue;
29
30use self::flow_info::{FlowInfoKey, FlowInfoValue};
31use self::flow_name::FlowNameKey;
32use self::flownode_flow::FlownodeFlowKey;
33use self::table_flow::TableFlowKey;
34use crate::ensure_values;
35use crate::error::{self, Result};
36use crate::key::flow::flow_info::FlowInfoManager;
37use crate::key::flow::flow_name::FlowNameManager;
38use crate::key::flow::flow_state::FlowStateManager;
39use crate::key::flow::flownode_flow::FlownodeFlowManager;
40pub use crate::key::flow::table_flow::{TableFlowManager, TableFlowManagerRef};
41use crate::key::txn_helper::TxnOpGetResponseSet;
42use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey};
43use crate::kv_backend::txn::Txn;
44use crate::kv_backend::KvBackendRef;
45use crate::rpc::store::BatchDeleteRequest;
46
/// The key of `__flow/` scope.
///
/// Wraps an inner key of type `T`; the inner key is reachable through [`Deref`].
#[derive(Debug, Clone, PartialEq)]
pub struct FlowScoped<T> {
    /// The wrapped key, stored without the scope prefix.
    inner: T,
}
52
53impl<T> Deref for FlowScoped<T> {
54    type Target = T;
55
56    fn deref(&self) -> &Self::Target {
57        &self.inner
58    }
59}
60
61impl<T> FlowScoped<T> {
62    const PREFIX: &'static str = "__flow/";
63
64    /// Returns a new [FlowScoped] key.
65    pub fn new(inner: T) -> FlowScoped<T> {
66        Self { inner }
67    }
68}
69
70impl<'a, T: MetadataKey<'a, T>> MetadataKey<'a, FlowScoped<T>> for FlowScoped<T> {
71    fn to_bytes(&self) -> Vec<u8> {
72        let prefix = FlowScoped::<T>::PREFIX.as_bytes();
73        let inner = self.inner.to_bytes();
74        let mut bytes = Vec::with_capacity(prefix.len() + inner.len());
75        bytes.extend(prefix);
76        bytes.extend(inner);
77        bytes
78    }
79
80    fn from_bytes(bytes: &'a [u8]) -> Result<FlowScoped<T>> {
81        let prefix = FlowScoped::<T>::PREFIX.as_bytes();
82        ensure!(
83            bytes.starts_with(prefix),
84            error::MismatchPrefixSnafu {
85                prefix: String::from_utf8_lossy(prefix),
86                key: String::from_utf8_lossy(bytes),
87            }
88        );
89        let inner = T::from_bytes(&bytes[prefix.len()..])?;
90        Ok(FlowScoped { inner })
91    }
92}
93
/// A reference-counted ([`Arc`]) handle to a [`FlowMetadataManager`].
pub type FlowMetadataManagerRef = Arc<FlowMetadataManager>;
95
/// The manager of flow metadata, provides the ability to:
/// - Create metadata of the flow.
/// - Update metadata of the flow.
/// - Retrieve metadata of the flow.
/// - Delete metadata of the flow.
pub struct FlowMetadataManager {
    flow_info_manager: FlowInfoManager,
    flow_route_manager: FlowRouteManager,
    flownode_flow_manager: FlownodeFlowManager,
    table_flow_manager: TableFlowManager,
    flow_name_manager: FlowNameManager,
    /// Only metasrv has access to its own memory backend, so in every other case this is `None`.
    flow_state_manager: Option<FlowStateManager>,
    /// The backend on which the merged transactions and batch deletes are executed.
    kv_backend: KvBackendRef,
}
110
111impl FlowMetadataManager {
112    /// Returns a new [`FlowMetadataManager`].
113    pub fn new(kv_backend: KvBackendRef) -> Self {
114        Self {
115            flow_info_manager: FlowInfoManager::new(kv_backend.clone()),
116            flow_route_manager: FlowRouteManager::new(kv_backend.clone()),
117            flow_name_manager: FlowNameManager::new(kv_backend.clone()),
118            flownode_flow_manager: FlownodeFlowManager::new(kv_backend.clone()),
119            table_flow_manager: TableFlowManager::new(kv_backend.clone()),
120            flow_state_manager: None,
121            kv_backend,
122        }
123    }
124
125    /// Returns the [`FlowNameManager`].
126    pub fn flow_name_manager(&self) -> &FlowNameManager {
127        &self.flow_name_manager
128    }
129
130    pub fn flow_state_manager(&self) -> Option<&FlowStateManager> {
131        self.flow_state_manager.as_ref()
132    }
133
134    /// Returns the [`FlowInfoManager`].
135    pub fn flow_info_manager(&self) -> &FlowInfoManager {
136        &self.flow_info_manager
137    }
138
139    /// Returns the [`FlowRouteManager`].
140    pub fn flow_route_manager(&self) -> &FlowRouteManager {
141        &self.flow_route_manager
142    }
143
144    /// Returns the [`FlownodeFlowManager`].
145    pub fn flownode_flow_manager(&self) -> &FlownodeFlowManager {
146        &self.flownode_flow_manager
147    }
148
149    /// Returns the [`TableFlowManager`].
150    pub fn table_flow_manager(&self) -> &TableFlowManager {
151        &self.table_flow_manager
152    }
153
154    /// Creates metadata for flow and returns an error if different metadata exists.
155    pub async fn create_flow_metadata(
156        &self,
157        flow_id: FlowId,
158        flow_info: FlowInfoValue,
159        flow_routes: Vec<(FlowPartitionId, FlowRouteValue)>,
160    ) -> Result<()> {
161        let (create_flow_flow_name_txn, on_create_flow_flow_name_failure) = self
162            .flow_name_manager
163            .build_create_txn(&flow_info.catalog_name, &flow_info.flow_name, flow_id)?;
164
165        let (create_flow_txn, on_create_flow_failure) = self
166            .flow_info_manager
167            .build_create_txn(flow_id, &flow_info)?;
168
169        let create_flow_routes_txn = self
170            .flow_route_manager
171            .build_create_txn(flow_id, flow_routes.clone())?;
172
173        let create_flownode_flow_txn = self
174            .flownode_flow_manager
175            .build_create_txn(flow_id, flow_info.flownode_ids().clone());
176
177        let create_table_flow_txn = self.table_flow_manager.build_create_txn(
178            flow_id,
179            flow_routes
180                .into_iter()
181                .map(|(partition_id, route)| (partition_id, TableFlowValue { peer: route.peer }))
182                .collect(),
183            flow_info.source_table_ids(),
184        )?;
185
186        let txn = Txn::merge_all(vec![
187            create_flow_flow_name_txn,
188            create_flow_txn,
189            create_flow_routes_txn,
190            create_flownode_flow_txn,
191            create_table_flow_txn,
192        ]);
193        info!(
194            "Creating flow {}.{}({}), with {} txn operations",
195            flow_info.catalog_name,
196            flow_info.flow_name,
197            flow_id,
198            txn.max_operations()
199        );
200
201        let mut resp = self.kv_backend.txn(txn).await?;
202        if !resp.succeeded {
203            let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
204            let remote_flow_flow_name =
205                on_create_flow_flow_name_failure(&mut set)?.with_context(|| {
206                    error::UnexpectedSnafu {
207                        err_msg: format!(
208                    "Reads the empty flow name in comparing operation of the creating flow, flow_id: {flow_id}"
209                ),
210                    }
211                })?;
212
213            if remote_flow_flow_name.flow_id() != flow_id {
214                info!(
215                    "Trying to create flow {}.{}({}), but flow({}) already exists",
216                    flow_info.catalog_name,
217                    flow_info.flow_name,
218                    flow_id,
219                    remote_flow_flow_name.flow_id()
220                );
221
222                return error::FlowAlreadyExistsSnafu {
223                    flow_name: format!("{}.{}", flow_info.catalog_name, flow_info.flow_name),
224                }
225                .fail();
226            }
227
228            let remote_flow =
229                on_create_flow_failure(&mut set)?.with_context(|| error::UnexpectedSnafu {
230                    err_msg: format!(
231                        "Reads the empty flow in comparing operation of creating flow, flow_id: {flow_id}"
232                    ),
233                })?;
234            let op_name = "creating flow";
235            ensure_values!(*remote_flow, flow_info, op_name);
236        }
237
238        Ok(())
239    }
240
241    /// Update metadata for flow and returns an error if old metadata IS NOT exists.
242    pub async fn update_flow_metadata(
243        &self,
244        flow_id: FlowId,
245        current_flow_info: &DeserializedValueWithBytes<FlowInfoValue>,
246        new_flow_info: &FlowInfoValue,
247        flow_routes: Vec<(FlowPartitionId, FlowRouteValue)>,
248    ) -> Result<()> {
249        let (update_flow_flow_name_txn, on_create_flow_flow_name_failure) =
250            self.flow_name_manager.build_update_txn(
251                &new_flow_info.catalog_name,
252                &new_flow_info.flow_name,
253                flow_id,
254            )?;
255
256        let (update_flow_txn, on_create_flow_failure) =
257            self.flow_info_manager
258                .build_update_txn(flow_id, current_flow_info, new_flow_info)?;
259
260        let update_flow_routes_txn = self.flow_route_manager.build_update_txn(
261            flow_id,
262            current_flow_info,
263            flow_routes.clone(),
264        )?;
265
266        let update_flownode_flow_txn = self.flownode_flow_manager.build_update_txn(
267            flow_id,
268            current_flow_info,
269            new_flow_info.flownode_ids().clone(),
270        );
271
272        let update_table_flow_txn = self.table_flow_manager.build_update_txn(
273            flow_id,
274            current_flow_info,
275            flow_routes
276                .into_iter()
277                .map(|(partition_id, route)| (partition_id, TableFlowValue { peer: route.peer }))
278                .collect(),
279            new_flow_info.source_table_ids(),
280        )?;
281
282        let txn = Txn::merge_all(vec![
283            update_flow_flow_name_txn,
284            update_flow_txn,
285            update_flow_routes_txn,
286            update_flownode_flow_txn,
287            update_table_flow_txn,
288        ]);
289        info!(
290            "Creating flow {}.{}({}), with {} txn operations",
291            new_flow_info.catalog_name,
292            new_flow_info.flow_name,
293            flow_id,
294            txn.max_operations()
295        );
296
297        let mut resp = self.kv_backend.txn(txn).await?;
298        if !resp.succeeded {
299            let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
300            let remote_flow_flow_name =
301                on_create_flow_flow_name_failure(&mut set)?.with_context(|| {
302                    error::UnexpectedSnafu {
303                        err_msg: format!(
304                        "Reads the empty flow name in comparing operation of the updating flow, flow_id: {flow_id}"
305                    ),
306                    }
307                })?;
308
309            if remote_flow_flow_name.flow_id() != flow_id {
310                info!(
311                    "Trying to updating flow {}.{}({}), but flow({}) already exists with a different flow id",
312                    new_flow_info.catalog_name,
313                    new_flow_info.flow_name,
314                    flow_id,
315                    remote_flow_flow_name.flow_id()
316                );
317
318                return error::UnexpectedSnafu {
319                    err_msg: format!(
320                        "Reads different flow id when updating flow({2}.{3}), prev flow id = {0}, updating with flow id = {1}",
321                        remote_flow_flow_name.flow_id(),
322                        flow_id,
323                        new_flow_info.catalog_name,
324                        new_flow_info.flow_name,
325                    ),
326                }.fail();
327            }
328
329            let remote_flow =
330                on_create_flow_failure(&mut set)?.with_context(|| error::UnexpectedSnafu {
331                    err_msg: format!(
332                        "Reads the empty flow in comparing operation of the updating flow, flow_id: {flow_id}"
333                    ),
334                })?;
335            let op_name = "updating flow";
336            ensure_values!(*remote_flow, new_flow_info.clone(), op_name);
337        }
338
339        Ok(())
340    }
341
342    fn flow_metadata_keys(&self, flow_id: FlowId, flow_value: &FlowInfoValue) -> Vec<Vec<u8>> {
343        let source_table_ids = flow_value.source_table_ids();
344        let mut keys =
345            Vec::with_capacity(2 + flow_value.flownode_ids.len() * (source_table_ids.len() + 2));
346        // Builds flow name key
347        let flow_name = FlowNameKey::new(&flow_value.catalog_name, &flow_value.flow_name);
348        keys.push(flow_name.to_bytes());
349
350        // Builds flow value key
351        let flow_info_key = FlowInfoKey::new(flow_id);
352        keys.push(flow_info_key.to_bytes());
353
354        // Builds flownode flow keys & table flow keys
355        flow_value
356            .flownode_ids
357            .iter()
358            .for_each(|(&partition_id, &flownode_id)| {
359                keys.push(FlownodeFlowKey::new(flownode_id, flow_id, partition_id).to_bytes());
360                keys.push(FlowRouteKey::new(flow_id, partition_id).to_bytes());
361                source_table_ids.iter().for_each(|&table_id| {
362                    keys.push(
363                        TableFlowKey::new(table_id, flownode_id, flow_id, partition_id).to_bytes(),
364                    );
365                })
366            });
367        keys
368    }
369
370    /// Deletes metadata for table **permanently**.
371    pub async fn destroy_flow_metadata(
372        &self,
373        flow_id: FlowId,
374        flow_value: &FlowInfoValue,
375    ) -> Result<()> {
376        let keys = self.flow_metadata_keys(flow_id, flow_value);
377        let _ = self
378            .kv_backend
379            .batch_delete(BatchDeleteRequest::new().with_keys(keys))
380            .await?;
381        Ok(())
382    }
383}
384
385impl std::fmt::Debug for FlowMetadataManager {
386    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
387        f.debug_struct("FlowMetadataManager").finish()
388    }
389}
390
391#[cfg(test)]
392mod tests {
393    use std::assert_matches::assert_matches;
394    use std::collections::BTreeMap;
395    use std::sync::Arc;
396
397    use futures::TryStreamExt;
398    use table::metadata::TableId;
399    use table::table_name::TableName;
400
401    use super::*;
402    use crate::key::flow::table_flow::TableFlowKey;
403    use crate::key::FlowPartitionId;
404    use crate::kv_backend::memory::MemoryKvBackend;
405    use crate::peer::Peer;
406    use crate::FlownodeId;
407
    /// A minimal key used by the tests; it only carries raw bytes.
    #[derive(Debug)]
    struct MockKey {
        /// The raw bytes of the key.
        inner: Vec<u8>,
    }
412
413    impl<'a> MetadataKey<'a, MockKey> for MockKey {
414        fn to_bytes(&self) -> Vec<u8> {
415            self.inner.clone()
416        }
417
418        fn from_bytes(bytes: &'a [u8]) -> Result<MockKey> {
419            Ok(MockKey {
420                inner: bytes.to_vec(),
421            })
422        }
423    }
424
425    #[test]
426    fn test_flow_scoped_to_bytes() {
427        let key = FlowScoped::new(MockKey {
428            inner: b"hi".to_vec(),
429        });
430        assert_eq!(b"__flow/hi".to_vec(), key.to_bytes());
431    }
432
433    #[test]
434    fn test_flow_scoped_from_bytes() {
435        let bytes = b"__flow/hi";
436        let key = FlowScoped::<MockKey>::from_bytes(bytes).unwrap();
437        assert_eq!(key.inner.inner, b"hi".to_vec());
438    }
439
440    #[test]
441    fn test_flow_scoped_from_bytes_mismatch() {
442        let bytes = b"__table/hi";
443        let err = FlowScoped::<MockKey>::from_bytes(bytes).unwrap_err();
444        assert_matches!(err, error::Error::MismatchPrefix { .. });
445    }
446
447    fn test_flow_info_value(
448        flow_name: &str,
449        flownode_ids: BTreeMap<FlowPartitionId, FlownodeId>,
450        source_table_ids: Vec<TableId>,
451    ) -> FlowInfoValue {
452        let catalog_name = "greptime";
453        let sink_table_name = TableName {
454            catalog_name: catalog_name.to_string(),
455            schema_name: "my_schema".to_string(),
456            table_name: "sink_table".to_string(),
457        };
458        FlowInfoValue {
459            catalog_name: catalog_name.to_string(),
460            query_context: None,
461            flow_name: flow_name.to_string(),
462            source_table_ids,
463            sink_table_name,
464            flownode_ids,
465            raw_sql: "raw".to_string(),
466            expire_after: Some(300),
467            comment: "hi".to_string(),
468            options: Default::default(),
469            created_time: chrono::Utc::now(),
470            updated_time: chrono::Utc::now(),
471        }
472    }
473
474    #[tokio::test]
475    async fn test_create_flow_metadata() {
476        let mem_kv = Arc::new(MemoryKvBackend::default());
477        let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
478        let flow_id = 10;
479        let flow_value = test_flow_info_value(
480            "flow",
481            [(0, 1u64), (1, 2u64)].into(),
482            vec![1024, 1025, 1026],
483        );
484        let flow_routes = vec![
485            (
486                1u32,
487                FlowRouteValue {
488                    peer: Peer::empty(1),
489                },
490            ),
491            (
492                2,
493                FlowRouteValue {
494                    peer: Peer::empty(2),
495                },
496            ),
497        ];
498        flow_metadata_manager
499            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
500            .await
501            .unwrap();
502        // Creates again.
503        flow_metadata_manager
504            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
505            .await
506            .unwrap();
507        let got = flow_metadata_manager
508            .flow_info_manager()
509            .get(flow_id)
510            .await
511            .unwrap()
512            .unwrap();
513        let routes = flow_metadata_manager
514            .flow_route_manager()
515            .routes(flow_id)
516            .await
517            .unwrap();
518        assert_eq!(
519            routes,
520            vec![
521                (
522                    FlowRouteKey::new(flow_id, 1),
523                    FlowRouteValue {
524                        peer: Peer::empty(1),
525                    },
526                ),
527                (
528                    FlowRouteKey::new(flow_id, 2),
529                    FlowRouteValue {
530                        peer: Peer::empty(2),
531                    },
532                ),
533            ]
534        );
535        assert_eq!(got, flow_value);
536        let flows = flow_metadata_manager
537            .flownode_flow_manager()
538            .flows(1)
539            .try_collect::<Vec<_>>()
540            .await
541            .unwrap();
542        assert_eq!(flows, vec![(flow_id, 0)]);
543        for table_id in [1024, 1025, 1026] {
544            let nodes = flow_metadata_manager
545                .table_flow_manager()
546                .flows(table_id)
547                .await
548                .unwrap();
549            assert_eq!(
550                nodes,
551                vec![
552                    (
553                        TableFlowKey::new(table_id, 1, flow_id, 1),
554                        TableFlowValue {
555                            peer: Peer::empty(1)
556                        }
557                    ),
558                    (
559                        TableFlowKey::new(table_id, 2, flow_id, 2),
560                        TableFlowValue {
561                            peer: Peer::empty(2)
562                        }
563                    )
564                ]
565            );
566        }
567    }
568
569    #[tokio::test]
570    async fn test_create_flow_metadata_flow_exists_err() {
571        let mem_kv = Arc::new(MemoryKvBackend::default());
572        let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
573        let flow_id = 10;
574        let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
575        let flow_routes = vec![
576            (
577                1u32,
578                FlowRouteValue {
579                    peer: Peer::empty(1),
580                },
581            ),
582            (
583                2,
584                FlowRouteValue {
585                    peer: Peer::empty(2),
586                },
587            ),
588        ];
589        flow_metadata_manager
590            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
591            .await
592            .unwrap();
593        // Creates again
594        let err = flow_metadata_manager
595            .create_flow_metadata(flow_id + 1, flow_value, flow_routes.clone())
596            .await
597            .unwrap_err();
598        assert_matches!(err, error::Error::FlowAlreadyExists { .. });
599    }
600
601    #[tokio::test]
602    async fn test_create_flow_metadata_unexpected_err() {
603        let mem_kv = Arc::new(MemoryKvBackend::default());
604        let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
605        let flow_id = 10;
606        let catalog_name = "greptime";
607        let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
608        let flow_routes = vec![
609            (
610                1u32,
611                FlowRouteValue {
612                    peer: Peer::empty(1),
613                },
614            ),
615            (
616                2,
617                FlowRouteValue {
618                    peer: Peer::empty(2),
619                },
620            ),
621        ];
622        flow_metadata_manager
623            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
624            .await
625            .unwrap();
626        // Creates again.
627        let another_sink_table_name = TableName {
628            catalog_name: catalog_name.to_string(),
629            schema_name: "my_schema".to_string(),
630            table_name: "another_sink_table".to_string(),
631        };
632        let flow_value = FlowInfoValue {
633            catalog_name: "greptime".to_string(),
634            query_context: None,
635            flow_name: "flow".to_string(),
636            source_table_ids: vec![1024, 1025, 1026],
637            sink_table_name: another_sink_table_name,
638            flownode_ids: [(0, 1u64)].into(),
639            raw_sql: "raw".to_string(),
640            expire_after: Some(300),
641            comment: "hi".to_string(),
642            options: Default::default(),
643            created_time: chrono::Utc::now(),
644            updated_time: chrono::Utc::now(),
645        };
646        let err = flow_metadata_manager
647            .create_flow_metadata(flow_id, flow_value, flow_routes.clone())
648            .await
649            .unwrap_err();
650        assert!(err.to_string().contains("Reads the different value"));
651    }
652
653    #[tokio::test]
654    async fn test_destroy_flow_metadata() {
655        let mem_kv = Arc::new(MemoryKvBackend::default());
656        let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
657        let flow_id = 10;
658        let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
659        let flow_routes = vec![(
660            0u32,
661            FlowRouteValue {
662                peer: Peer::empty(1),
663            },
664        )];
665        flow_metadata_manager
666            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
667            .await
668            .unwrap();
669
670        flow_metadata_manager
671            .destroy_flow_metadata(flow_id, &flow_value)
672            .await
673            .unwrap();
674        // Destroys again
675        flow_metadata_manager
676            .destroy_flow_metadata(flow_id, &flow_value)
677            .await
678            .unwrap();
679        // Ensures all keys are deleted
680        assert!(mem_kv.is_empty())
681    }
682
683    #[tokio::test]
684    async fn test_update_flow_metadata() {
685        let mem_kv = Arc::new(MemoryKvBackend::default());
686        let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
687        let flow_id = 10;
688        let flow_value = test_flow_info_value(
689            "flow",
690            [(0, 1u64), (1, 2u64)].into(),
691            vec![1024, 1025, 1026],
692        );
693        let flow_routes = vec![
694            (
695                1u32,
696                FlowRouteValue {
697                    peer: Peer::empty(1),
698                },
699            ),
700            (
701                2,
702                FlowRouteValue {
703                    peer: Peer::empty(2),
704                },
705            ),
706        ];
707        flow_metadata_manager
708            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
709            .await
710            .unwrap();
711
712        let new_flow_value = {
713            let mut tmp = flow_value.clone();
714            tmp.raw_sql = "new".to_string();
715            tmp
716        };
717
718        // Update flow instead
719        flow_metadata_manager
720            .update_flow_metadata(
721                flow_id,
722                &DeserializedValueWithBytes::from_inner(flow_value.clone()),
723                &new_flow_value,
724                flow_routes.clone(),
725            )
726            .await
727            .unwrap();
728
729        let got = flow_metadata_manager
730            .flow_info_manager()
731            .get(flow_id)
732            .await
733            .unwrap()
734            .unwrap();
735        let routes = flow_metadata_manager
736            .flow_route_manager()
737            .routes(flow_id)
738            .await
739            .unwrap();
740        assert_eq!(
741            routes,
742            vec![
743                (
744                    FlowRouteKey::new(flow_id, 1),
745                    FlowRouteValue {
746                        peer: Peer::empty(1),
747                    },
748                ),
749                (
750                    FlowRouteKey::new(flow_id, 2),
751                    FlowRouteValue {
752                        peer: Peer::empty(2),
753                    },
754                ),
755            ]
756        );
757        assert_eq!(got, new_flow_value);
758        let flows = flow_metadata_manager
759            .flownode_flow_manager()
760            .flows(1)
761            .try_collect::<Vec<_>>()
762            .await
763            .unwrap();
764        assert_eq!(flows, vec![(flow_id, 0)]);
765        for table_id in [1024, 1025, 1026] {
766            let nodes = flow_metadata_manager
767                .table_flow_manager()
768                .flows(table_id)
769                .await
770                .unwrap();
771            assert_eq!(
772                nodes,
773                vec![
774                    (
775                        TableFlowKey::new(table_id, 1, flow_id, 1),
776                        TableFlowValue {
777                            peer: Peer::empty(1)
778                        }
779                    ),
780                    (
781                        TableFlowKey::new(table_id, 2, flow_id, 2),
782                        TableFlowValue {
783                            peer: Peer::empty(2)
784                        }
785                    )
786                ]
787            );
788        }
789    }
790
791    #[tokio::test]
792    async fn test_update_flow_metadata_diff_flownode() {
793        let mem_kv = Arc::new(MemoryKvBackend::default());
794        let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
795        let flow_id = 10;
796        let flow_value = test_flow_info_value(
797            "flow",
798            [(0u32, 1u64), (1u32, 2u64)].into(),
799            vec![1024, 1025, 1026],
800        );
801        let flow_routes = vec![
802            (
803                0u32,
804                FlowRouteValue {
805                    peer: Peer::empty(1),
806                },
807            ),
808            (
809                1,
810                FlowRouteValue {
811                    peer: Peer::empty(2),
812                },
813            ),
814        ];
815        flow_metadata_manager
816            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
817            .await
818            .unwrap();
819
820        let new_flow_value = {
821            let mut tmp = flow_value.clone();
822            tmp.raw_sql = "new".to_string();
823            // move to different flownodes
824            tmp.flownode_ids = [(0, 3u64), (1, 4u64)].into();
825            tmp
826        };
827        let new_flow_routes = vec![
828            (
829                0u32,
830                FlowRouteValue {
831                    peer: Peer::empty(3),
832                },
833            ),
834            (
835                1,
836                FlowRouteValue {
837                    peer: Peer::empty(4),
838                },
839            ),
840        ];
841
842        // Update flow instead
843        flow_metadata_manager
844            .update_flow_metadata(
845                flow_id,
846                &DeserializedValueWithBytes::from_inner(flow_value.clone()),
847                &new_flow_value,
848                new_flow_routes.clone(),
849            )
850            .await
851            .unwrap();
852
853        let got = flow_metadata_manager
854            .flow_info_manager()
855            .get(flow_id)
856            .await
857            .unwrap()
858            .unwrap();
859        let routes = flow_metadata_manager
860            .flow_route_manager()
861            .routes(flow_id)
862            .await
863            .unwrap();
864        assert_eq!(
865            routes,
866            vec![
867                (
868                    FlowRouteKey::new(flow_id, 0),
869                    FlowRouteValue {
870                        peer: Peer::empty(3),
871                    },
872                ),
873                (
874                    FlowRouteKey::new(flow_id, 1),
875                    FlowRouteValue {
876                        peer: Peer::empty(4),
877                    },
878                ),
879            ]
880        );
881        assert_eq!(got, new_flow_value);
882
883        let flows = flow_metadata_manager
884            .flownode_flow_manager()
885            .flows(1)
886            .try_collect::<Vec<_>>()
887            .await
888            .unwrap();
889        // should moved to different flownode
890        assert_eq!(flows, vec![]);
891
892        let flows = flow_metadata_manager
893            .flownode_flow_manager()
894            .flows(3)
895            .try_collect::<Vec<_>>()
896            .await
897            .unwrap();
898        assert_eq!(flows, vec![(flow_id, 0)]);
899
900        for table_id in [1024, 1025, 1026] {
901            let nodes = flow_metadata_manager
902                .table_flow_manager()
903                .flows(table_id)
904                .await
905                .unwrap();
906            assert_eq!(
907                nodes,
908                vec![
909                    (
910                        TableFlowKey::new(table_id, 3, flow_id, 0),
911                        TableFlowValue {
912                            peer: Peer::empty(3)
913                        }
914                    ),
915                    (
916                        TableFlowKey::new(table_id, 4, flow_id, 1),
917                        TableFlowValue {
918                            peer: Peer::empty(4)
919                        }
920                    )
921                ]
922            );
923        }
924    }
925
926    #[tokio::test]
927    async fn test_update_flow_metadata_flow_replace_diff_id_err() {
928        let mem_kv = Arc::new(MemoryKvBackend::default());
929        let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
930        let flow_id = 10;
931        let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
932        let flow_routes = vec![
933            (
934                1u32,
935                FlowRouteValue {
936                    peer: Peer::empty(1),
937                },
938            ),
939            (
940                2,
941                FlowRouteValue {
942                    peer: Peer::empty(2),
943                },
944            ),
945        ];
946        flow_metadata_manager
947            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
948            .await
949            .unwrap();
950        // update again with same flow id
951        flow_metadata_manager
952            .update_flow_metadata(
953                flow_id,
954                &DeserializedValueWithBytes::from_inner(flow_value.clone()),
955                &flow_value,
956                flow_routes.clone(),
957            )
958            .await
959            .unwrap();
960        // update again with wrong flow id, expected error
961        let err = flow_metadata_manager
962            .update_flow_metadata(
963                flow_id + 1,
964                &DeserializedValueWithBytes::from_inner(flow_value.clone()),
965                &flow_value,
966                flow_routes,
967            )
968            .await
969            .unwrap_err();
970        assert_matches!(err, error::Error::Unexpected { .. });
971        assert!(err
972            .to_string()
973            .contains("Reads different flow id when updating flow"));
974    }
975
976    #[tokio::test]
977    async fn test_update_flow_metadata_unexpected_err_prev_value_diff() {
978        let mem_kv = Arc::new(MemoryKvBackend::default());
979        let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
980        let flow_id = 10;
981        let catalog_name = "greptime";
982        let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
983        let flow_routes = vec![
984            (
985                1u32,
986                FlowRouteValue {
987                    peer: Peer::empty(1),
988                },
989            ),
990            (
991                2,
992                FlowRouteValue {
993                    peer: Peer::empty(2),
994                },
995            ),
996        ];
997        flow_metadata_manager
998            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
999            .await
1000            .unwrap();
1001        // Creates again.
1002        let another_sink_table_name = TableName {
1003            catalog_name: catalog_name.to_string(),
1004            schema_name: "my_schema".to_string(),
1005            table_name: "another_sink_table".to_string(),
1006        };
1007        let flow_value = FlowInfoValue {
1008            catalog_name: "greptime".to_string(),
1009            query_context: None,
1010            flow_name: "flow".to_string(),
1011            source_table_ids: vec![1024, 1025, 1026],
1012            sink_table_name: another_sink_table_name,
1013            flownode_ids: [(0, 1u64)].into(),
1014            raw_sql: "raw".to_string(),
1015            expire_after: Some(300),
1016            comment: "hi".to_string(),
1017            options: Default::default(),
1018            created_time: chrono::Utc::now(),
1019            updated_time: chrono::Utc::now(),
1020        };
1021        let err = flow_metadata_manager
1022            .update_flow_metadata(
1023                flow_id,
1024                &DeserializedValueWithBytes::from_inner(flow_value.clone()),
1025                &flow_value,
1026                flow_routes.clone(),
1027            )
1028            .await
1029            .unwrap_err();
1030        assert!(
1031            err.to_string().contains("Reads the different value"),
1032            "error: {:?}",
1033            err
1034        );
1035    }
1036}