common_meta/ddl/test_util/
datanode_handler.rs1use api::region::RegionResponse;
16use api::v1::region::RegionRequest;
17use common_error::ext::{BoxedError, ErrorExt, StackError};
18use common_error::status_code::StatusCode;
19use common_query::request::QueryRequest;
20use common_recordbatch::SendableRecordBatchStream;
21use common_telemetry::debug;
22use snafu::{ResultExt, Snafu};
23use tokio::sync::mpsc;
24
25use crate::error::{self, Error, Result};
26use crate::peer::Peer;
27use crate::test_util::MockDatanodeHandler;
28
29#[async_trait::async_trait]
30impl MockDatanodeHandler for () {
31 async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
32 Ok(RegionResponse {
33 affected_rows: 0,
34 extensions: Default::default(),
35 metadata: Vec::new(),
36 })
37 }
38
39 async fn handle_query(
40 &self,
41 _peer: &Peer,
42 _request: QueryRequest,
43 ) -> Result<SendableRecordBatchStream> {
44 unreachable!()
45 }
46}
47
48#[derive(Clone)]
49pub struct DatanodeWatcher {
50 sender: mpsc::Sender<(Peer, RegionRequest)>,
51 handler: Option<fn(Peer, RegionRequest) -> Result<RegionResponse>>,
52}
53
54impl DatanodeWatcher {
55 pub fn new(sender: mpsc::Sender<(Peer, RegionRequest)>) -> Self {
56 Self {
57 sender,
58 handler: None,
59 }
60 }
61
62 pub fn with_handler(
63 mut self,
64 user_handler: fn(Peer, RegionRequest) -> Result<RegionResponse>,
65 ) -> Self {
66 self.handler = Some(user_handler);
67 self
68 }
69}
70
71#[async_trait::async_trait]
72impl MockDatanodeHandler for DatanodeWatcher {
73 async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
74 debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
75 self.sender
76 .send((peer.clone(), request.clone()))
77 .await
78 .unwrap();
79 if let Some(handler) = self.handler {
80 handler(peer.clone(), request)
81 } else {
82 Ok(RegionResponse::new(0))
83 }
84 }
85
86 async fn handle_query(
87 &self,
88 _peer: &Peer,
89 _request: QueryRequest,
90 ) -> Result<SendableRecordBatchStream> {
91 unreachable!()
92 }
93}
94
95#[derive(Clone)]
96pub struct RetryErrorDatanodeHandler;
97
98#[async_trait::async_trait]
99impl MockDatanodeHandler for RetryErrorDatanodeHandler {
100 async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
101 debug!("Returning retry later for request: {request:?}, peer: {peer:?}");
102 Err(Error::RetryLater {
103 source: BoxedError::new(
104 error::UnexpectedSnafu {
105 err_msg: "retry later",
106 }
107 .build(),
108 ),
109 clean_poisons: false,
110 })
111 }
112
113 async fn handle_query(
114 &self,
115 _peer: &Peer,
116 _request: QueryRequest,
117 ) -> Result<SendableRecordBatchStream> {
118 unreachable!()
119 }
120}
121
122#[derive(Clone)]
123pub struct UnexpectedErrorDatanodeHandler;
124
125#[async_trait::async_trait]
126impl MockDatanodeHandler for UnexpectedErrorDatanodeHandler {
127 async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
128 debug!("Returning mock error for request: {request:?}, peer: {peer:?}");
129 error::UnexpectedSnafu {
130 err_msg: "mock error",
131 }
132 .fail()
133 }
134
135 async fn handle_query(
136 &self,
137 _peer: &Peer,
138 _request: QueryRequest,
139 ) -> Result<SendableRecordBatchStream> {
140 unreachable!()
141 }
142}
143
144#[derive(Clone)]
145pub struct RequestOutdatedErrorDatanodeHandler;
146
147#[derive(Debug, Snafu)]
148#[snafu(display("A mock RequestOutdated error"))]
149struct MockRequestOutdatedError;
150
151impl StackError for MockRequestOutdatedError {
152 fn debug_fmt(&self, _: usize, _: &mut Vec<String>) {}
153
154 fn next(&self) -> Option<&dyn StackError> {
155 None
156 }
157}
158
159impl ErrorExt for MockRequestOutdatedError {
160 fn as_any(&self) -> &dyn std::any::Any {
161 self
162 }
163
164 fn status_code(&self) -> StatusCode {
165 StatusCode::RequestOutdated
166 }
167}
168
169#[async_trait::async_trait]
170impl MockDatanodeHandler for RequestOutdatedErrorDatanodeHandler {
171 async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
172 debug!("Returning mock error for request: {request:?}, peer: {peer:?}");
173 Err(BoxedError::new(MockRequestOutdatedError)).context(error::ExternalSnafu)
174 }
175
176 async fn handle_query(
177 &self,
178 _peer: &Peer,
179 _request: QueryRequest,
180 ) -> Result<SendableRecordBatchStream> {
181 unreachable!()
182 }
183}
184
185#[derive(Clone)]
186pub struct NaiveDatanodeHandler;
187
188#[async_trait::async_trait]
189impl MockDatanodeHandler for NaiveDatanodeHandler {
190 async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
191 debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
192 Ok(RegionResponse::new(0))
193 }
194
195 async fn handle_query(
196 &self,
197 _peer: &Peer,
198 _request: QueryRequest,
199 ) -> Result<SendableRecordBatchStream> {
200 unreachable!()
201 }
202}
203
204#[derive(Clone)]
205pub struct PartialSuccessDatanodeHandler {
206 pub retryable: bool,
207}
208
209#[async_trait::async_trait]
210impl MockDatanodeHandler for PartialSuccessDatanodeHandler {
211 async fn handle(&self, peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
212 let success = peer.id % 2 == 0;
213 if success {
214 Ok(RegionResponse::new(0))
215 } else if self.retryable {
216 Err(Error::RetryLater {
217 source: BoxedError::new(
218 error::UnexpectedSnafu {
219 err_msg: "retry later",
220 }
221 .build(),
222 ),
223 clean_poisons: false,
224 })
225 } else {
226 error::UnexpectedSnafu {
227 err_msg: "mock error",
228 }
229 .fail()
230 }
231 }
232
233 async fn handle_query(
234 &self,
235 _peer: &Peer,
236 _request: QueryRequest,
237 ) -> Result<SendableRecordBatchStream> {
238 unreachable!()
239 }
240}
241
242#[derive(Clone)]
243pub struct AllFailureDatanodeHandler {
244 pub retryable: bool,
245}
246
247#[async_trait::async_trait]
248impl MockDatanodeHandler for AllFailureDatanodeHandler {
249 async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
250 if self.retryable {
251 Err(Error::RetryLater {
252 source: BoxedError::new(
253 error::UnexpectedSnafu {
254 err_msg: "retry later",
255 }
256 .build(),
257 ),
258 clean_poisons: false,
259 })
260 } else {
261 error::UnexpectedSnafu {
262 err_msg: "mock error",
263 }
264 .fail()
265 }
266 }
267
268 async fn handle_query(
269 &self,
270 _peer: &Peer,
271 _request: QueryRequest,
272 ) -> Result<SendableRecordBatchStream> {
273 unreachable!()
274 }
275}