client/
flow.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::flow::{DirtyWindowRequest, DirtyWindowRequests, FlowRequest, FlowResponse};
16use api::v1::region::InsertRequests;
17use common_error::ext::BoxedError;
18use common_meta::node_manager::Flownode;
19use snafu::ResultExt;
20
21use crate::error::{FlowServerSnafu, Result};
22use crate::Client;
23
24#[derive(Debug)]
25pub struct FlowRequester {
26    client: Client,
27}
28
29#[async_trait::async_trait]
30impl Flownode for FlowRequester {
31    async fn handle(&self, request: FlowRequest) -> common_meta::error::Result<FlowResponse> {
32        self.handle_inner(request)
33            .await
34            .map_err(BoxedError::new)
35            .context(common_meta::error::ExternalSnafu)
36    }
37
38    async fn handle_inserts(
39        &self,
40        request: InsertRequests,
41    ) -> common_meta::error::Result<FlowResponse> {
42        self.handle_inserts_inner(request)
43            .await
44            .map_err(BoxedError::new)
45            .context(common_meta::error::ExternalSnafu)
46    }
47
48    async fn handle_mark_window_dirty(
49        &self,
50        req: DirtyWindowRequest,
51    ) -> common_meta::error::Result<FlowResponse> {
52        self.handle_mark_window_dirty(req)
53            .await
54            .map_err(BoxedError::new)
55            .context(common_meta::error::ExternalSnafu)
56    }
57}
58
59impl FlowRequester {
60    pub fn new(client: Client) -> Self {
61        Self { client }
62    }
63
64    async fn handle_inner(&self, request: FlowRequest) -> Result<FlowResponse> {
65        let (addr, mut client) = self.client.raw_flow_client()?;
66
67        let response = client
68            .handle_create_remove(request)
69            .await
70            .or_else(|e| {
71                let code = e.code();
72                let err: crate::error::Error = e.into();
73                Err(BoxedError::new(err)).context(FlowServerSnafu { addr, code })
74            })?
75            .into_inner();
76        Ok(response)
77    }
78
79    async fn handle_inserts_inner(&self, request: InsertRequests) -> Result<FlowResponse> {
80        let (addr, mut client) = self.client.raw_flow_client()?;
81
82        let requests = api::v1::flow::InsertRequests {
83            requests: request
84                .requests
85                .into_iter()
86                .map(|insert| api::v1::flow::InsertRequest {
87                    region_id: insert.region_id,
88                    rows: insert.rows,
89                })
90                .collect(),
91        };
92
93        let response = client
94            .handle_mirror_request(requests)
95            .await
96            .or_else(|e| {
97                let code = e.code();
98                let err: crate::error::Error = e.into();
99                Err(BoxedError::new(err)).context(FlowServerSnafu { addr, code })
100            })?
101            .into_inner();
102        Ok(response)
103    }
104
105    async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
106        let (addr, mut client) = self.client.raw_flow_client()?;
107        let response = client
108            .handle_mark_dirty_time_window(DirtyWindowRequests {
109                requests: vec![req],
110            })
111            .await
112            .or_else(|e| {
113                let code = e.code();
114                let err: crate::error::Error = e.into();
115                Err(BoxedError::new(err)).context(FlowServerSnafu { addr, code })
116            })?
117            .into_inner();
118        Ok(response)
119    }
120}