1pub mod flow_info;
16pub(crate) mod flow_name;
17pub mod flow_route;
18pub mod flow_state;
19mod flownode_addr_helper;
20pub(crate) mod flownode_flow;
21pub(crate) mod table_flow;
22use std::ops::Deref;
23use std::sync::Arc;
24
25use common_telemetry::info;
26use flow_route::{FlowRouteKey, FlowRouteManager, FlowRouteValue};
27use snafu::{ensure, OptionExt};
28use table_flow::TableFlowValue;
29
30use self::flow_info::{FlowInfoKey, FlowInfoValue};
31use self::flow_name::FlowNameKey;
32use self::flownode_flow::FlownodeFlowKey;
33use self::table_flow::TableFlowKey;
34use crate::ensure_values;
35use crate::error::{self, Result};
36use crate::key::flow::flow_info::FlowInfoManager;
37use crate::key::flow::flow_name::FlowNameManager;
38use crate::key::flow::flow_state::FlowStateManager;
39use crate::key::flow::flownode_flow::FlownodeFlowManager;
40pub use crate::key::flow::table_flow::{TableFlowManager, TableFlowManagerRef};
41use crate::key::txn_helper::TxnOpGetResponseSet;
42use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey};
43use crate::kv_backend::txn::Txn;
44use crate::kv_backend::KvBackendRef;
45use crate::rpc::store::BatchDeleteRequest;
46
/// Wrapper marking a key of type `T` as belonging to the flow key space.
///
/// The wrapper dereferences to the wrapped key, so the key's own methods can
/// be called directly on a `FlowScoped<T>`.
#[derive(Debug, Clone, PartialEq)]
pub struct FlowScoped<T> {
    inner: T,
}

impl<T> Deref for FlowScoped<T> {
    type Target = T;

    /// Borrows the wrapped key.
    fn deref(&self) -> &T {
        let Self { inner } = self;
        inner
    }
}

impl<T> FlowScoped<T> {
    /// Prefix shared by every flow-scoped key.
    const PREFIX: &'static str = "__flow/";

    /// Wraps `inner` in a [`FlowScoped`].
    pub fn new(inner: T) -> Self {
        FlowScoped { inner }
    }
}
69
70impl<'a, T: MetadataKey<'a, T>> MetadataKey<'a, FlowScoped<T>> for FlowScoped<T> {
71 fn to_bytes(&self) -> Vec<u8> {
72 let prefix = FlowScoped::<T>::PREFIX.as_bytes();
73 let inner = self.inner.to_bytes();
74 let mut bytes = Vec::with_capacity(prefix.len() + inner.len());
75 bytes.extend(prefix);
76 bytes.extend(inner);
77 bytes
78 }
79
80 fn from_bytes(bytes: &'a [u8]) -> Result<FlowScoped<T>> {
81 let prefix = FlowScoped::<T>::PREFIX.as_bytes();
82 ensure!(
83 bytes.starts_with(prefix),
84 error::MismatchPrefixSnafu {
85 prefix: String::from_utf8_lossy(prefix),
86 key: String::from_utf8_lossy(bytes),
87 }
88 );
89 let inner = T::from_bytes(&bytes[prefix.len()..])?;
90 Ok(FlowScoped { inner })
91 }
92}
93
94pub type FlowMetadataManagerRef = Arc<FlowMetadataManager>;
95
96pub struct FlowMetadataManager {
101 flow_info_manager: FlowInfoManager,
102 flow_route_manager: FlowRouteManager,
103 flownode_flow_manager: FlownodeFlowManager,
104 table_flow_manager: TableFlowManager,
105 flow_name_manager: FlowNameManager,
106 flow_state_manager: Option<FlowStateManager>,
108 kv_backend: KvBackendRef,
109}
110
111impl FlowMetadataManager {
112 pub fn new(kv_backend: KvBackendRef) -> Self {
114 Self {
115 flow_info_manager: FlowInfoManager::new(kv_backend.clone()),
116 flow_route_manager: FlowRouteManager::new(kv_backend.clone()),
117 flow_name_manager: FlowNameManager::new(kv_backend.clone()),
118 flownode_flow_manager: FlownodeFlowManager::new(kv_backend.clone()),
119 table_flow_manager: TableFlowManager::new(kv_backend.clone()),
120 flow_state_manager: None,
121 kv_backend,
122 }
123 }
124
125 pub fn flow_name_manager(&self) -> &FlowNameManager {
127 &self.flow_name_manager
128 }
129
130 pub fn flow_state_manager(&self) -> Option<&FlowStateManager> {
131 self.flow_state_manager.as_ref()
132 }
133
134 pub fn flow_info_manager(&self) -> &FlowInfoManager {
136 &self.flow_info_manager
137 }
138
139 pub fn flow_route_manager(&self) -> &FlowRouteManager {
141 &self.flow_route_manager
142 }
143
144 pub fn flownode_flow_manager(&self) -> &FlownodeFlowManager {
146 &self.flownode_flow_manager
147 }
148
149 pub fn table_flow_manager(&self) -> &TableFlowManager {
151 &self.table_flow_manager
152 }
153
154 pub async fn create_flow_metadata(
156 &self,
157 flow_id: FlowId,
158 flow_info: FlowInfoValue,
159 flow_routes: Vec<(FlowPartitionId, FlowRouteValue)>,
160 ) -> Result<()> {
161 let (create_flow_flow_name_txn, on_create_flow_flow_name_failure) = self
162 .flow_name_manager
163 .build_create_txn(&flow_info.catalog_name, &flow_info.flow_name, flow_id)?;
164
165 let (create_flow_txn, on_create_flow_failure) = self
166 .flow_info_manager
167 .build_create_txn(flow_id, &flow_info)?;
168
169 let create_flow_routes_txn = self
170 .flow_route_manager
171 .build_create_txn(flow_id, flow_routes.clone())?;
172
173 let create_flownode_flow_txn = self
174 .flownode_flow_manager
175 .build_create_txn(flow_id, flow_info.flownode_ids().clone());
176
177 let create_table_flow_txn = self.table_flow_manager.build_create_txn(
178 flow_id,
179 flow_routes
180 .into_iter()
181 .map(|(partition_id, route)| (partition_id, TableFlowValue { peer: route.peer }))
182 .collect(),
183 flow_info.source_table_ids(),
184 )?;
185
186 let txn = Txn::merge_all(vec![
187 create_flow_flow_name_txn,
188 create_flow_txn,
189 create_flow_routes_txn,
190 create_flownode_flow_txn,
191 create_table_flow_txn,
192 ]);
193 info!(
194 "Creating flow {}.{}({}), with {} txn operations",
195 flow_info.catalog_name,
196 flow_info.flow_name,
197 flow_id,
198 txn.max_operations()
199 );
200
201 let mut resp = self.kv_backend.txn(txn).await?;
202 if !resp.succeeded {
203 let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
204 let remote_flow_flow_name =
205 on_create_flow_flow_name_failure(&mut set)?.with_context(|| {
206 error::UnexpectedSnafu {
207 err_msg: format!(
208 "Reads the empty flow name in comparing operation of the creating flow, flow_id: {flow_id}"
209 ),
210 }
211 })?;
212
213 if remote_flow_flow_name.flow_id() != flow_id {
214 info!(
215 "Trying to create flow {}.{}({}), but flow({}) already exists",
216 flow_info.catalog_name,
217 flow_info.flow_name,
218 flow_id,
219 remote_flow_flow_name.flow_id()
220 );
221
222 return error::FlowAlreadyExistsSnafu {
223 flow_name: format!("{}.{}", flow_info.catalog_name, flow_info.flow_name),
224 }
225 .fail();
226 }
227
228 let remote_flow =
229 on_create_flow_failure(&mut set)?.with_context(|| error::UnexpectedSnafu {
230 err_msg: format!(
231 "Reads the empty flow in comparing operation of creating flow, flow_id: {flow_id}"
232 ),
233 })?;
234 let op_name = "creating flow";
235 ensure_values!(*remote_flow, flow_info, op_name);
236 }
237
238 Ok(())
239 }
240
241 pub async fn update_flow_metadata(
243 &self,
244 flow_id: FlowId,
245 current_flow_info: &DeserializedValueWithBytes<FlowInfoValue>,
246 new_flow_info: &FlowInfoValue,
247 flow_routes: Vec<(FlowPartitionId, FlowRouteValue)>,
248 ) -> Result<()> {
249 let (update_flow_flow_name_txn, on_create_flow_flow_name_failure) =
250 self.flow_name_manager.build_update_txn(
251 &new_flow_info.catalog_name,
252 &new_flow_info.flow_name,
253 flow_id,
254 )?;
255
256 let (update_flow_txn, on_create_flow_failure) =
257 self.flow_info_manager
258 .build_update_txn(flow_id, current_flow_info, new_flow_info)?;
259
260 let update_flow_routes_txn = self.flow_route_manager.build_update_txn(
261 flow_id,
262 current_flow_info,
263 flow_routes.clone(),
264 )?;
265
266 let update_flownode_flow_txn = self.flownode_flow_manager.build_update_txn(
267 flow_id,
268 current_flow_info,
269 new_flow_info.flownode_ids().clone(),
270 );
271
272 let update_table_flow_txn = self.table_flow_manager.build_update_txn(
273 flow_id,
274 current_flow_info,
275 flow_routes
276 .into_iter()
277 .map(|(partition_id, route)| (partition_id, TableFlowValue { peer: route.peer }))
278 .collect(),
279 new_flow_info.source_table_ids(),
280 )?;
281
282 let txn = Txn::merge_all(vec![
283 update_flow_flow_name_txn,
284 update_flow_txn,
285 update_flow_routes_txn,
286 update_flownode_flow_txn,
287 update_table_flow_txn,
288 ]);
289 info!(
290 "Creating flow {}.{}({}), with {} txn operations",
291 new_flow_info.catalog_name,
292 new_flow_info.flow_name,
293 flow_id,
294 txn.max_operations()
295 );
296
297 let mut resp = self.kv_backend.txn(txn).await?;
298 if !resp.succeeded {
299 let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
300 let remote_flow_flow_name =
301 on_create_flow_flow_name_failure(&mut set)?.with_context(|| {
302 error::UnexpectedSnafu {
303 err_msg: format!(
304 "Reads the empty flow name in comparing operation of the updating flow, flow_id: {flow_id}"
305 ),
306 }
307 })?;
308
309 if remote_flow_flow_name.flow_id() != flow_id {
310 info!(
311 "Trying to updating flow {}.{}({}), but flow({}) already exists with a different flow id",
312 new_flow_info.catalog_name,
313 new_flow_info.flow_name,
314 flow_id,
315 remote_flow_flow_name.flow_id()
316 );
317
318 return error::UnexpectedSnafu {
319 err_msg: format!(
320 "Reads different flow id when updating flow({2}.{3}), prev flow id = {0}, updating with flow id = {1}",
321 remote_flow_flow_name.flow_id(),
322 flow_id,
323 new_flow_info.catalog_name,
324 new_flow_info.flow_name,
325 ),
326 }.fail();
327 }
328
329 let remote_flow =
330 on_create_flow_failure(&mut set)?.with_context(|| error::UnexpectedSnafu {
331 err_msg: format!(
332 "Reads the empty flow in comparing operation of the updating flow, flow_id: {flow_id}"
333 ),
334 })?;
335 let op_name = "updating flow";
336 ensure_values!(*remote_flow, new_flow_info.clone(), op_name);
337 }
338
339 Ok(())
340 }
341
342 fn flow_metadata_keys(&self, flow_id: FlowId, flow_value: &FlowInfoValue) -> Vec<Vec<u8>> {
343 let source_table_ids = flow_value.source_table_ids();
344 let mut keys =
345 Vec::with_capacity(2 + flow_value.flownode_ids.len() * (source_table_ids.len() + 2));
346 let flow_name = FlowNameKey::new(&flow_value.catalog_name, &flow_value.flow_name);
348 keys.push(flow_name.to_bytes());
349
350 let flow_info_key = FlowInfoKey::new(flow_id);
352 keys.push(flow_info_key.to_bytes());
353
354 flow_value
356 .flownode_ids
357 .iter()
358 .for_each(|(&partition_id, &flownode_id)| {
359 keys.push(FlownodeFlowKey::new(flownode_id, flow_id, partition_id).to_bytes());
360 keys.push(FlowRouteKey::new(flow_id, partition_id).to_bytes());
361 source_table_ids.iter().for_each(|&table_id| {
362 keys.push(
363 TableFlowKey::new(table_id, flownode_id, flow_id, partition_id).to_bytes(),
364 );
365 })
366 });
367 keys
368 }
369
370 pub async fn destroy_flow_metadata(
372 &self,
373 flow_id: FlowId,
374 flow_value: &FlowInfoValue,
375 ) -> Result<()> {
376 let keys = self.flow_metadata_keys(flow_id, flow_value);
377 let _ = self
378 .kv_backend
379 .batch_delete(BatchDeleteRequest::new().with_keys(keys))
380 .await?;
381 Ok(())
382 }
383}
384
385impl std::fmt::Debug for FlowMetadataManager {
386 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
387 f.debug_struct("FlowMetadataManager").finish()
388 }
389}
390
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::BTreeMap;
    use std::sync::Arc;

    use futures::TryStreamExt;
    use table::metadata::TableId;
    use table::table_name::TableName;

    use super::*;
    use crate::key::flow::table_flow::TableFlowKey;
    use crate::key::FlowPartitionId;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::peer::Peer;
    use crate::FlownodeId;

    /// Minimal key whose byte form is exactly the bytes it holds.
    #[derive(Debug)]
    struct MockKey {
        inner: Vec<u8>,
    }

    impl<'a> MetadataKey<'a, MockKey> for MockKey {
        fn to_bytes(&self) -> Vec<u8> {
            self.inner.to_vec()
        }

        fn from_bytes(bytes: &'a [u8]) -> Result<MockKey> {
            let inner = Vec::from(bytes);
            Ok(MockKey { inner })
        }
    }

    #[test]
    fn test_flow_scoped_to_bytes() {
        let scoped = FlowScoped::new(MockKey {
            inner: b"hi".to_vec(),
        });
        assert_eq!(b"__flow/hi".to_vec(), scoped.to_bytes());
    }

    #[test]
    fn test_flow_scoped_from_bytes() {
        let key = FlowScoped::<MockKey>::from_bytes(b"__flow/hi").unwrap();
        assert_eq!(key.inner.inner, b"hi".to_vec());
    }

    #[test]
    fn test_flow_scoped_from_bytes_mismatch() {
        let err = FlowScoped::<MockKey>::from_bytes(b"__table/hi").unwrap_err();
        assert_matches!(err, error::Error::MismatchPrefix { .. });
    }

    /// Builds a flow info value in catalog "greptime" that sinks into
    /// `greptime.my_schema.sink_table`.
    fn test_flow_info_value(
        flow_name: &str,
        flownode_ids: BTreeMap<FlowPartitionId, FlownodeId>,
        source_table_ids: Vec<TableId>,
    ) -> FlowInfoValue {
        let catalog_name = "greptime";
        FlowInfoValue {
            catalog_name: catalog_name.to_string(),
            query_context: None,
            flow_name: flow_name.to_string(),
            source_table_ids,
            sink_table_name: TableName {
                catalog_name: catalog_name.to_string(),
                schema_name: "my_schema".to_string(),
                table_name: "sink_table".to_string(),
            },
            flownode_ids,
            raw_sql: "raw".to_string(),
            expire_after: Some(300),
            comment: "hi".to_string(),
            options: Default::default(),
            created_time: chrono::Utc::now(),
            updated_time: chrono::Utc::now(),
        }
    }

    /// Same as `test_flow_info_value("flow", {0 => 1}, [1024, 1025, 1026])`,
    /// except that it sinks into `another_sink_table`.
    fn test_flow_info_value_with_another_sink() -> FlowInfoValue {
        let another_sink_table_name = TableName {
            catalog_name: "greptime".to_string(),
            schema_name: "my_schema".to_string(),
            table_name: "another_sink_table".to_string(),
        };
        FlowInfoValue {
            sink_table_name: another_sink_table_name,
            ..test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026])
        }
    }

    /// Builds one route per `(partition id, peer id)` pair, each pointing at
    /// `Peer::empty(peer id)`.
    fn test_flow_routes(pairs: &[(FlowPartitionId, u64)]) -> Vec<(FlowPartitionId, FlowRouteValue)> {
        pairs
            .iter()
            .map(|&(partition_id, peer_id)| {
                (
                    partition_id,
                    FlowRouteValue {
                        peer: Peer::empty(peer_id),
                    },
                )
            })
            .collect()
    }

    /// Expected content of the route manager for `flow_id`, given
    /// `(partition id, peer id)` pairs.
    fn expected_routes(
        flow_id: FlowId,
        pairs: &[(FlowPartitionId, u64)],
    ) -> Vec<(FlowRouteKey, FlowRouteValue)> {
        pairs
            .iter()
            .map(|&(partition_id, peer_id)| {
                (
                    FlowRouteKey::new(flow_id, partition_id),
                    FlowRouteValue {
                        peer: Peer::empty(peer_id),
                    },
                )
            })
            .collect()
    }

    /// Expected content of the table flow manager for `table_id`, given
    /// `(partition id, flownode id)` pairs.
    fn expected_table_flows(
        table_id: TableId,
        flow_id: FlowId,
        pairs: &[(FlowPartitionId, FlownodeId)],
    ) -> Vec<(TableFlowKey, TableFlowValue)> {
        pairs
            .iter()
            .map(|&(partition_id, flownode_id)| {
                (
                    TableFlowKey::new(table_id, flownode_id, flow_id, partition_id),
                    TableFlowValue {
                        peer: Peer::empty(flownode_id),
                    },
                )
            })
            .collect()
    }

    #[tokio::test]
    async fn test_create_flow_metadata() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
        let flow_id = 10;
        let flow_value = test_flow_info_value(
            "flow",
            [(0, 1u64), (1, 2u64)].into(),
            vec![1024, 1025, 1026],
        );
        let flow_routes = test_flow_routes(&[(1, 1), (2, 2)]);

        // Creating twice with identical arguments must succeed both times.
        for _ in 0..2 {
            flow_metadata_manager
                .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
                .await
                .unwrap();
        }

        let got = flow_metadata_manager
            .flow_info_manager()
            .get(flow_id)
            .await
            .unwrap()
            .unwrap();
        let routes = flow_metadata_manager
            .flow_route_manager()
            .routes(flow_id)
            .await
            .unwrap();
        assert_eq!(routes, expected_routes(flow_id, &[(1, 1), (2, 2)]));
        assert_eq!(got, flow_value);

        let flows = flow_metadata_manager
            .flownode_flow_manager()
            .flows(1)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(flows, vec![(flow_id, 0)]);

        for table_id in [1024, 1025, 1026] {
            let nodes = flow_metadata_manager
                .table_flow_manager()
                .flows(table_id)
                .await
                .unwrap();
            assert_eq!(
                nodes,
                expected_table_flows(table_id, flow_id, &[(1, 1), (2, 2)])
            );
        }
    }

    #[tokio::test]
    async fn test_create_flow_metadata_flow_exists_err() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
        let flow_id = 10;
        let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
        let flow_routes = test_flow_routes(&[(1, 1), (2, 2)]);
        flow_metadata_manager
            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
            .await
            .unwrap();

        // Same flow name under a different flow id must be rejected.
        let err = flow_metadata_manager
            .create_flow_metadata(flow_id + 1, flow_value, flow_routes.clone())
            .await
            .unwrap_err();
        assert_matches!(err, error::Error::FlowAlreadyExists { .. });
    }

    #[tokio::test]
    async fn test_create_flow_metadata_unexpected_err() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
        let flow_id = 10;
        let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
        let flow_routes = test_flow_routes(&[(1, 1), (2, 2)]);
        flow_metadata_manager
            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
            .await
            .unwrap();

        // Same flow id and name, but a different sink table.
        let flow_value = test_flow_info_value_with_another_sink();
        let err = flow_metadata_manager
            .create_flow_metadata(flow_id, flow_value, flow_routes.clone())
            .await
            .unwrap_err();
        assert!(err.to_string().contains("Reads the different value"));
    }

    #[tokio::test]
    async fn test_destroy_flow_metadata() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
        let flow_id = 10;
        let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
        let flow_routes = test_flow_routes(&[(0, 1)]);
        flow_metadata_manager
            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
            .await
            .unwrap();

        // Destroying twice must succeed both times.
        for _ in 0..2 {
            flow_metadata_manager
                .destroy_flow_metadata(flow_id, &flow_value)
                .await
                .unwrap();
        }
        assert!(mem_kv.is_empty())
    }

    #[tokio::test]
    async fn test_update_flow_metadata() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
        let flow_id = 10;
        let flow_value = test_flow_info_value(
            "flow",
            [(0, 1u64), (1, 2u64)].into(),
            vec![1024, 1025, 1026],
        );
        let flow_routes = test_flow_routes(&[(1, 1), (2, 2)]);
        flow_metadata_manager
            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
            .await
            .unwrap();

        let new_flow_value = FlowInfoValue {
            raw_sql: "new".to_string(),
            ..flow_value.clone()
        };

        flow_metadata_manager
            .update_flow_metadata(
                flow_id,
                &DeserializedValueWithBytes::from_inner(flow_value.clone()),
                &new_flow_value,
                flow_routes.clone(),
            )
            .await
            .unwrap();

        let got = flow_metadata_manager
            .flow_info_manager()
            .get(flow_id)
            .await
            .unwrap()
            .unwrap();
        let routes = flow_metadata_manager
            .flow_route_manager()
            .routes(flow_id)
            .await
            .unwrap();
        assert_eq!(routes, expected_routes(flow_id, &[(1, 1), (2, 2)]));
        assert_eq!(got, new_flow_value);

        let flows = flow_metadata_manager
            .flownode_flow_manager()
            .flows(1)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(flows, vec![(flow_id, 0)]);

        for table_id in [1024, 1025, 1026] {
            let nodes = flow_metadata_manager
                .table_flow_manager()
                .flows(table_id)
                .await
                .unwrap();
            assert_eq!(
                nodes,
                expected_table_flows(table_id, flow_id, &[(1, 1), (2, 2)])
            );
        }
    }

    #[tokio::test]
    async fn test_update_flow_metadata_diff_flownode() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
        let flow_id = 10;
        let flow_value = test_flow_info_value(
            "flow",
            [(0u32, 1u64), (1u32, 2u64)].into(),
            vec![1024, 1025, 1026],
        );
        let flow_routes = test_flow_routes(&[(0, 1), (1, 2)]);
        flow_metadata_manager
            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
            .await
            .unwrap();

        // Move both partitions to different flownodes.
        let new_flow_value = FlowInfoValue {
            raw_sql: "new".to_string(),
            flownode_ids: [(0, 3u64), (1, 4u64)].into(),
            ..flow_value.clone()
        };
        let new_flow_routes = test_flow_routes(&[(0, 3), (1, 4)]);

        flow_metadata_manager
            .update_flow_metadata(
                flow_id,
                &DeserializedValueWithBytes::from_inner(flow_value.clone()),
                &new_flow_value,
                new_flow_routes.clone(),
            )
            .await
            .unwrap();

        let got = flow_metadata_manager
            .flow_info_manager()
            .get(flow_id)
            .await
            .unwrap()
            .unwrap();
        let routes = flow_metadata_manager
            .flow_route_manager()
            .routes(flow_id)
            .await
            .unwrap();
        assert_eq!(routes, expected_routes(flow_id, &[(0, 3), (1, 4)]));
        assert_eq!(got, new_flow_value);

        // The old flownode no longer hosts the flow.
        let flows = flow_metadata_manager
            .flownode_flow_manager()
            .flows(1)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(flows, vec![]);

        let flows = flow_metadata_manager
            .flownode_flow_manager()
            .flows(3)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(flows, vec![(flow_id, 0)]);

        for table_id in [1024, 1025, 1026] {
            let nodes = flow_metadata_manager
                .table_flow_manager()
                .flows(table_id)
                .await
                .unwrap();
            assert_eq!(
                nodes,
                expected_table_flows(table_id, flow_id, &[(0, 3), (1, 4)])
            );
        }
    }

    #[tokio::test]
    async fn test_update_flow_metadata_flow_replace_diff_id_err() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
        let flow_id = 10;
        let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
        let flow_routes = test_flow_routes(&[(1, 1), (2, 2)]);
        flow_metadata_manager
            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
            .await
            .unwrap();

        // Updating with the same flow id and an unchanged value succeeds.
        flow_metadata_manager
            .update_flow_metadata(
                flow_id,
                &DeserializedValueWithBytes::from_inner(flow_value.clone()),
                &flow_value,
                flow_routes.clone(),
            )
            .await
            .unwrap();

        // Updating the same flow name under a different flow id must fail.
        let err = flow_metadata_manager
            .update_flow_metadata(
                flow_id + 1,
                &DeserializedValueWithBytes::from_inner(flow_value.clone()),
                &flow_value,
                flow_routes,
            )
            .await
            .unwrap_err();
        assert_matches!(err, error::Error::Unexpected { .. });
        assert!(err
            .to_string()
            .contains("Reads different flow id when updating flow"));
    }

    #[tokio::test]
    async fn test_update_flow_metadata_unexpected_err_prev_value_diff() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
        let flow_id = 10;
        let flow_value = test_flow_info_value("flow", [(0, 1u64)].into(), vec![1024, 1025, 1026]);
        let flow_routes = test_flow_routes(&[(1, 1), (2, 2)]);
        flow_metadata_manager
            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
            .await
            .unwrap();

        // Claim a "current" value that differs from what is actually stored.
        let flow_value = test_flow_info_value_with_another_sink();
        let err = flow_metadata_manager
            .update_flow_metadata(
                flow_id,
                &DeserializedValueWithBytes::from_inner(flow_value.clone()),
                &flow_value,
                flow_routes.clone(),
            )
            .await
            .unwrap_err();
        assert!(
            err.to_string().contains("Reads the different value"),
            "error: {:?}",
            err
        );
    }
}