1use api::v1::flow::{FlowRequest, FlowResponse};
16use api::v1::region::InsertRequests;
17use common_error::ext::BoxedError;
18use common_meta::node_manager::Flownode;
19use snafu::ResultExt;
20
21use crate::error::{FlowServerSnafu, Result};
22use crate::Client;
23
/// Requester that implements [`Flownode`] by forwarding requests through a [`Client`].
#[derive(Debug)]
pub struct FlowRequester {
    /// Client from which the raw flow client (and its address) is obtained on every request.
    client: Client,
}
28
29#[async_trait::async_trait]
30impl Flownode for FlowRequester {
31 async fn handle(&self, request: FlowRequest) -> common_meta::error::Result<FlowResponse> {
32 self.handle_inner(request)
33 .await
34 .map_err(BoxedError::new)
35 .context(common_meta::error::ExternalSnafu)
36 }
37
38 async fn handle_inserts(
39 &self,
40 request: InsertRequests,
41 ) -> common_meta::error::Result<FlowResponse> {
42 self.handle_inserts_inner(request)
43 .await
44 .map_err(BoxedError::new)
45 .context(common_meta::error::ExternalSnafu)
46 }
47}
48
49impl FlowRequester {
50 pub fn new(client: Client) -> Self {
51 Self { client }
52 }
53
54 async fn handle_inner(&self, request: FlowRequest) -> Result<FlowResponse> {
55 let (addr, mut client) = self.client.raw_flow_client()?;
56
57 let response = client
58 .handle_create_remove(request)
59 .await
60 .or_else(|e| {
61 let code = e.code();
62 let err: crate::error::Error = e.into();
63 Err(BoxedError::new(err)).context(FlowServerSnafu { addr, code })
64 })?
65 .into_inner();
66 Ok(response)
67 }
68
69 async fn handle_inserts_inner(&self, request: InsertRequests) -> Result<FlowResponse> {
70 let (addr, mut client) = self.client.raw_flow_client()?;
71
72 let requests = api::v1::flow::InsertRequests {
73 requests: request
74 .requests
75 .into_iter()
76 .map(|insert| api::v1::flow::InsertRequest {
77 region_id: insert.region_id,
78 rows: insert.rows,
79 })
80 .collect(),
81 };
82
83 let response = client
84 .handle_mirror_request(requests)
85 .await
86 .or_else(|e| {
87 let code = e.code();
88 let err: crate::error::Error = e.into();
89 Err(BoxedError::new(err)).context(FlowServerSnafu { addr, code })
90 })?
91 .into_inner();
92 Ok(response)
93 }
94}