1use api::v1::flow::{DirtyWindowRequest, DirtyWindowRequests, FlowRequest, FlowResponse};
16use api::v1::region::InsertRequests;
17use common_error::ext::BoxedError;
18use common_meta::node_manager::Flownode;
19use snafu::ResultExt;
20
21use crate::error::{FlowServerSnafu, Result};
22use crate::Client;
23
24#[derive(Debug)]
25pub struct FlowRequester {
26 client: Client,
27}
28
29#[async_trait::async_trait]
30impl Flownode for FlowRequester {
31 async fn handle(&self, request: FlowRequest) -> common_meta::error::Result<FlowResponse> {
32 self.handle_inner(request)
33 .await
34 .map_err(BoxedError::new)
35 .context(common_meta::error::ExternalSnafu)
36 }
37
38 async fn handle_inserts(
39 &self,
40 request: InsertRequests,
41 ) -> common_meta::error::Result<FlowResponse> {
42 self.handle_inserts_inner(request)
43 .await
44 .map_err(BoxedError::new)
45 .context(common_meta::error::ExternalSnafu)
46 }
47
48 async fn handle_mark_window_dirty(
49 &self,
50 req: DirtyWindowRequest,
51 ) -> common_meta::error::Result<FlowResponse> {
52 self.handle_mark_window_dirty(req)
53 .await
54 .map_err(BoxedError::new)
55 .context(common_meta::error::ExternalSnafu)
56 }
57}
58
59impl FlowRequester {
60 pub fn new(client: Client) -> Self {
61 Self { client }
62 }
63
64 async fn handle_inner(&self, request: FlowRequest) -> Result<FlowResponse> {
65 let (addr, mut client) = self.client.raw_flow_client()?;
66
67 let response = client
68 .handle_create_remove(request)
69 .await
70 .or_else(|e| {
71 let code = e.code();
72 let err: crate::error::Error = e.into();
73 Err(BoxedError::new(err)).context(FlowServerSnafu { addr, code })
74 })?
75 .into_inner();
76 Ok(response)
77 }
78
79 async fn handle_inserts_inner(&self, request: InsertRequests) -> Result<FlowResponse> {
80 let (addr, mut client) = self.client.raw_flow_client()?;
81
82 let requests = api::v1::flow::InsertRequests {
83 requests: request
84 .requests
85 .into_iter()
86 .map(|insert| api::v1::flow::InsertRequest {
87 region_id: insert.region_id,
88 rows: insert.rows,
89 })
90 .collect(),
91 };
92
93 let response = client
94 .handle_mirror_request(requests)
95 .await
96 .or_else(|e| {
97 let code = e.code();
98 let err: crate::error::Error = e.into();
99 Err(BoxedError::new(err)).context(FlowServerSnafu { addr, code })
100 })?
101 .into_inner();
102 Ok(response)
103 }
104
105 async fn handle_mark_window_dirty(&self, req: DirtyWindowRequest) -> Result<FlowResponse> {
106 let (addr, mut client) = self.client.raw_flow_client()?;
107 let response = client
108 .handle_mark_dirty_time_window(DirtyWindowRequests {
109 requests: vec![req],
110 })
111 .await
112 .or_else(|e| {
113 let code = e.code();
114 let err: crate::error::Error = e.into();
115 Err(BoxedError::new(err)).context(FlowServerSnafu { addr, code })
116 })?
117 .into_inner();
118 Ok(response)
119 }
120}