1pub mod flow_info;
16pub(crate) mod flow_name;
17pub mod flow_route;
18pub mod flow_state;
19mod flownode_addr_helper;
20pub(crate) mod flownode_flow;
21pub(crate) mod table_flow;
22use std::ops::Deref;
23use std::sync::Arc;
24
25use common_telemetry::info;
26use flow_route::{FlowRouteKey, FlowRouteManager, FlowRouteValue};
27use snafu::{OptionExt, ensure};
28use table_flow::TableFlowValue;
29
30use self::flow_info::{FlowInfoKey, FlowInfoValue};
31use self::flow_name::FlowNameKey;
32use self::flownode_flow::FlownodeFlowKey;
33use self::table_flow::TableFlowKey;
34use crate::ensure_values;
35use crate::error::{self, Result};
36use crate::key::flow::flow_info::FlowInfoManager;
37use crate::key::flow::flow_name::FlowNameManager;
38use crate::key::flow::flow_state::FlowStateManager;
39use crate::key::flow::flownode_flow::FlownodeFlowManager;
40pub use crate::key::flow::table_flow::{TableFlowManager, TableFlowManagerRef};
41use crate::key::txn_helper::TxnOpGetResponseSet;
42use crate::key::{DeserializedValueWithBytes, FlowId, FlowPartitionId, MetadataKey};
43use crate::kv_backend::KvBackendRef;
44use crate::kv_backend::txn::Txn;
45use crate::rpc::store::BatchDeleteRequest;
46
/// Wrapper that places a metadata key of type `T` in the `__flow/` key scope.
///
/// The wrapped key is stored in the private `inner` field.
#[derive(Debug, Clone, PartialEq)]
pub struct FlowScoped<T> {
    inner: T,
}
52
53impl<T> Deref for FlowScoped<T> {
54 type Target = T;
55
56 fn deref(&self) -> &Self::Target {
57 &self.inner
58 }
59}
60
61impl<T> FlowScoped<T> {
62 const PREFIX: &'static str = "__flow/";
63
64 pub fn new(inner: T) -> FlowScoped<T> {
66 Self { inner }
67 }
68}
69
70impl<'a, T: MetadataKey<'a, T>> MetadataKey<'a, FlowScoped<T>> for FlowScoped<T> {
71 fn to_bytes(&self) -> Vec<u8> {
72 let prefix = FlowScoped::<T>::PREFIX.as_bytes();
73 let inner = self.inner.to_bytes();
74 let mut bytes = Vec::with_capacity(prefix.len() + inner.len());
75 bytes.extend(prefix);
76 bytes.extend(inner);
77 bytes
78 }
79
80 fn from_bytes(bytes: &'a [u8]) -> Result<FlowScoped<T>> {
81 let prefix = FlowScoped::<T>::PREFIX.as_bytes();
82 ensure!(
83 bytes.starts_with(prefix),
84 error::MismatchPrefixSnafu {
85 prefix: String::from_utf8_lossy(prefix),
86 key: String::from_utf8_lossy(bytes),
87 }
88 );
89 let inner = T::from_bytes(&bytes[prefix.len()..])?;
90 Ok(FlowScoped { inner })
91 }
92}
93
94pub type FlowMetadataManagerRef = Arc<FlowMetadataManager>;
95
96pub struct FlowMetadataManager {
101 flow_info_manager: FlowInfoManager,
102 flow_route_manager: FlowRouteManager,
103 flownode_flow_manager: FlownodeFlowManager,
104 table_flow_manager: TableFlowManager,
105 flow_name_manager: FlowNameManager,
106 flow_state_manager: Option<FlowStateManager>,
108 kv_backend: KvBackendRef,
109}
110
111impl FlowMetadataManager {
112 pub fn new(kv_backend: KvBackendRef) -> Self {
114 Self {
115 flow_info_manager: FlowInfoManager::new(kv_backend.clone()),
116 flow_route_manager: FlowRouteManager::new(kv_backend.clone()),
117 flow_name_manager: FlowNameManager::new(kv_backend.clone()),
118 flownode_flow_manager: FlownodeFlowManager::new(kv_backend.clone()),
119 table_flow_manager: TableFlowManager::new(kv_backend.clone()),
120 flow_state_manager: None,
121 kv_backend,
122 }
123 }
124
125 pub fn flow_name_manager(&self) -> &FlowNameManager {
127 &self.flow_name_manager
128 }
129
130 pub fn flow_state_manager(&self) -> Option<&FlowStateManager> {
131 self.flow_state_manager.as_ref()
132 }
133
134 pub fn flow_info_manager(&self) -> &FlowInfoManager {
136 &self.flow_info_manager
137 }
138
139 pub fn flow_route_manager(&self) -> &FlowRouteManager {
141 &self.flow_route_manager
142 }
143
144 pub fn flownode_flow_manager(&self) -> &FlownodeFlowManager {
146 &self.flownode_flow_manager
147 }
148
149 pub fn table_flow_manager(&self) -> &TableFlowManager {
151 &self.table_flow_manager
152 }
153
154 pub async fn create_flow_metadata(
156 &self,
157 flow_id: FlowId,
158 flow_info: FlowInfoValue,
159 flow_routes: Vec<(FlowPartitionId, FlowRouteValue)>,
160 ) -> Result<()> {
161 let (create_flow_flow_name_txn, on_create_flow_flow_name_failure) = self
162 .flow_name_manager
163 .build_create_txn(&flow_info.catalog_name, &flow_info.flow_name, flow_id)?;
164
165 let (create_flow_txn, on_create_flow_failure) = self
166 .flow_info_manager
167 .build_create_txn(flow_id, &flow_info)?;
168
169 let create_flow_routes_txn = self
170 .flow_route_manager
171 .build_create_txn(flow_id, flow_routes.clone())?;
172
173 let create_flownode_flow_txn = self
174 .flownode_flow_manager
175 .build_create_txn(flow_id, flow_info.flownode_ids().clone());
176
177 let create_table_flow_txn = self.table_flow_manager.build_create_txn(
178 flow_id,
179 flow_routes
180 .into_iter()
181 .map(|(partition_id, route)| (partition_id, TableFlowValue { peer: route.peer }))
182 .collect(),
183 flow_info.source_table_ids(),
184 )?;
185
186 let txn = Txn::merge_all(vec![
187 create_flow_flow_name_txn,
188 create_flow_txn,
189 create_flow_routes_txn,
190 create_flownode_flow_txn,
191 create_table_flow_txn,
192 ]);
193 info!(
194 "Creating flow {}.{}({}), with {} txn operations",
195 flow_info.catalog_name,
196 flow_info.flow_name,
197 flow_id,
198 txn.max_operations()
199 );
200
201 let mut resp = self.kv_backend.txn(txn).await?;
202 if !resp.succeeded {
203 let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
204 let remote_flow_flow_name =
205 on_create_flow_flow_name_failure(&mut set)?.with_context(|| {
206 error::UnexpectedSnafu {
207 err_msg: format!(
208 "Reads the empty flow name in comparing operation of the creating flow, flow_id: {flow_id}"
209 ),
210 }
211 })?;
212
213 if remote_flow_flow_name.flow_id() != flow_id {
214 info!(
215 "Trying to create flow {}.{}({}), but flow({}) already exists",
216 flow_info.catalog_name,
217 flow_info.flow_name,
218 flow_id,
219 remote_flow_flow_name.flow_id()
220 );
221
222 return error::FlowAlreadyExistsSnafu {
223 flow_name: format!("{}.{}", flow_info.catalog_name, flow_info.flow_name),
224 }
225 .fail();
226 }
227
228 let remote_flow =
229 on_create_flow_failure(&mut set)?.with_context(|| error::UnexpectedSnafu {
230 err_msg: format!(
231 "Reads the empty flow in comparing operation of creating flow, flow_id: {flow_id}"
232 ),
233 })?;
234 let op_name = "creating flow";
235 ensure_values!(*remote_flow, flow_info, op_name);
236 }
237
238 Ok(())
239 }
240
241 pub async fn update_flow_metadata(
243 &self,
244 flow_id: FlowId,
245 current_flow_info: &DeserializedValueWithBytes<FlowInfoValue>,
246 new_flow_info: &FlowInfoValue,
247 flow_routes: Vec<(FlowPartitionId, FlowRouteValue)>,
248 ) -> Result<()> {
249 let (update_flow_flow_name_txn, on_create_flow_flow_name_failure) =
250 self.flow_name_manager.build_update_txn(
251 &new_flow_info.catalog_name,
252 &new_flow_info.flow_name,
253 flow_id,
254 )?;
255
256 let (update_flow_txn, on_create_flow_failure) =
257 self.flow_info_manager
258 .build_update_txn(flow_id, current_flow_info, new_flow_info)?;
259
260 let update_flow_routes_txn = self.flow_route_manager.build_update_txn(
261 flow_id,
262 current_flow_info,
263 flow_routes.clone(),
264 )?;
265
266 let update_flownode_flow_txn = self.flownode_flow_manager.build_update_txn(
267 flow_id,
268 current_flow_info,
269 new_flow_info.flownode_ids().clone(),
270 );
271
272 let update_table_flow_txn = self.table_flow_manager.build_update_txn(
273 flow_id,
274 current_flow_info,
275 flow_routes
276 .into_iter()
277 .map(|(partition_id, route)| (partition_id, TableFlowValue { peer: route.peer }))
278 .collect(),
279 new_flow_info.source_table_ids(),
280 )?;
281
282 let txn = Txn::merge_all(vec![
283 update_flow_flow_name_txn,
284 update_flow_txn,
285 update_flow_routes_txn,
286 update_flownode_flow_txn,
287 update_table_flow_txn,
288 ]);
289 info!(
290 "Creating flow {}.{}({}), with {} txn operations",
291 new_flow_info.catalog_name,
292 new_flow_info.flow_name,
293 flow_id,
294 txn.max_operations()
295 );
296
297 let mut resp = self.kv_backend.txn(txn).await?;
298 if !resp.succeeded {
299 let mut set = TxnOpGetResponseSet::from(&mut resp.responses);
300 let remote_flow_flow_name =
301 on_create_flow_flow_name_failure(&mut set)?.with_context(|| {
302 error::UnexpectedSnafu {
303 err_msg: format!(
304 "Reads the empty flow name in comparing operation of the updating flow, flow_id: {flow_id}"
305 ),
306 }
307 })?;
308
309 if remote_flow_flow_name.flow_id() != flow_id {
310 info!(
311 "Trying to updating flow {}.{}({}), but flow({}) already exists with a different flow id",
312 new_flow_info.catalog_name,
313 new_flow_info.flow_name,
314 flow_id,
315 remote_flow_flow_name.flow_id()
316 );
317
318 return error::UnexpectedSnafu {
319 err_msg: format!(
320 "Reads different flow id when updating flow({2}.{3}), prev flow id = {0}, updating with flow id = {1}",
321 remote_flow_flow_name.flow_id(),
322 flow_id,
323 new_flow_info.catalog_name,
324 new_flow_info.flow_name,
325 ),
326 }.fail();
327 }
328
329 let remote_flow =
330 on_create_flow_failure(&mut set)?.with_context(|| error::UnexpectedSnafu {
331 err_msg: format!(
332 "Reads the empty flow in comparing operation of the updating flow, flow_id: {flow_id}"
333 ),
334 })?;
335 let op_name = "updating flow";
336 ensure_values!(*remote_flow, new_flow_info.clone(), op_name);
337 }
338
339 Ok(())
340 }
341
342 fn flow_metadata_keys(&self, flow_id: FlowId, flow_value: &FlowInfoValue) -> Vec<Vec<u8>> {
343 let source_table_ids = flow_value.source_table_ids();
344 let mut keys =
345 Vec::with_capacity(2 + flow_value.flownode_ids.len() * (source_table_ids.len() + 2));
346 let flow_name = FlowNameKey::new(&flow_value.catalog_name, &flow_value.flow_name);
348 keys.push(flow_name.to_bytes());
349
350 let flow_info_key = FlowInfoKey::new(flow_id);
352 keys.push(flow_info_key.to_bytes());
353
354 flow_value
356 .flownode_ids
357 .iter()
358 .for_each(|(&partition_id, &flownode_id)| {
359 keys.push(FlownodeFlowKey::new(flownode_id, flow_id, partition_id).to_bytes());
360 keys.push(FlowRouteKey::new(flow_id, partition_id).to_bytes());
361 source_table_ids.iter().for_each(|&table_id| {
362 keys.push(
363 TableFlowKey::new(table_id, flownode_id, flow_id, partition_id).to_bytes(),
364 );
365 })
366 });
367 keys
368 }
369
370 pub async fn destroy_flow_metadata(
372 &self,
373 flow_id: FlowId,
374 flow_value: &FlowInfoValue,
375 ) -> Result<()> {
376 let keys = self.flow_metadata_keys(flow_id, flow_value);
377 let _ = self
378 .kv_backend
379 .batch_delete(BatchDeleteRequest::new().with_keys(keys))
380 .await?;
381 Ok(())
382 }
383}
384
385impl std::fmt::Debug for FlowMetadataManager {
386 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
387 f.debug_struct("FlowMetadataManager").finish()
388 }
389}
390
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::BTreeMap;
    use std::sync::Arc;

    use futures::TryStreamExt;
    use table::metadata::TableId;
    use table::table_name::TableName;

    use super::*;
    use crate::FlownodeId;
    use crate::key::FlowPartitionId;
    use crate::key::flow::table_flow::TableFlowKey;
    use crate::kv_backend::memory::MemoryKvBackend;
    use crate::peer::Peer;

    /// Source table ids used by every flow built in these tests.
    const SOURCE_TABLE_IDS: [TableId; 3] = [1024, 1025, 1026];

    /// Minimal [`MetadataKey`] whose serialized form is exactly the bytes it holds.
    #[derive(Debug)]
    struct MockKey {
        inner: Vec<u8>,
    }

    impl<'a> MetadataKey<'a, MockKey> for MockKey {
        fn to_bytes(&self) -> Vec<u8> {
            self.inner.to_vec()
        }

        fn from_bytes(bytes: &'a [u8]) -> Result<MockKey> {
            let inner = bytes.to_vec();
            Ok(MockKey { inner })
        }
    }

    #[test]
    fn test_flow_scoped_to_bytes() {
        let inner = b"hi".to_vec();
        let key = FlowScoped::new(MockKey { inner });
        assert_eq!(b"__flow/hi".to_vec(), key.to_bytes());
    }

    #[test]
    fn test_flow_scoped_from_bytes() {
        let key = FlowScoped::<MockKey>::from_bytes(b"__flow/hi").unwrap();
        assert_eq!(key.inner.inner, b"hi".to_vec());
    }

    #[test]
    fn test_flow_scoped_from_bytes_mismatch() {
        let err = FlowScoped::<MockKey>::from_bytes(b"__table/hi").unwrap_err();
        assert_matches!(err, error::Error::MismatchPrefix { .. });
    }

    /// Builds a sink table name in catalog `greptime`, schema `my_schema`.
    fn sink_table(table_name: &str) -> TableName {
        TableName {
            catalog_name: "greptime".to_string(),
            schema_name: "my_schema".to_string(),
            table_name: table_name.to_string(),
        }
    }

    /// Builds a flow info value in catalog `greptime` that writes to `sink_table`.
    fn test_flow_info_value(
        flow_name: &str,
        flownode_ids: BTreeMap<FlowPartitionId, FlownodeId>,
        source_table_ids: Vec<TableId>,
    ) -> FlowInfoValue {
        FlowInfoValue {
            catalog_name: "greptime".to_string(),
            query_context: None,
            flow_name: flow_name.to_string(),
            source_table_ids,
            sink_table_name: sink_table("sink_table"),
            flownode_ids,
            raw_sql: "raw".to_string(),
            expire_after: Some(300),
            eval_interval_secs: None,
            comment: "hi".to_string(),
            options: Default::default(),
            created_time: chrono::Utc::now(),
            updated_time: chrono::Utc::now(),
        }
    }

    /// Builds a freshly timestamped flow named `flow` on flownode 1 that differs from
    /// the default test value in its sink table.
    fn flow_value_with_another_sink() -> FlowInfoValue {
        FlowInfoValue {
            sink_table_name: sink_table("another_sink_table"),
            ..test_flow_info_value("flow", [(0, 1u64)].into(), SOURCE_TABLE_IDS.to_vec())
        }
    }

    /// Turns `(partition id, flownode id)` pairs into flow routes whose peer is
    /// `Peer::empty(flownode id)`.
    fn routes_for(
        assignments: &[(FlowPartitionId, FlownodeId)],
    ) -> Vec<(FlowPartitionId, FlowRouteValue)> {
        assignments
            .iter()
            .map(|&(partition_id, flownode_id)| {
                (
                    partition_id,
                    FlowRouteValue {
                        peer: Peer::empty(flownode_id),
                    },
                )
            })
            .collect()
    }

    /// Asserts that the stored routes of `flow_id` and the table flows of every source
    /// table match the given `(partition id, flownode id)` pairs, in order.
    async fn assert_routes_and_table_flows(
        manager: &FlowMetadataManager,
        flow_id: FlowId,
        assignments: &[(FlowPartitionId, FlownodeId)],
    ) {
        let routes = manager.flow_route_manager().routes(flow_id).await.unwrap();
        let expected_routes: Vec<_> = assignments
            .iter()
            .map(|&(partition_id, flownode_id)| {
                (
                    FlowRouteKey::new(flow_id, partition_id),
                    FlowRouteValue {
                        peer: Peer::empty(flownode_id),
                    },
                )
            })
            .collect();
        assert_eq!(routes, expected_routes);

        for table_id in SOURCE_TABLE_IDS {
            let nodes = manager.table_flow_manager().flows(table_id).await.unwrap();
            let expected_nodes: Vec<_> = assignments
                .iter()
                .map(|&(partition_id, flownode_id)| {
                    (
                        TableFlowKey::new(table_id, flownode_id, flow_id, partition_id),
                        TableFlowValue {
                            peer: Peer::empty(flownode_id),
                        },
                    )
                })
                .collect();
            assert_eq!(nodes, expected_nodes);
        }
    }

    #[tokio::test]
    async fn test_create_flow_metadata() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
        let flow_id = 10;
        let flow_value = test_flow_info_value(
            "flow",
            [(0, 1u64), (1, 2u64)].into(),
            SOURCE_TABLE_IDS.to_vec(),
        );
        let assignments: [(FlowPartitionId, FlownodeId); 2] = [(1, 1), (2, 2)];
        let flow_routes = routes_for(&assignments);

        // Creating the same flow a second time must succeed as well.
        for _ in 0..2 {
            flow_metadata_manager
                .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
                .await
                .unwrap();
        }

        let got = flow_metadata_manager
            .flow_info_manager()
            .get(flow_id)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(got, flow_value);
        assert_routes_and_table_flows(&flow_metadata_manager, flow_id, &assignments).await;

        let flows = flow_metadata_manager
            .flownode_flow_manager()
            .flows(1)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(flows, vec![(flow_id, 0)]);
    }

    #[tokio::test]
    async fn test_create_flow_metadata_flow_exists_err() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
        let flow_id = 10;
        let flow_value =
            test_flow_info_value("flow", [(0, 1u64)].into(), SOURCE_TABLE_IDS.to_vec());
        let flow_routes = routes_for(&[(1, 1), (2, 2)]);
        flow_metadata_manager
            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
            .await
            .unwrap();

        // Same flow name, different flow id.
        let err = flow_metadata_manager
            .create_flow_metadata(flow_id + 1, flow_value, flow_routes)
            .await
            .unwrap_err();
        assert_matches!(err, error::Error::FlowAlreadyExists { .. });
    }

    #[tokio::test]
    async fn test_create_flow_metadata_unexpected_err() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
        let flow_id = 10;
        let flow_value =
            test_flow_info_value("flow", [(0, 1u64)].into(), SOURCE_TABLE_IDS.to_vec());
        let flow_routes = routes_for(&[(1, 1), (2, 2)]);
        flow_metadata_manager
            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
            .await
            .unwrap();

        // Same flow id and name, but a different flow info value.
        let flow_value = flow_value_with_another_sink();
        let err = flow_metadata_manager
            .create_flow_metadata(flow_id, flow_value, flow_routes)
            .await
            .unwrap_err();
        assert!(err.to_string().contains("Reads the different value"));
    }

    #[tokio::test]
    async fn test_destroy_flow_metadata() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let flow_metadata_manager = FlowMetadataManager::new(mem_kv.clone());
        let flow_id = 10;
        let flow_value =
            test_flow_info_value("flow", [(0, 1u64)].into(), SOURCE_TABLE_IDS.to_vec());
        let flow_routes = routes_for(&[(0, 1)]);
        flow_metadata_manager
            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
            .await
            .unwrap();

        // Destroying twice must succeed both times.
        for _ in 0..2 {
            flow_metadata_manager
                .destroy_flow_metadata(flow_id, &flow_value)
                .await
                .unwrap();
        }
        assert!(mem_kv.is_empty())
    }

    #[tokio::test]
    async fn test_update_flow_metadata() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
        let flow_id = 10;
        let flow_value = test_flow_info_value(
            "flow",
            [(0, 1u64), (1, 2u64)].into(),
            SOURCE_TABLE_IDS.to_vec(),
        );
        let assignments: [(FlowPartitionId, FlownodeId); 2] = [(1, 1), (2, 2)];
        let flow_routes = routes_for(&assignments);
        flow_metadata_manager
            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
            .await
            .unwrap();

        let new_flow_value = FlowInfoValue {
            raw_sql: "new".to_string(),
            ..flow_value.clone()
        };

        flow_metadata_manager
            .update_flow_metadata(
                flow_id,
                &DeserializedValueWithBytes::from_inner(flow_value.clone()),
                &new_flow_value,
                flow_routes.clone(),
            )
            .await
            .unwrap();

        let got = flow_metadata_manager
            .flow_info_manager()
            .get(flow_id)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(got, new_flow_value);
        assert_routes_and_table_flows(&flow_metadata_manager, flow_id, &assignments).await;

        let flows = flow_metadata_manager
            .flownode_flow_manager()
            .flows(1)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(flows, vec![(flow_id, 0)]);
    }

    #[tokio::test]
    async fn test_update_flow_metadata_diff_flownode() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
        let flow_id = 10;
        let flow_value = test_flow_info_value(
            "flow",
            [(0u32, 1u64), (1u32, 2u64)].into(),
            SOURCE_TABLE_IDS.to_vec(),
        );
        let flow_routes = routes_for(&[(0, 1), (1, 2)]);
        flow_metadata_manager
            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
            .await
            .unwrap();

        // Move both partitions to other flownodes.
        let new_flow_value = FlowInfoValue {
            raw_sql: "new".to_string(),
            flownode_ids: [(0, 3u64), (1, 4u64)].into(),
            ..flow_value.clone()
        };
        let new_assignments: [(FlowPartitionId, FlownodeId); 2] = [(0, 3), (1, 4)];
        let new_flow_routes = routes_for(&new_assignments);

        flow_metadata_manager
            .update_flow_metadata(
                flow_id,
                &DeserializedValueWithBytes::from_inner(flow_value.clone()),
                &new_flow_value,
                new_flow_routes.clone(),
            )
            .await
            .unwrap();

        let got = flow_metadata_manager
            .flow_info_manager()
            .get(flow_id)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(got, new_flow_value);
        assert_routes_and_table_flows(&flow_metadata_manager, flow_id, &new_assignments).await;

        // The previous flownode no longer lists the flow.
        let flows = flow_metadata_manager
            .flownode_flow_manager()
            .flows(1)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(flows, vec![]);

        let flows = flow_metadata_manager
            .flownode_flow_manager()
            .flows(3)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(flows, vec![(flow_id, 0)]);
    }

    #[tokio::test]
    async fn test_update_flow_metadata_flow_replace_diff_id_err() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
        let flow_id = 10;
        let flow_value =
            test_flow_info_value("flow", [(0, 1u64)].into(), SOURCE_TABLE_IDS.to_vec());
        let flow_routes = routes_for(&[(1, 1), (2, 2)]);
        flow_metadata_manager
            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
            .await
            .unwrap();

        // Updating with the same flow id and an unchanged value succeeds.
        flow_metadata_manager
            .update_flow_metadata(
                flow_id,
                &DeserializedValueWithBytes::from_inner(flow_value.clone()),
                &flow_value,
                flow_routes.clone(),
            )
            .await
            .unwrap();

        // Updating the same flow name under a different flow id fails.
        let err = flow_metadata_manager
            .update_flow_metadata(
                flow_id + 1,
                &DeserializedValueWithBytes::from_inner(flow_value.clone()),
                &flow_value,
                flow_routes,
            )
            .await
            .unwrap_err();
        assert_matches!(err, error::Error::Unexpected { .. });
        assert!(
            err.to_string()
                .contains("Reads different flow id when updating flow")
        );
    }

    #[tokio::test]
    async fn test_update_flow_metadata_unexpected_err_prev_value_diff() {
        let mem_kv = Arc::new(MemoryKvBackend::default());
        let flow_metadata_manager = FlowMetadataManager::new(mem_kv);
        let flow_id = 10;
        let flow_value =
            test_flow_info_value("flow", [(0, 1u64)].into(), SOURCE_TABLE_IDS.to_vec());
        let flow_routes = routes_for(&[(1, 1), (2, 2)]);
        flow_metadata_manager
            .create_flow_metadata(flow_id, flow_value.clone(), flow_routes.clone())
            .await
            .unwrap();

        // The "current" value handed to the update differs from what is stored.
        let flow_value = flow_value_with_another_sink();
        let err = flow_metadata_manager
            .update_flow_metadata(
                flow_id,
                &DeserializedValueWithBytes::from_inner(flow_value.clone()),
                &flow_value,
                flow_routes.clone(),
            )
            .await
            .unwrap_err();
        assert!(
            err.to_string().contains("Reads the different value"),
            "error: {:?}",
            err
        );
    }
}