common_meta/ddl/test_util/
datanode_handler.rs1use api::region::RegionResponse;
16use api::v1::region::RegionRequest;
17use common_error::ext::{BoxedError, ErrorExt, StackError};
18use common_error::status_code::StatusCode;
19use common_query::request::QueryRequest;
20use common_recordbatch::SendableRecordBatchStream;
21use common_telemetry::debug;
22use snafu::{ResultExt, Snafu};
23use tokio::sync::mpsc;
24
25use crate::error::{self, Error, Result};
26use crate::peer::Peer;
27use crate::test_util::MockDatanodeHandler;
28
29#[async_trait::async_trait]
30impl MockDatanodeHandler for () {
31 async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
32 Ok(RegionResponse {
33 affected_rows: 0,
34 extensions: Default::default(),
35 })
36 }
37
38 async fn handle_query(
39 &self,
40 _peer: &Peer,
41 _request: QueryRequest,
42 ) -> Result<SendableRecordBatchStream> {
43 unreachable!()
44 }
45}
46
47#[derive(Clone)]
48pub struct DatanodeWatcher {
49 sender: mpsc::Sender<(Peer, RegionRequest)>,
50 handler: Option<fn(Peer, RegionRequest) -> Result<RegionResponse>>,
51}
52
53impl DatanodeWatcher {
54 pub fn new(sender: mpsc::Sender<(Peer, RegionRequest)>) -> Self {
55 Self {
56 sender,
57 handler: None,
58 }
59 }
60
61 pub fn with_handler(
62 mut self,
63 user_handler: fn(Peer, RegionRequest) -> Result<RegionResponse>,
64 ) -> Self {
65 self.handler = Some(user_handler);
66 self
67 }
68}
69
70#[async_trait::async_trait]
71impl MockDatanodeHandler for DatanodeWatcher {
72 async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
73 debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
74 self.sender
75 .send((peer.clone(), request.clone()))
76 .await
77 .unwrap();
78 if let Some(handler) = self.handler {
79 handler(peer.clone(), request)
80 } else {
81 Ok(RegionResponse::new(0))
82 }
83 }
84
85 async fn handle_query(
86 &self,
87 _peer: &Peer,
88 _request: QueryRequest,
89 ) -> Result<SendableRecordBatchStream> {
90 unreachable!()
91 }
92}
93
94#[derive(Clone)]
95pub struct RetryErrorDatanodeHandler;
96
97#[async_trait::async_trait]
98impl MockDatanodeHandler for RetryErrorDatanodeHandler {
99 async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
100 debug!("Returning retry later for request: {request:?}, peer: {peer:?}");
101 Err(Error::RetryLater {
102 source: BoxedError::new(
103 error::UnexpectedSnafu {
104 err_msg: "retry later",
105 }
106 .build(),
107 ),
108 })
109 }
110
111 async fn handle_query(
112 &self,
113 _peer: &Peer,
114 _request: QueryRequest,
115 ) -> Result<SendableRecordBatchStream> {
116 unreachable!()
117 }
118}
119
120#[derive(Clone)]
121pub struct UnexpectedErrorDatanodeHandler;
122
123#[async_trait::async_trait]
124impl MockDatanodeHandler for UnexpectedErrorDatanodeHandler {
125 async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
126 debug!("Returning mock error for request: {request:?}, peer: {peer:?}");
127 error::UnexpectedSnafu {
128 err_msg: "mock error",
129 }
130 .fail()
131 }
132
133 async fn handle_query(
134 &self,
135 _peer: &Peer,
136 _request: QueryRequest,
137 ) -> Result<SendableRecordBatchStream> {
138 unreachable!()
139 }
140}
141
/// Mock datanode handler whose `handle` always fails with an external error wrapping a
/// mock error whose status code is `StatusCode::RequestOutdated`.
#[derive(Clone)]
pub struct RequestOutdatedErrorDatanodeHandler;
144
145#[derive(Debug, Snafu)]
146#[snafu(display("A mock RequestOutdated error"))]
147struct MockRequestOutdatedError;
148
149impl StackError for MockRequestOutdatedError {
150 fn debug_fmt(&self, _: usize, _: &mut Vec<String>) {}
151
152 fn next(&self) -> Option<&dyn StackError> {
153 None
154 }
155}
156
157impl ErrorExt for MockRequestOutdatedError {
158 fn as_any(&self) -> &dyn std::any::Any {
159 self
160 }
161
162 fn status_code(&self) -> StatusCode {
163 StatusCode::RequestOutdated
164 }
165}
166
167#[async_trait::async_trait]
168impl MockDatanodeHandler for RequestOutdatedErrorDatanodeHandler {
169 async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
170 debug!("Returning mock error for request: {request:?}, peer: {peer:?}");
171 Err(BoxedError::new(MockRequestOutdatedError)).context(error::ExternalSnafu)
172 }
173
174 async fn handle_query(
175 &self,
176 _peer: &Peer,
177 _request: QueryRequest,
178 ) -> Result<SendableRecordBatchStream> {
179 unreachable!()
180 }
181}
182
183#[derive(Clone)]
184pub struct NaiveDatanodeHandler;
185
186#[async_trait::async_trait]
187impl MockDatanodeHandler for NaiveDatanodeHandler {
188 async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
189 debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
190 Ok(RegionResponse::new(0))
191 }
192
193 async fn handle_query(
194 &self,
195 _peer: &Peer,
196 _request: QueryRequest,
197 ) -> Result<SendableRecordBatchStream> {
198 unreachable!()
199 }
200}
201
202#[derive(Clone)]
203pub struct PartialSuccessDatanodeHandler {
204 pub retryable: bool,
205}
206
207#[async_trait::async_trait]
208impl MockDatanodeHandler for PartialSuccessDatanodeHandler {
209 async fn handle(&self, peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
210 let success = peer.id % 2 == 0;
211 if success {
212 Ok(RegionResponse::new(0))
213 } else if self.retryable {
214 Err(Error::RetryLater {
215 source: BoxedError::new(
216 error::UnexpectedSnafu {
217 err_msg: "retry later",
218 }
219 .build(),
220 ),
221 })
222 } else {
223 error::UnexpectedSnafu {
224 err_msg: "mock error",
225 }
226 .fail()
227 }
228 }
229
230 async fn handle_query(
231 &self,
232 _peer: &Peer,
233 _request: QueryRequest,
234 ) -> Result<SendableRecordBatchStream> {
235 unreachable!()
236 }
237}
238
239#[derive(Clone)]
240pub struct AllFailureDatanodeHandler {
241 pub retryable: bool,
242}
243
244#[async_trait::async_trait]
245impl MockDatanodeHandler for AllFailureDatanodeHandler {
246 async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
247 if self.retryable {
248 Err(Error::RetryLater {
249 source: BoxedError::new(
250 error::UnexpectedSnafu {
251 err_msg: "retry later",
252 }
253 .build(),
254 ),
255 })
256 } else {
257 error::UnexpectedSnafu {
258 err_msg: "mock error",
259 }
260 .fail()
261 }
262 }
263
264 async fn handle_query(
265 &self,
266 _peer: &Peer,
267 _request: QueryRequest,
268 ) -> Result<SendableRecordBatchStream> {
269 unreachable!()
270 }
271}