common_meta/ddl/test_util/
datanode_handler.rs1use api::region::RegionResponse;
16use api::v1::region::RegionRequest;
17use common_error::ext::{BoxedError, ErrorExt, StackError};
18use common_error::status_code::StatusCode;
19use common_query::request::QueryRequest;
20use common_recordbatch::SendableRecordBatchStream;
21use common_telemetry::debug;
22use snafu::{ResultExt, Snafu};
23use tokio::sync::mpsc;
24
25use crate::error::{self, Error, Result};
26use crate::peer::Peer;
27use crate::test_util::MockDatanodeHandler;
28
29#[async_trait::async_trait]
30impl MockDatanodeHandler for () {
31 async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
32 Ok(RegionResponse {
33 affected_rows: 0,
34 extensions: Default::default(),
35 })
36 }
37
38 async fn handle_query(
39 &self,
40 _peer: &Peer,
41 _request: QueryRequest,
42 ) -> Result<SendableRecordBatchStream> {
43 unreachable!()
44 }
45}
46
47#[derive(Clone)]
48pub struct DatanodeWatcher {
49 sender: mpsc::Sender<(Peer, RegionRequest)>,
50 handler: Option<fn(Peer, RegionRequest) -> Result<RegionResponse>>,
51}
52
53impl DatanodeWatcher {
54 pub fn new(sender: mpsc::Sender<(Peer, RegionRequest)>) -> Self {
55 Self {
56 sender,
57 handler: None,
58 }
59 }
60
61 pub fn with_handler(
62 mut self,
63 user_handler: fn(Peer, RegionRequest) -> Result<RegionResponse>,
64 ) -> Self {
65 self.handler = Some(user_handler);
66 self
67 }
68}
69
70#[async_trait::async_trait]
71impl MockDatanodeHandler for DatanodeWatcher {
72 async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
73 debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
74 self.sender
75 .send((peer.clone(), request.clone()))
76 .await
77 .unwrap();
78 if let Some(handler) = self.handler {
79 handler(peer.clone(), request)
80 } else {
81 Ok(RegionResponse::new(0))
82 }
83 }
84
85 async fn handle_query(
86 &self,
87 _peer: &Peer,
88 _request: QueryRequest,
89 ) -> Result<SendableRecordBatchStream> {
90 unreachable!()
91 }
92}
93
94#[derive(Clone)]
95pub struct RetryErrorDatanodeHandler;
96
97#[async_trait::async_trait]
98impl MockDatanodeHandler for RetryErrorDatanodeHandler {
99 async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
100 debug!("Returning retry later for request: {request:?}, peer: {peer:?}");
101 Err(Error::RetryLater {
102 source: BoxedError::new(
103 error::UnexpectedSnafu {
104 err_msg: "retry later",
105 }
106 .build(),
107 ),
108 clean_poisons: false,
109 })
110 }
111
112 async fn handle_query(
113 &self,
114 _peer: &Peer,
115 _request: QueryRequest,
116 ) -> Result<SendableRecordBatchStream> {
117 unreachable!()
118 }
119}
120
121#[derive(Clone)]
122pub struct UnexpectedErrorDatanodeHandler;
123
124#[async_trait::async_trait]
125impl MockDatanodeHandler for UnexpectedErrorDatanodeHandler {
126 async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
127 debug!("Returning mock error for request: {request:?}, peer: {peer:?}");
128 error::UnexpectedSnafu {
129 err_msg: "mock error",
130 }
131 .fail()
132 }
133
134 async fn handle_query(
135 &self,
136 _peer: &Peer,
137 _request: QueryRequest,
138 ) -> Result<SendableRecordBatchStream> {
139 unreachable!()
140 }
141}
142
143#[derive(Clone)]
144pub struct RequestOutdatedErrorDatanodeHandler;
145
146#[derive(Debug, Snafu)]
147#[snafu(display("A mock RequestOutdated error"))]
148struct MockRequestOutdatedError;
149
150impl StackError for MockRequestOutdatedError {
151 fn debug_fmt(&self, _: usize, _: &mut Vec<String>) {}
152
153 fn next(&self) -> Option<&dyn StackError> {
154 None
155 }
156}
157
158impl ErrorExt for MockRequestOutdatedError {
159 fn as_any(&self) -> &dyn std::any::Any {
160 self
161 }
162
163 fn status_code(&self) -> StatusCode {
164 StatusCode::RequestOutdated
165 }
166}
167
168#[async_trait::async_trait]
169impl MockDatanodeHandler for RequestOutdatedErrorDatanodeHandler {
170 async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
171 debug!("Returning mock error for request: {request:?}, peer: {peer:?}");
172 Err(BoxedError::new(MockRequestOutdatedError)).context(error::ExternalSnafu)
173 }
174
175 async fn handle_query(
176 &self,
177 _peer: &Peer,
178 _request: QueryRequest,
179 ) -> Result<SendableRecordBatchStream> {
180 unreachable!()
181 }
182}
183
184#[derive(Clone)]
185pub struct NaiveDatanodeHandler;
186
187#[async_trait::async_trait]
188impl MockDatanodeHandler for NaiveDatanodeHandler {
189 async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
190 debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
191 Ok(RegionResponse::new(0))
192 }
193
194 async fn handle_query(
195 &self,
196 _peer: &Peer,
197 _request: QueryRequest,
198 ) -> Result<SendableRecordBatchStream> {
199 unreachable!()
200 }
201}
202
203#[derive(Clone)]
204pub struct PartialSuccessDatanodeHandler {
205 pub retryable: bool,
206}
207
208#[async_trait::async_trait]
209impl MockDatanodeHandler for PartialSuccessDatanodeHandler {
210 async fn handle(&self, peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
211 let success = peer.id % 2 == 0;
212 if success {
213 Ok(RegionResponse::new(0))
214 } else if self.retryable {
215 Err(Error::RetryLater {
216 source: BoxedError::new(
217 error::UnexpectedSnafu {
218 err_msg: "retry later",
219 }
220 .build(),
221 ),
222 clean_poisons: false,
223 })
224 } else {
225 error::UnexpectedSnafu {
226 err_msg: "mock error",
227 }
228 .fail()
229 }
230 }
231
232 async fn handle_query(
233 &self,
234 _peer: &Peer,
235 _request: QueryRequest,
236 ) -> Result<SendableRecordBatchStream> {
237 unreachable!()
238 }
239}
240
241#[derive(Clone)]
242pub struct AllFailureDatanodeHandler {
243 pub retryable: bool,
244}
245
246#[async_trait::async_trait]
247impl MockDatanodeHandler for AllFailureDatanodeHandler {
248 async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
249 if self.retryable {
250 Err(Error::RetryLater {
251 source: BoxedError::new(
252 error::UnexpectedSnafu {
253 err_msg: "retry later",
254 }
255 .build(),
256 ),
257 clean_poisons: false,
258 })
259 } else {
260 error::UnexpectedSnafu {
261 err_msg: "mock error",
262 }
263 .fail()
264 }
265 }
266
267 async fn handle_query(
268 &self,
269 _peer: &Peer,
270 _request: QueryRequest,
271 ) -> Result<SendableRecordBatchStream> {
272 unreachable!()
273 }
274}