operator/
flow.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::flow::FlowRequestHeader;
16use async_trait::async_trait;
17use common_error::ext::BoxedError;
18use common_function::handlers::FlowServiceHandler;
19use common_meta::key::flow::FlowMetadataManagerRef;
20use common_meta::node_manager::NodeManagerRef;
21use common_query::error::Result;
22use common_telemetry::tracing_context::TracingContext;
23use futures::StreamExt;
24use futures::stream::FuturesUnordered;
25use session::context::QueryContextRef;
26use snafu::{OptionExt, ResultExt};
27
28use crate::utils::to_meta_query_context;
29
30/// The operator for flow service which implements [`FlowServiceHandler`].
31pub struct FlowServiceOperator {
32    flow_metadata_manager: FlowMetadataManagerRef,
33    node_manager: NodeManagerRef,
34}
35
36impl FlowServiceOperator {
37    pub fn new(
38        flow_metadata_manager: FlowMetadataManagerRef,
39        node_manager: NodeManagerRef,
40    ) -> Self {
41        Self {
42            flow_metadata_manager,
43            node_manager,
44        }
45    }
46
47    pub fn flow_metadata_manager(&self) -> FlowMetadataManagerRef {
48        self.flow_metadata_manager.clone()
49    }
50}
51
52#[async_trait]
53impl FlowServiceHandler for FlowServiceOperator {
54    async fn flush(
55        &self,
56        catalog: &str,
57        flow: &str,
58        ctx: QueryContextRef,
59    ) -> Result<api::v1::flow::FlowResponse> {
60        self.flush_inner(catalog, flow, ctx).await
61    }
62}
63
64impl FlowServiceOperator {
65    /// Flush the flownodes according to the flow id.
66    async fn flush_inner(
67        &self,
68        catalog: &str,
69        flow: &str,
70        ctx: QueryContextRef,
71    ) -> Result<api::v1::flow::FlowResponse> {
72        let id = self
73            .flow_metadata_manager
74            .flow_name_manager()
75            .get(catalog, flow)
76            .await
77            .map_err(BoxedError::new)
78            .context(common_query::error::ExecuteSnafu)?
79            .context(common_meta::error::FlowNotFoundSnafu {
80                flow_name: format!("{}.{}", catalog, flow),
81            })
82            .map_err(BoxedError::new)
83            .context(common_query::error::ExecuteSnafu)?
84            .flow_id();
85
86        let all_flownode_peers = self
87            .flow_metadata_manager
88            .flow_route_manager()
89            .routes(id)
90            .await
91            .map_err(BoxedError::new)
92            .context(common_query::error::ExecuteSnafu)?;
93
94        // order of flownodes doesn't matter here
95        let all_flow_nodes = FuturesUnordered::from_iter(
96            all_flownode_peers
97                .iter()
98                .map(|(_key, peer)| self.node_manager.flownode(peer.peer())),
99        )
100        .collect::<Vec<_>>()
101        .await;
102
103        let mut final_result: Option<api::v1::flow::FlowResponse> = None;
104        for node in all_flow_nodes {
105            let res = {
106                use api::v1::flow::{FlowRequest, FlushFlow, flow_request};
107                let flush_req = FlowRequest {
108                    header: Some(FlowRequestHeader {
109                        tracing_context: TracingContext::from_current_span().to_w3c(),
110                        query_context: Some(to_meta_query_context(ctx.clone()).into()),
111                    }),
112                    body: Some(flow_request::Body::Flush(FlushFlow {
113                        flow_id: Some(api::v1::FlowId { id }),
114                    })),
115                };
116                node.handle(flush_req)
117                    .await
118                    .map_err(BoxedError::new)
119                    .context(common_query::error::ExecuteSnafu)?
120            };
121
122            if let Some(prev) = &mut final_result {
123                prev.affected_rows = res.affected_rows;
124                prev.affected_flows.extend(res.affected_flows);
125                prev.extensions.extend(res.extensions);
126            } else {
127                final_result = Some(res);
128            }
129        }
130
131        final_result.context(common_query::error::FlownodeNotFoundSnafu)
132    }
133}