1use api::v1::flow::FlowRequestHeader;
16use async_trait::async_trait;
17use common_error::ext::BoxedError;
18use common_function::handlers::FlowServiceHandler;
19use common_meta::key::flow::FlowMetadataManagerRef;
20use common_meta::node_manager::NodeManagerRef;
21use common_query::error::Result;
22use common_telemetry::tracing_context::TracingContext;
23use futures::stream::FuturesUnordered;
24use futures::StreamExt;
25use session::context::QueryContextRef;
26use snafu::{OptionExt, ResultExt};
27
28pub struct FlowServiceOperator {
30 flow_metadata_manager: FlowMetadataManagerRef,
31 node_manager: NodeManagerRef,
32}
33
34impl FlowServiceOperator {
35 pub fn new(
36 flow_metadata_manager: FlowMetadataManagerRef,
37 node_manager: NodeManagerRef,
38 ) -> Self {
39 Self {
40 flow_metadata_manager,
41 node_manager,
42 }
43 }
44
45 pub fn flow_metadata_manager(&self) -> FlowMetadataManagerRef {
46 self.flow_metadata_manager.clone()
47 }
48}
49
50#[async_trait]
51impl FlowServiceHandler for FlowServiceOperator {
52 async fn flush(
53 &self,
54 catalog: &str,
55 flow: &str,
56 ctx: QueryContextRef,
57 ) -> Result<api::v1::flow::FlowResponse> {
58 self.flush_inner(catalog, flow, ctx).await
59 }
60}
61
62impl FlowServiceOperator {
63 async fn flush_inner(
65 &self,
66 catalog: &str,
67 flow: &str,
68 ctx: QueryContextRef,
69 ) -> Result<api::v1::flow::FlowResponse> {
70 let id = self
71 .flow_metadata_manager
72 .flow_name_manager()
73 .get(catalog, flow)
74 .await
75 .map_err(BoxedError::new)
76 .context(common_query::error::ExecuteSnafu)?
77 .context(common_meta::error::FlowNotFoundSnafu {
78 flow_name: format!("{}.{}", catalog, flow),
79 })
80 .map_err(BoxedError::new)
81 .context(common_query::error::ExecuteSnafu)?
82 .flow_id();
83
84 let all_flownode_peers = self
85 .flow_metadata_manager
86 .flow_route_manager()
87 .routes(id)
88 .await
89 .map_err(BoxedError::new)
90 .context(common_query::error::ExecuteSnafu)?;
91
92 let all_flow_nodes = FuturesUnordered::from_iter(
94 all_flownode_peers
95 .iter()
96 .map(|(_key, peer)| self.node_manager.flownode(peer.peer())),
97 )
98 .collect::<Vec<_>>()
99 .await;
100
101 let mut final_result: Option<api::v1::flow::FlowResponse> = None;
102 for node in all_flow_nodes {
103 let res = {
104 use api::v1::flow::{flow_request, FlowRequest, FlushFlow};
105 let flush_req = FlowRequest {
106 header: Some(FlowRequestHeader {
107 tracing_context: TracingContext::from_current_span().to_w3c(),
108 query_context: Some(
109 common_meta::rpc::ddl::QueryContext::from(ctx.clone()).into(),
110 ),
111 }),
112 body: Some(flow_request::Body::Flush(FlushFlow {
113 flow_id: Some(api::v1::FlowId { id }),
114 })),
115 };
116 node.handle(flush_req)
117 .await
118 .map_err(BoxedError::new)
119 .context(common_query::error::ExecuteSnafu)?
120 };
121
122 if let Some(prev) = &mut final_result {
123 prev.affected_rows = res.affected_rows;
124 prev.affected_flows.extend(res.affected_flows);
125 prev.extensions.extend(res.extensions);
126 } else {
127 final_result = Some(res);
128 }
129 }
130
131 final_result.context(common_query::error::FlownodeNotFoundSnafu)
132 }
133}