1pub mod flow_info;
16pub(crate) mod flow_name;
17pub mod flow_route;
18pub mod flow_state;
19mod flownode_addr_helper;
20pub(crate) mod flownode_flow;
21pub(crate) mod table_flow;
22use std::collections::BTreeMap;
23use std::ops::Deref;
24use std::sync::Arc;
25
26use common_telemetry::info;
27use flow_route::{FlowRouteKey, FlowRouteManager, FlowRouteValue};
28use snafu::{OptionExt, ensure};
29use table_flow::TableFlowValue;
30
31use self::flow_info::{FlowInfoKey, FlowInfoValue};
32use self::flow_name::FlowNameKey;
33use self::flownode_flow::FlownodeFlowKey;
34use self::table_flow::TableFlowKey;
35use crate::ensure_values;
36use crate::error::{self, Result};
37use crate::key::flow::flow_info::FlowInfoManager;
38use crate::key::flow::flow_name::FlowNameManager;
39use crate::key::flow::flow_state::FlowStateManager;
40use crate::key::flow::flownode_flow::FlownodeFlowManager;
41pub use crate::key::flow::table_flow::{TableFlowManager, TableFlowManagerRef};
42use crate::key::txn_helper::TxnOpGetResponseSet;
43use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey};
44use crate::kv_backend::KvBackendRef;
45use crate::kv_backend::txn::Txn;
46use crate::rpc::store::BatchDeleteRequest;
47
48pub const FLOW_KEY_PREFIX: &str = "__flow";
49
50pub fn scoped_flow_key(inner_key: &str) -> String {
52 format!("{FLOW_KEY_PREFIX}/{inner_key}")
53}
54
55pub fn scoped_flow_key_prefix(inner_prefix: &str) -> String {
57 scoped_flow_key(inner_prefix)
58}
59
60pub fn flow_info_key_prefix() -> String {
62 scoped_flow_key_prefix(flow_info::FLOW_INFO_KEY_PREFIX)
63}
64
65pub fn flow_name_key_prefix() -> String {
67 scoped_flow_key_prefix(flow_name::FLOW_NAME_KEY_PREFIX)
68}
69
70pub fn flow_route_key_prefix() -> String {
72 scoped_flow_key_prefix(flow_route::FLOW_ROUTE_KEY_PREFIX)
73}
74
75pub fn table_flow_key_prefix() -> String {
77 scoped_flow_key_prefix(table_flow::TABLE_FLOW_KEY_PREFIX)
78}
79
80pub fn flownode_flow_key_prefix() -> String {
82 scoped_flow_key_prefix(flownode_flow::FLOWNODE_FLOW_KEY_PREFIX)
83}
84
85pub fn flow_state_full_key() -> String {
87 scoped_flow_key(flow_state::FLOW_STATE_KEY)
88}
89
90#[derive(Debug, Clone, PartialEq)]
92pub struct FlowScoped<T> {
93 inner: T,
94}
95
96impl<T> Deref for FlowScoped<T> {
97 type Target = T;
98
99 fn deref(&self) -> &Self::Target {
100 &self.inner
101 }
102}
103
104impl<T> FlowScoped<T> {
105 const PREFIX: &'static str = "__flow/";
106
107 pub fn new(inner: T) -> FlowScoped<T> {
109 Self { inner }
110 }
111}
112
113impl<'a, T: MetadataKey<'a, T>> MetadataKey<'a, FlowScoped<T>> for FlowScoped<T> {
114 fn to_bytes(&self) -> Vec<u8> {
115 let prefix = FlowScoped::<T>::PREFIX.as_bytes();
116 let inner = self.inner.to_bytes();
117 let mut bytes = Vec::with_capacity(prefix.len() + inner.len());
118 bytes.extend(prefix);
119 bytes.extend(inner);
120 bytes
121 }
122
123 fn from_bytes(bytes: &'a [u8]) -> Result<FlowScoped<T>> {
124 let prefix = FlowScoped::<T>::PREFIX.as_bytes();
125 ensure!(
126 bytes.starts_with(prefix),
127 error::MismatchPrefixSnafu {
128 prefix: String::from_utf8_lossy(prefix),
129 key: String::from_utf8_lossy(bytes),
130 }
131 );
132 let inner = T::from_bytes(&bytes[prefix.len()..])?;
133 Ok(FlowScoped { inner })
134 }
135}
136
137pub type FlowMetadataManagerRef = Arc<FlowMetadataManager>;
138
139pub struct FlowMetadataManager {
144 flow_info_manager: FlowInfoManager,
145 flow_route_manager: FlowRouteManager,
146 flownode_flow_manager: FlownodeFlowManager,
147 table_flow_manager: TableFlowManager,
148 flow_name_manager: FlowNameManager,
149 flow_state_manager: Option<FlowStateManager>,
151 kv_backend: KvBackendRef,
152}
153
154impl FlowMetadataManager {
155 pub fn new(kv_backend: KvBackendRef) -> Self {
157 Self {
158 flow_info_manager: FlowInfoManager::new(kv_backend.clone()),
159 flow_route_manager: FlowRouteManager::new(kv_backend.clone()),
160 flow_name_manager: FlowNameManager::new(kv_backend.clone()),
161 flownode_flow_manager: FlownodeFlowManager::new(kv_backend.clone()),
162 table_flow_manager: TableFlowManager::new(kv_backend.clone()),
163 flow_state_manager: None,
164 kv_backend,
165 }
166 }
167
168 pub fn flow_name_manager(&self) -> &FlowNameManager {
170 &self.flow_name_manager
171 }
172
173 pub fn flow_state_manager(&self) -> Option<&FlowStateManager> {
174 self.flow_state_manager.as_ref()
175 }
176
177 pub fn flow_info_manager(&self) -> &FlowInfoManager {
179 &self.flow_info_manager
180 }
181
182 pub fn flow_route_manager(&self) -> &FlowRouteManager {
184 &self.flow_route_manager
185 }
186
187 pub fn flownode_flow_manager(&self) -> &FlownodeFlowManager {
189 &self.flownode_flow_manager
190 }
191
192 pub fn table_flow_manager(&self) -> &TableFlowManager {
194 &self.table_flow_manager
195 }
196
197 pub async fn flownode_addrs(
199 &self,
200 flow_id: FlowId,
201 ) -> Result<BTreeMap<FlowPartitionId, String>> {
202 let routes = self.flow_route_manager.routes(flow_id).await?;
203
204 Ok(routes
205 .into_iter()
206 .filter_map(|(key, route)| {
207 let addr = route.peer.addr;
208 (!addr.is_empty()).then_some((key.partition_id(), addr))
209 })
210 .collect())
211 }
212
213 pub async fn create_flow_metadata(
215 &self,
216 flow_id: FlowId,
217 flow_info: FlowInfoValue,
218 flow_routes: Vec<(FlowPartitionId, FlowRouteValue)>,
219 ) -> Result<()> {
220 let (create_flow_flow_name_txn, on_create_flow_flow_name_failure) = self
221 .flow_name_manager
222 .build_create_txn(&flow_info.catalog_name, &flow_info.flow_name, flow_id)?;
223
224 let (create_flow_txn, on_create_flow_failure) = self
225 .flow_info_manager
226 .build_create_txn(flow_id, &flow_info)?;
227
228 let create_flow_routes_txn = self
229 .flow_route_manager
230 .build_create_txn(flow_id, flow_routes.clone())?;
231
232 let create_flownode_flow_txn = self
233 .flownode_flow_manager
234 .build_create_txn(flow_id, flow_info.flownode_ids().clone());
235
236 let create_table_flow_txn = self.table_flow_manager.build_create_txn(
237 flow_id,
238 flow_routes
239 .into_iter()
240 .map(|(partition_id, route)| (partition_id, TableFlowValue { peer: route.peer }))
241 .collect(),
242 flow_info.source_table_ids(),
243 )?;
244
245 let txn = Txn::merge_all(vec![
246 create_flow_flow_name_txn,
247 create_flow_txn,
248 create_flow_routes_txn,
249 create_flownode_flow_txn,
250 create_table_flow_txn,
251 ]);
252 info!(
253 "Creating flow {}.{}({}), with {} txn operations",
254 flow_info.catalog_name,
255 flow_info.flow_name,
256 flow_id,
257 txn.max_operations()
258 );
259
260 let mut resp = self.kv_backend.txn(txn).await?;
261 if !resp.succeeded {
262 let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
263 let remote_flow_flow_name =
264 on_create_flow_flow_name_failure(&mut set)?.with_context(|| {
265 error::UnexpectedSnafu {
266 err_msg: format!(
267 "Reads the empty flow name in comparing operation of the creating flow, flow_id: {flow_id}"
268 ),
269 }
270 })?;
271
272 if remote_flow_flow_name.flow_id() != flow_id {
273 info!(
274 "Trying to create flow {}.{}({}), but flow({}) already exists",
275 flow_info.catalog_name,
276 flow_info.flow_name,
277 flow_id,
278 remote_flow_flow_name.flow_id()
279 );
280
281 return error::FlowAlreadyExistsSnafu {
282 flow_name: format!("{}.{}", flow_info.catalog_name, flow_info.flow_name),
283 }
284 .fail();
285 }
286
287 let remote_flow =
288 on_create_flow_failure(&mut set)?.with_context(|| error::UnexpectedSnafu {
289 err_msg: format!(
290 "Reads the empty flow in comparing operation of creating flow, flow_id: {flow_id}"
291 ),
292 })?;
293 let op_name = "creating flow";
294 ensure_values!(*remote_flow, flow_info, op_name);
295 }
296
297 Ok(())
298 }
299
300 pub async fn update_flow_metadata(
302 &self,
303 flow_id: FlowId,
304 current_flow_info: &DeserializedValueWithBytes<FlowInfoValue>,
305 new_flow_info: &FlowInfoValue,
306 flow_routes: Vec<(FlowPartitionId, FlowRouteValue)>,
307 ) -> Result<()> {
308 let (update_flow_flow_name_txn, on_create_flow_flow_name_failure) =
309 self.flow_name_manager.build_update_txn(
310 &new_flow_info.catalog_name,
311 &new_flow_info.flow_name,
312 flow_id,
313 )?;
314
315 let (update_flow_txn, on_create_flow_failure) =
316 self.flow_info_manager
317 .build_update_txn(flow_id, current_flow_info, new_flow_info)?;
318
319 let update_flow_routes_txn = self.flow_route_manager.build_update_txn(
320 flow_id,
321 current_flow_info,
322 flow_routes.clone(),
323 )?;
324
325 let update_flownode_flow_txn = self.flownode_flow_manager.build_update_txn(
326 flow_id,
327 current_flow_info,
328 new_flow_info.flownode_ids().clone(),
329 );
330
331 let update_table_flow_txn = self.table_flow_manager.build_update_txn(
332 flow_id,
333 current_flow_info,
334 flow_routes
335 .into_iter()
336 .map(|(partition_id, route)| (partition_id, TableFlowValue { peer: route.peer }))
337 .collect(),
338 new_flow_info.source_table_ids(),
339 )?;
340
341 let txn = Txn::merge_all(vec![
342 update_flow_flow_name_txn,
343 update_flow_txn,
344 update_flow_routes_txn,
345 update_flownode_flow_txn,
346 update_table_flow_txn,
347 ]);
348 info!(
349 "Creating flow {}.{}({}), with {} txn operations",
350 new_flow_info.catalog_name,
351 new_flow_info.flow_name,
352 flow_id,
353 txn.max_operations()
354 );
355
356 let mut resp = self.kv_backend.txn(txn).await?;
357 if !resp.succeeded {
358 let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
359 let remote_flow_flow_name =
360 on_create_flow_flow_name_failure(&mut set)?.with_context(|| {
361 error::UnexpectedSnafu {
362 err_msg: format!(
363 "Reads the empty flow name in comparing operation of the updating flow, flow_id: {flow_id}"
364 ),
365 }
366 })?;
367
368 if remote_flow_flow_name.flow_id() != flow_id {
369 info!(
370 "Trying to updating flow {}.{}({}), but flow({}) already exists with a different flow id",
371 new_flow_info.catalog_name,
372 new_flow_info.flow_name,
373 flow_id,
374 remote_flow_flow_name.flow_id()
375 );
376
377 return error::UnexpectedSnafu {
378 err_msg: format!(
379 "Reads different flow id when updating flow({2}.{3}), prev flow id = {0}, updating with flow id = {1}",
380 remote_flow_flow_name.flow_id(),
381 flow_id,
382 new_flow_info.catalog_name,
383 new_flow_info.flow_name,
384 ),
385 }.fail();
386 }
387
388 let remote_flow =
389 on_create_flow_failure(&mut set)?.with_context(|| error::UnexpectedSnafu {
390 err_msg: format!(
391 "Reads the empty flow in comparing operation of the updating flow, flow_id: {flow_id}"
392 ),
393 })?;
394 let op_name = "updating flow";
395 ensure_values!(*remote_flow, new_flow_info.clone(), op_name);
396 }
397
398 Ok(())
399 }
400
401 fn flow_metadata_keys(&self, flow_id: FlowId, flow_value: &FlowInfoValue) -> Vec<Vec<u8>> {
402 let source_table_ids = flow_value.source_table_ids();
403 let mut keys =
404 Vec::with_capacity(2 + flow_value.flownode_ids.len() * (source_table_ids.len() + 2));
405 let flow_name = FlowNameKey::new(&flow_value.catalog_name, &flow_value.flow_name);
407 keys.push(flow_name.to_bytes());
408
409 let flow_info_key = FlowInfoKey::new(flow_id);
411 keys.push(flow_info_key.to_bytes());
412
413 flow_value
415 .flownode_ids
416 .iter()
417 .for_each(|(&partition_id, &flownode_id)| {
418 keys.push(FlownodeFlowKey::new(flownode_id, flow_id, partition_id).to_bytes());
419 keys.push(FlowRouteKey::new(flow_id, partition_id).to_bytes());
420 source_table_ids.iter().for_each(|&table_id| {
421 keys.push(
422 TableFlowKey::new(table_id, flownode_id, flow_id, partition_id).to_bytes(),
423 );
424 })
425 });
426 keys
427 }
428
429 pub async fn destroy_flow_metadata(
431 &self,
432 flow_id: FlowId,
433 flow_value: &FlowInfoValue,
434 ) -> Result<()> {
435 let keys = self.flow_metadata_keys(flow_id, flow_value);
436 let _ = self
437 .kv_backend
438 .batch_delete(BatchDeleteRequest::new().with_keys(keys))
439 .await?;
440 Ok(())
441 }
442}
443
444impl std::fmt::Debug for FlowMetadataManager {
445 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
446 f.debug_struct("FlowMetadataManager").finish()
447 }
448}
449
450#[cfg(test)]
451mod tests {
452 use std::assert_matches;
453 use std::collections::BTreeMap;
454 use std::sync::Arc;
455
456 use futures::TryStreamExt;
457 use table::metadata::TableId;
458 use table::table_name::TableName;
459
460 use super::*;
461 use crate::FlownodeId;
462 use crate::key::flow::table_flow::TableFlowKey;
463 use crate::key::node_address::{NodeAddressKey, NodeAddressValue};
464 use crate::key::{FlowPartitionId, MetadataValue};
465 use crate::kv_backend::KvBackend;
466 use crate::kv_backend::memory::MemoryKvBackend;
467 use crate::peer::Peer;
468 use crate::rpc::store::PutRequest;
469
470 #[derive(Debug)]
471 struct MockKey {
472 inner: Vec<u8>,
473 }
474
475 impl<'a> MetadataKey<'a, MockKey> for MockKey {
476 fn to_bytes(&self) -> Vec<u8> {
477 self.inner.clone()
478 }
479
480 fn from_bytes(bytes: &'a [u8]) -> Result<MockKey> {
481 Ok(MockKey {
482 inner: bytes.to_vec(),
483 })
484 }
485 }
486
487 #[test]
488 fn test_flow_scoped_to_bytes() {
489 let key = FlowScoped::new(MockKey {
490 inner: b"hi".to_vec(),
491 });
492 assert_eq!(b"__flow/hi".to_vec(), key.to_bytes());
493 }
494
495 #[test]
496 fn test_flow_scoped_from_bytes() {
497 let bytes = b"__flow/hi";
498 let key = FlowScoped::<MockKey>::from_bytes(bytes).unwrap();
499 assert_eq!(key.inner.inner, b"hi".to_vec());
500 }
501
502 #[test]
503 fn test_flow_scoped_from_bytes_mismatch() {
504 let bytes = b"__table/hi";
505 let err = FlowScoped::<MockKey>::from_bytes(bytes).unwrap_err();
506 assert_matches!(err, error::Error::MismatchPrefix { .. });
507 }
508
509 fn test_flow_info_value(
510 flow_name: &str,
511 flownode_ids: BTreeMap<FlowPartitionId, FlownodeId>,
512 source_table_ids: Vec<TableId>,
513 ) -> FlowInfoValue {
514 let catalog_name = "greptime";
515 let sink_table_name = TableName {
516 catalog_name: catalog_name.to_string(),
517 schema_name: "my_schema".to_string(),
518 table_name: "sink_table".to_string(),
519 };
520 FlowInfoValue {
521 catalog_name: catalog_name.to_string(),
522 query_context: None,
523 flow_name: flow_name.to_string(),
524 source_table_ids,
525 sink_table_name,
526 flownode_ids,
527 raw_sql: "raw".to_string(),
528 expire_after: Some(300),
529 eval_interval_secs: None,
530 comment: "hi".to_string(),
531 options: Default::default(),
532 created_time: chrono::Utc::now(),
533 updated_time: chrono::Utc::now(),
534 }
535 }
536
537 #[tokio::test]
538 async fn test_create_flow_metadata() {
539 let mem_kv = Arc::new(MemoryKvBackend::default());
540 let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
541 let flow_id = 10;
542 let flow_value = test_flow_info_value(
543 "flow",
544 [(0, 1u64), (1, 2u64)].into(),
545 vec![1024, 1025, 1026],
546 );
547 let flow_routes = vec![
548 (
549 1u32,
550 FlowRouteValue {
551 peer: Peer::empty(1),
552 },
553 ),
554 (
555 2,
556 FlowRouteValue {
557 peer: Peer::empty(2),
558 },
559 ),
560 ];
561 flow_metadata_manager
562 .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
563 .await
564 .unwrap();
565 flow_metadata_manager
567 .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
568 .await
569 .unwrap();
570 let got = flow_metadata_manager
571 .flow_info_manager()
572 .get(flow_id)
573 .await
574 .unwrap()
575 .unwrap();
576 let routes = flow_metadata_manager
577 .flow_route_manager()
578 .routes(flow_id)
579 .await
580 .unwrap();
581 assert_eq!(
582 routes,
583 vec![
584 (
585 FlowRouteKey::new(flow_id, 1),
586 FlowRouteValue {
587 peer: Peer::empty(1),
588 },
589 ),
590 (
591 FlowRouteKey::new(flow_id, 2),
592 FlowRouteValue {
593 peer: Peer::empty(2),
594 },
595 ),
596 ]
597 );
598 assert_eq!(got, flow_value);
599 let flows = flow_metadata_manager
600 .flownode_flow_manager()
601 .flows(1)
602 .try_collect::<Vec<_>>()
603 .await
604 .unwrap();
605 assert_eq!(flows, vec![(flow_id, 0)]);
606 for table_id in [1024, 1025, 1026] {
607 let nodes = flow_metadata_manager
608 .table_flow_manager()
609 .flows(table_id)
610 .await
611 .unwrap();
612 assert_eq!(
613 nodes,
614 vec![
615 (
616 TableFlowKey::new(table_id, 1, flow_id, 1),
617 TableFlowValue {
618 peer: Peer::empty(1)
619 }
620 ),
621 (
622 TableFlowKey::new(table_id, 2, flow_id, 2),
623 TableFlowValue {
624 peer: Peer::empty(2)
625 }
626 )
627 ]
628 );
629 }
630 }
631
632 #[tokio::test]
633 async fn test_flownode_addrs_remaps_to_latest_address() {
634 let mem_kv = Arc::new(MemoryKvBackend::default());
635 let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
636 let flow_id = 10;
637 let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024]);
638 let flow_routes = vec![(
639 0u32,
640 FlowRouteValue {
641 peer: Peer::new(1, "old-addr"),
642 },
643 )];
644
645 flow_metadata_manager
646 .create_flow_metadata(flow_id, flow_value, flow_routes)
647 .await
648 .unwrap();
649
650 mem_kv
651 .put(PutRequest {
652 key: NodeAddressKey::with_flownode(1).to_bytes(),
653 value: NodeAddressValue::new(Peer::new(1, "new-addr"))
654 .try_as_raw_value()
655 .unwrap(),
656 ..Default::default()
657 })
658 .await
659 .unwrap();
660
661 let addrs = flow_metadata_manager.flownode_addrs(flow_id).await.unwrap();
662 assert_eq!(addrs, BTreeMap::from([(0, "new-addr".to_string())]));
663 }
664
665 #[tokio::test]
666 async fn test_flownode_addrs_falls_back_to_route_address() {
667 let mem_kv = Arc::new(MemoryKvBackend::default());
668 let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
669 let flow_id = 10;
670 let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024]);
671 let flow_routes = vec![(
672 0u32,
673 FlowRouteValue {
674 peer: Peer::new(1, "route-addr"),
675 },
676 )];
677
678 flow_metadata_manager
679 .create_flow_metadata(flow_id, flow_value, flow_routes)
680 .await
681 .unwrap();
682
683 let addrs = flow_metadata_manager.flownode_addrs(flow_id).await.unwrap();
684 assert_eq!(addrs, BTreeMap::from([(0, "route-addr".to_string())]));
685 }
686
687 #[tokio::test]
688 async fn test_flownode_addrs_skips_empty_addresses() {
689 let mem_kv = Arc::new(MemoryKvBackend::default());
690 let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
691 let flow_id = 10;
692 let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024]);
693 let flow_routes = vec![(
694 0u32,
695 FlowRouteValue {
696 peer: Peer::empty(1),
697 },
698 )];
699
700 flow_metadata_manager
701 .create_flow_metadata(flow_id, flow_value, flow_routes)
702 .await
703 .unwrap();
704
705 let addrs = flow_metadata_manager.flownode_addrs(flow_id).await.unwrap();
706 assert!(addrs.is_empty());
707 }
708
709 #[tokio::test]
710 async fn test_create_flow_metadata_flow_exists_err() {
711 let mem_kv = Arc::new(MemoryKvBackend::default());
712 let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
713 let flow_id = 10;
714 let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
715 let flow_routes = vec![
716 (
717 1u32,
718 FlowRouteValue {
719 peer: Peer::empty(1),
720 },
721 ),
722 (
723 2,
724 FlowRouteValue {
725 peer: Peer::empty(2),
726 },
727 ),
728 ];
729 flow_metadata_manager
730 .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
731 .await
732 .unwrap();
733 let err = flow_metadata_manager
735 .create_flow_metadata(flow_id + 1, flow_value, flow_routes.clone())
736 .await
737 .unwrap_err();
738 assert_matches!(err, error::Error::FlowAlreadyExists { .. });
739 }
740
741 #[tokio::test]
742 async fn test_create_flow_metadata_unexpected_err() {
743 let mem_kv = Arc::new(MemoryKvBackend::default());
744 let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
745 let flow_id = 10;
746 let catalog_name = "greptime";
747 let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
748 let flow_routes = vec![
749 (
750 1u32,
751 FlowRouteValue {
752 peer: Peer::empty(1),
753 },
754 ),
755 (
756 2,
757 FlowRouteValue {
758 peer: Peer::empty(2),
759 },
760 ),
761 ];
762 flow_metadata_manager
763 .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
764 .await
765 .unwrap();
766 let another_sink_table_name = TableName {
768 catalog_name: catalog_name.to_string(),
769 schema_name: "my_schema".to_string(),
770 table_name: "another_sink_table".to_string(),
771 };
772 let flow_value = FlowInfoValue {
773 catalog_name: "greptime".to_string(),
774 query_context: None,
775 flow_name: "flow".to_string(),
776 source_table_ids: vec![1024, 1025, 1026],
777 sink_table_name: another_sink_table_name,
778 flownode_ids: [(0, 1u64)].into(),
779 raw_sql: "raw".to_string(),
780 expire_after: Some(300),
781 eval_interval_secs: None,
782 comment: "hi".to_string(),
783 options: Default::default(),
784 created_time: chrono::Utc::now(),
785 updated_time: chrono::Utc::now(),
786 };
787 let err = flow_metadata_manager
788 .create_flow_metadata(flow_id, flow_value, flow_routes.clone())
789 .await
790 .unwrap_err();
791 assert!(err.to_string().contains("Reads the different value"));
792 }
793
794 #[tokio::test]
795 async fn test_destroy_flow_metadata() {
796 let mem_kv = Arc::new(MemoryKvBackend::default());
797 let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
798 let flow_id = 10;
799 let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
800 let flow_routes = vec![(
801 0u32,
802 FlowRouteValue {
803 peer: Peer::empty(1),
804 },
805 )];
806 flow_metadata_manager
807 .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
808 .await
809 .unwrap();
810
811 flow_metadata_manager
812 .destroy_flow_metadata(flow_id, &flow_value)
813 .await
814 .unwrap();
815 flow_metadata_manager
817 .destroy_flow_metadata(flow_id, &flow_value)
818 .await
819 .unwrap();
820 assert!(mem_kv.is_empty())
822 }
823
824 #[tokio::test]
825 async fn test_update_flow_metadata() {
826 let mem_kv = Arc::new(MemoryKvBackend::default());
827 let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
828 let flow_id = 10;
829 let flow_value = test_flow_info_value(
830 "flow",
831 [(0, 1u64), (1, 2u64)].into(),
832 vec![1024, 1025, 1026],
833 );
834 let flow_routes = vec![
835 (
836 1u32,
837 FlowRouteValue {
838 peer: Peer::empty(1),
839 },
840 ),
841 (
842 2,
843 FlowRouteValue {
844 peer: Peer::empty(2),
845 },
846 ),
847 ];
848 flow_metadata_manager
849 .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
850 .await
851 .unwrap();
852
853 let new_flow_value = {
854 let mut tmp = flow_value.clone();
855 tmp.raw_sql = "new".to_string();
856 tmp
857 };
858
859 flow_metadata_manager
861 .update_flow_metadata(
862 flow_id,
863 &DeserializedValueWithBytes::from_inner(flow_value.clone()),
864 &new_flow_value,
865 flow_routes.clone(),
866 )
867 .await
868 .unwrap();
869
870 let got = flow_metadata_manager
871 .flow_info_manager()
872 .get(flow_id)
873 .await
874 .unwrap()
875 .unwrap();
876 let routes = flow_metadata_manager
877 .flow_route_manager()
878 .routes(flow_id)
879 .await
880 .unwrap();
881 assert_eq!(
882 routes,
883 vec![
884 (
885 FlowRouteKey::new(flow_id, 1),
886 FlowRouteValue {
887 peer: Peer::empty(1),
888 },
889 ),
890 (
891 FlowRouteKey::new(flow_id, 2),
892 FlowRouteValue {
893 peer: Peer::empty(2),
894 },
895 ),
896 ]
897 );
898 assert_eq!(got, new_flow_value);
899 let flows = flow_metadata_manager
900 .flownode_flow_manager()
901 .flows(1)
902 .try_collect::<Vec<_>>()
903 .await
904 .unwrap();
905 assert_eq!(flows, vec![(flow_id, 0)]);
906 for table_id in [1024, 1025, 1026] {
907 let nodes = flow_metadata_manager
908 .table_flow_manager()
909 .flows(table_id)
910 .await
911 .unwrap();
912 assert_eq!(
913 nodes,
914 vec![
915 (
916 TableFlowKey::new(table_id, 1, flow_id, 1),
917 TableFlowValue {
918 peer: Peer::empty(1)
919 }
920 ),
921 (
922 TableFlowKey::new(table_id, 2, flow_id, 2),
923 TableFlowValue {
924 peer: Peer::empty(2)
925 }
926 )
927 ]
928 );
929 }
930 }
931
932 #[tokio::test]
933 async fn test_update_flow_metadata_diff_flownode() {
934 let mem_kv = Arc::new(MemoryKvBackend::default());
935 let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
936 let flow_id = 10;
937 let flow_value = test_flow_info_value(
938 "flow",
939 [(0u32, 1u64), (1u32, 2u64)].into(),
940 vec![1024, 1025, 1026],
941 );
942 let flow_routes = vec![
943 (
944 0u32,
945 FlowRouteValue {
946 peer: Peer::empty(1),
947 },
948 ),
949 (
950 1,
951 FlowRouteValue {
952 peer: Peer::empty(2),
953 },
954 ),
955 ];
956 flow_metadata_manager
957 .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
958 .await
959 .unwrap();
960
961 let new_flow_value = {
962 let mut tmp = flow_value.clone();
963 tmp.raw_sql = "new".to_string();
964 tmp.flownode_ids = [(0, 3u64), (1, 4u64)].into();
966 tmp
967 };
968 let new_flow_routes = vec![
969 (
970 0u32,
971 FlowRouteValue {
972 peer: Peer::empty(3),
973 },
974 ),
975 (
976 1,
977 FlowRouteValue {
978 peer: Peer::empty(4),
979 },
980 ),
981 ];
982
983 flow_metadata_manager
985 .update_flow_metadata(
986 flow_id,
987 &DeserializedValueWithBytes::from_inner(flow_value.clone()),
988 &new_flow_value,
989 new_flow_routes.clone(),
990 )
991 .await
992 .unwrap();
993
994 let got = flow_metadata_manager
995 .flow_info_manager()
996 .get(flow_id)
997 .await
998 .unwrap()
999 .unwrap();
1000 let routes = flow_metadata_manager
1001 .flow_route_manager()
1002 .routes(flow_id)
1003 .await
1004 .unwrap();
1005 assert_eq!(
1006 routes,
1007 vec![
1008 (
1009 FlowRouteKey::new(flow_id, 0),
1010 FlowRouteValue {
1011 peer: Peer::empty(3),
1012 },
1013 ),
1014 (
1015 FlowRouteKey::new(flow_id, 1),
1016 FlowRouteValue {
1017 peer: Peer::empty(4),
1018 },
1019 ),
1020 ]
1021 );
1022 assert_eq!(got, new_flow_value);
1023
1024 let flows = flow_metadata_manager
1025 .flownode_flow_manager()
1026 .flows(1)
1027 .try_collect::<Vec<_>>()
1028 .await
1029 .unwrap();
1030 assert_eq!(flows, vec![]);
1032
1033 let flows = flow_metadata_manager
1034 .flownode_flow_manager()
1035 .flows(3)
1036 .try_collect::<Vec<_>>()
1037 .await
1038 .unwrap();
1039 assert_eq!(flows, vec![(flow_id, 0)]);
1040
1041 for table_id in [1024, 1025, 1026] {
1042 let nodes = flow_metadata_manager
1043 .table_flow_manager()
1044 .flows(table_id)
1045 .await
1046 .unwrap();
1047 assert_eq!(
1048 nodes,
1049 vec![
1050 (
1051 TableFlowKey::new(table_id, 3, flow_id, 0),
1052 TableFlowValue {
1053 peer: Peer::empty(3)
1054 }
1055 ),
1056 (
1057 TableFlowKey::new(table_id, 4, flow_id, 1),
1058 TableFlowValue {
1059 peer: Peer::empty(4)
1060 }
1061 )
1062 ]
1063 );
1064 }
1065 }
1066
1067 #[tokio::test]
1068 async fn test_update_flow_metadata_flow_replace_diff_id_err() {
1069 let mem_kv = Arc::new(MemoryKvBackend::default());
1070 let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
1071 let flow_id = 10;
1072 let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
1073 let flow_routes = vec![
1074 (
1075 1u32,
1076 FlowRouteValue {
1077 peer: Peer::empty(1),
1078 },
1079 ),
1080 (
1081 2,
1082 FlowRouteValue {
1083 peer: Peer::empty(2),
1084 },
1085 ),
1086 ];
1087 flow_metadata_manager
1088 .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
1089 .await
1090 .unwrap();
1091 flow_metadata_manager
1093 .update_flow_metadata(
1094 flow_id,
1095 &DeserializedValueWithBytes::from_inner(flow_value.clone()),
1096 &flow_value,
1097 flow_routes.clone(),
1098 )
1099 .await
1100 .unwrap();
1101 let err = flow_metadata_manager
1103 .update_flow_metadata(
1104 flow_id + 1,
1105 &DeserializedValueWithBytes::from_inner(flow_value.clone()),
1106 &flow_value,
1107 flow_routes,
1108 )
1109 .await
1110 .unwrap_err();
1111 assert_matches!(err, error::Error::Unexpected { .. });
1112 assert!(
1113 err.to_string()
1114 .contains("Reads different flow id when updating flow")
1115 );
1116 }
1117
1118 #[tokio::test]
1119 async fn test_update_flow_metadata_unexpected_err_prev_value_diff() {
1120 let mem_kv = Arc::new(MemoryKvBackend::default());
1121 let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
1122 let flow_id = 10;
1123 let catalog_name = "greptime";
1124 let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
1125 let flow_routes = vec![
1126 (
1127 1u32,
1128 FlowRouteValue {
1129 peer: Peer::empty(1),
1130 },
1131 ),
1132 (
1133 2,
1134 FlowRouteValue {
1135 peer: Peer::empty(2),
1136 },
1137 ),
1138 ];
1139 flow_metadata_manager
1140 .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
1141 .await
1142 .unwrap();
1143 let another_sink_table_name = TableName {
1145 catalog_name: catalog_name.to_string(),
1146 schema_name: "my_schema".to_string(),
1147 table_name: "another_sink_table".to_string(),
1148 };
1149 let flow_value = FlowInfoValue {
1150 catalog_name: "greptime".to_string(),
1151 query_context: None,
1152 flow_name: "flow".to_string(),
1153 source_table_ids: vec![1024, 1025, 1026],
1154 sink_table_name: another_sink_table_name,
1155 flownode_ids: [(0, 1u64)].into(),
1156 raw_sql: "raw".to_string(),
1157 expire_after: Some(300),
1158 eval_interval_secs: None,
1159 comment: "hi".to_string(),
1160 options: Default::default(),
1161 created_time: chrono::Utc::now(),
1162 updated_time: chrono::Utc::now(),
1163 };
1164 let err = flow_metadata_manager
1165 .update_flow_metadata(
1166 flow_id,
1167 &DeserializedValueWithBytes::from_inner(flow_value.clone()),
1168 &flow_value,
1169 flow_routes.clone(),
1170 )
1171 .await
1172 .unwrap_err();
1173 assert!(
1174 err.to_string().contains("Reads the different value"),
1175 "error: {:?}",
1176 err
1177 );
1178 }
1179}