operator/
flow.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::flow::FlowRequestHeader;
16use async_trait::async_trait;
17use common_error::ext::BoxedError;
18use common_function::handlers::FlowServiceHandler;
19use common_meta::key::flow::FlowMetadataManagerRef;
20use common_meta::node_manager::NodeManagerRef;
21use common_query::error::Result;
22use common_telemetry::tracing_context::TracingContext;
23use futures::stream::FuturesUnordered;
24use futures::StreamExt;
25use session::context::QueryContextRef;
26use snafu::{OptionExt, ResultExt};
27
28/// The operator for flow service which implements [`FlowServiceHandler`].
29pub struct FlowServiceOperator {
30    flow_metadata_manager: FlowMetadataManagerRef,
31    node_manager: NodeManagerRef,
32}
33
34impl FlowServiceOperator {
35    pub fn new(
36        flow_metadata_manager: FlowMetadataManagerRef,
37        node_manager: NodeManagerRef,
38    ) -> Self {
39        Self {
40            flow_metadata_manager,
41            node_manager,
42        }
43    }
44
45    pub fn flow_metadata_manager(&self) -> FlowMetadataManagerRef {
46        self.flow_metadata_manager.clone()
47    }
48}
49
50#[async_trait]
51impl FlowServiceHandler for FlowServiceOperator {
52    async fn flush(
53        &self,
54        catalog: &str,
55        flow: &str,
56        ctx: QueryContextRef,
57    ) -> Result<api::v1::flow::FlowResponse> {
58        self.flush_inner(catalog, flow, ctx).await
59    }
60}
61
62impl FlowServiceOperator {
63    /// Flush the flownodes according to the flow id.
64    async fn flush_inner(
65        &self,
66        catalog: &str,
67        flow: &str,
68        ctx: QueryContextRef,
69    ) -> Result<api::v1::flow::FlowResponse> {
70        let id = self
71            .flow_metadata_manager
72            .flow_name_manager()
73            .get(catalog, flow)
74            .await
75            .map_err(BoxedError::new)
76            .context(common_query::error::ExecuteSnafu)?
77            .context(common_meta::error::FlowNotFoundSnafu {
78                flow_name: format!("{}.{}", catalog, flow),
79            })
80            .map_err(BoxedError::new)
81            .context(common_query::error::ExecuteSnafu)?
82            .flow_id();
83
84        let all_flownode_peers = self
85            .flow_metadata_manager
86            .flow_route_manager()
87            .routes(id)
88            .await
89            .map_err(BoxedError::new)
90            .context(common_query::error::ExecuteSnafu)?;
91
92        // order of flownodes doesn't matter here
93        let all_flow_nodes = FuturesUnordered::from_iter(
94            all_flownode_peers
95                .iter()
96                .map(|(_key, peer)| self.node_manager.flownode(peer.peer())),
97        )
98        .collect::<Vec<_>>()
99        .await;
100
101        let mut final_result: Option<api::v1::flow::FlowResponse> = None;
102        for node in all_flow_nodes {
103            let res = {
104                use api::v1::flow::{flow_request, FlowRequest, FlushFlow};
105                let flush_req = FlowRequest {
106                    header: Some(FlowRequestHeader {
107                        tracing_context: TracingContext::from_current_span().to_w3c(),
108                        query_context: Some(
109                            common_meta::rpc::ddl::QueryContext::from(ctx.clone()).into(),
110                        ),
111                    }),
112                    body: Some(flow_request::Body::Flush(FlushFlow {
113                        flow_id: Some(api::v1::FlowId { id }),
114                    })),
115                };
116                node.handle(flush_req)
117                    .await
118                    .map_err(BoxedError::new)
119                    .context(common_query::error::ExecuteSnafu)?
120            };
121
122            if let Some(prev) = &mut final_result {
123                prev.affected_rows = res.affected_rows;
124                prev.affected_flows.extend(res.affected_flows);
125                prev.extensions.extend(res.extensions);
126            } else {
127                final_result = Some(res);
128            }
129        }
130
131        final_result.context(common_query::error::FlownodeNotFoundSnafu)
132    }
133}