1use api::v1::flow::FlowRequestHeader;
16use async_trait::async_trait;
17use common_error::ext::BoxedError;
18use common_function::handlers::FlowServiceHandler;
19use common_meta::key::flow::FlowMetadataManagerRef;
20use common_meta::node_manager::NodeManagerRef;
21use common_query::error::Result;
22use common_telemetry::tracing_context::TracingContext;
23use futures::StreamExt;
24use futures::stream::FuturesUnordered;
25use session::context::QueryContextRef;
26use snafu::{OptionExt, ResultExt};
27
28use crate::utils::to_meta_query_context;
29
30pub struct FlowServiceOperator {
32 flow_metadata_manager: FlowMetadataManagerRef,
33 node_manager: NodeManagerRef,
34}
35
36impl FlowServiceOperator {
37 pub fn new(
38 flow_metadata_manager: FlowMetadataManagerRef,
39 node_manager: NodeManagerRef,
40 ) -> Self {
41 Self {
42 flow_metadata_manager,
43 node_manager,
44 }
45 }
46
47 pub fn flow_metadata_manager(&self) -> FlowMetadataManagerRef {
48 self.flow_metadata_manager.clone()
49 }
50}
51
52#[async_trait]
53impl FlowServiceHandler for FlowServiceOperator {
54 async fn flush(
55 &self,
56 catalog: &str,
57 flow: &str,
58 ctx: QueryContextRef,
59 ) -> Result<api::v1::flow::FlowResponse> {
60 self.flush_inner(catalog, flow, ctx).await
61 }
62}
63
64impl FlowServiceOperator {
65 async fn flush_inner(
67 &self,
68 catalog: &str,
69 flow: &str,
70 ctx: QueryContextRef,
71 ) -> Result<api::v1::flow::FlowResponse> {
72 let id = self
73 .flow_metadata_manager
74 .flow_name_manager()
75 .get(catalog, flow)
76 .await
77 .map_err(BoxedError::new)
78 .context(common_query::error::ExecuteSnafu)?
79 .context(common_meta::error::FlowNotFoundSnafu {
80 flow_name: format!("{}.{}", catalog, flow),
81 })
82 .map_err(BoxedError::new)
83 .context(common_query::error::ExecuteSnafu)?
84 .flow_id();
85
86 let all_flownode_peers = self
87 .flow_metadata_manager
88 .flow_route_manager()
89 .routes(id)
90 .await
91 .map_err(BoxedError::new)
92 .context(common_query::error::ExecuteSnafu)?;
93
94 let all_flow_nodes = FuturesUnordered::from_iter(
96 all_flownode_peers
97 .iter()
98 .map(|(_key, peer)| self.node_manager.flownode(peer.peer())),
99 )
100 .collect::<Vec<_>>()
101 .await;
102
103 let mut final_result: Option<api::v1::flow::FlowResponse> = None;
104 for node in all_flow_nodes {
105 let res = {
106 use api::v1::flow::{FlowRequest, FlushFlow, flow_request};
107 let flush_req = FlowRequest {
108 header: Some(FlowRequestHeader {
109 tracing_context: TracingContext::from_current_span().to_w3c(),
110 query_context: Some(to_meta_query_context(ctx.clone()).into()),
111 }),
112 body: Some(flow_request::Body::Flush(FlushFlow {
113 flow_id: Some(api::v1::FlowId { id }),
114 })),
115 };
116 node.handle(flush_req)
117 .await
118 .map_err(BoxedError::new)
119 .context(common_query::error::ExecuteSnafu)?
120 };
121
122 if let Some(prev) = &mut final_result {
123 prev.affected_rows = res.affected_rows;
124 prev.affected_flows.extend(res.affected_flows);
125 prev.extensions.extend(res.extensions);
126 } else {
127 final_result = Some(res);
128 }
129 }
130
131 final_result.context(common_query::error::FlownodeNotFoundSnafu)
132 }
133}