client/
flow.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::v1::flow::{FlowRequest, FlowResponse};
16use api::v1::region::InsertRequests;
17use common_error::ext::BoxedError;
18use common_meta::node_manager::Flownode;
19use snafu::ResultExt;
20
21use crate::error::{FlowServerSnafu, Result};
22use crate::Client;
23
24#[derive(Debug)]
25pub struct FlowRequester {
26    client: Client,
27}
28
29#[async_trait::async_trait]
30impl Flownode for FlowRequester {
31    async fn handle(&self, request: FlowRequest) -> common_meta::error::Result<FlowResponse> {
32        self.handle_inner(request)
33            .await
34            .map_err(BoxedError::new)
35            .context(common_meta::error::ExternalSnafu)
36    }
37
38    async fn handle_inserts(
39        &self,
40        request: InsertRequests,
41    ) -> common_meta::error::Result<FlowResponse> {
42        self.handle_inserts_inner(request)
43            .await
44            .map_err(BoxedError::new)
45            .context(common_meta::error::ExternalSnafu)
46    }
47}
48
49impl FlowRequester {
50    pub fn new(client: Client) -> Self {
51        Self { client }
52    }
53
54    async fn handle_inner(&self, request: FlowRequest) -> Result<FlowResponse> {
55        let (addr, mut client) = self.client.raw_flow_client()?;
56
57        let response = client
58            .handle_create_remove(request)
59            .await
60            .or_else(|e| {
61                let code = e.code();
62                let err: crate::error::Error = e.into();
63                Err(BoxedError::new(err)).context(FlowServerSnafu { addr, code })
64            })?
65            .into_inner();
66        Ok(response)
67    }
68
69    async fn handle_inserts_inner(&self, request: InsertRequests) -> Result<FlowResponse> {
70        let (addr, mut client) = self.client.raw_flow_client()?;
71
72        let requests = api::v1::flow::InsertRequests {
73            requests: request
74                .requests
75                .into_iter()
76                .map(|insert| api::v1::flow::InsertRequest {
77                    region_id: insert.region_id,
78                    rows: insert.rows,
79                })
80                .collect(),
81        };
82
83        let response = client
84            .handle_mirror_request(requests)
85            .await
86            .or_else(|e| {
87                let code = e.code();
88                let err: crate::error::Error = e.into();
89                Err(BoxedError::new(err)).context(FlowServerSnafu { addr, code })
90            })?
91            .into_inner();
92        Ok(response)
93    }
94}