1use api::v1::flow::{DirtyWindowRequests, FlowRequest, FlowResponse};
16use api::v1::region::InsertRequests;
17use common_error::ext::BoxedError;
18use common_meta::node_manager::Flownode;
19use snafu::ResultExt;
20
21use crate::Client;
22use crate::error::{FlowServerSnafu, Result};
23
24#[derive(Debug)]
25pub struct FlowRequester {
26 client: Client,
27}
28
29#[async_trait::async_trait]
30impl Flownode for FlowRequester {
31 async fn handle(&self, request: FlowRequest) -> common_meta::error::Result<FlowResponse> {
32 self.handle_inner(request)
33 .await
34 .map_err(BoxedError::new)
35 .context(common_meta::error::ExternalSnafu)
36 }
37
38 async fn handle_inserts(
39 &self,
40 request: InsertRequests,
41 ) -> common_meta::error::Result<FlowResponse> {
42 self.handle_inserts_inner(request)
43 .await
44 .map_err(BoxedError::new)
45 .context(common_meta::error::ExternalSnafu)
46 }
47
48 async fn handle_mark_window_dirty(
49 &self,
50 req: DirtyWindowRequests,
51 ) -> common_meta::error::Result<FlowResponse> {
52 self.handle_mark_window_dirty(req)
53 .await
54 .map_err(BoxedError::new)
55 .context(common_meta::error::ExternalSnafu)
56 }
57}
58
59impl FlowRequester {
60 pub fn new(client: Client) -> Self {
61 Self { client }
62 }
63
64 async fn handle_inner(&self, request: FlowRequest) -> Result<FlowResponse> {
65 let (addr, mut client) = self.client.raw_flow_client()?;
66
67 let response = client
68 .handle_create_remove(request)
69 .await
70 .or_else(|e| {
71 let code = e.code();
72 let err: crate::error::Error = e.into();
73 Err(BoxedError::new(err)).context(FlowServerSnafu { addr, code })
74 })?
75 .into_inner();
76 Ok(response)
77 }
78
79 async fn handle_inserts_inner(&self, request: InsertRequests) -> Result<FlowResponse> {
80 let (addr, mut client) = self.client.raw_flow_client()?;
81
82 let requests = api::v1::flow::InsertRequests {
83 requests: request
84 .requests
85 .into_iter()
86 .map(|insert| api::v1::flow::InsertRequest {
87 region_id: insert.region_id,
88 rows: insert.rows,
89 partition_expr_version: insert.partition_expr_version,
90 })
91 .collect(),
92 };
93
94 let response = client
95 .handle_mirror_request(requests)
96 .await
97 .or_else(|e| {
98 let code = e.code();
99 let err: crate::error::Error = e.into();
100 Err(BoxedError::new(err)).context(FlowServerSnafu { addr, code })
101 })?
102 .into_inner();
103 Ok(response)
104 }
105
106 async fn handle_mark_window_dirty(&self, req: DirtyWindowRequests) -> Result<FlowResponse> {
107 let (addr, mut client) = self.client.raw_flow_client()?;
108 let response = client
109 .handle_mark_dirty_time_window(req)
110 .await
111 .or_else(|e| {
112 let code = e.code();
113 let err: crate::error::Error = e.into();
114 Err(BoxedError::new(err)).context(FlowServerSnafu { addr, code })
115 })?
116 .into_inner();
117 Ok(response)
118 }
119}