common_meta/ddl/test_util/
datanode_handler.rs1use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::region::RegionResponse;
19use api::v1::region::region_request::Body;
20use api::v1::region::RegionRequest;
21use common_error::ext::{BoxedError, ErrorExt, StackError};
22use common_error::status_code::StatusCode;
23use common_query::request::QueryRequest;
24use common_recordbatch::SendableRecordBatchStream;
25use common_telemetry::debug;
26use snafu::{ResultExt, Snafu};
27use store_api::metadata::RegionMetadata;
28use store_api::storage::RegionId;
29use tokio::sync::mpsc;
30
31use crate::error::{self, Error, Result};
32use crate::peer::Peer;
33use crate::test_util::MockDatanodeHandler;
34
35#[async_trait::async_trait]
36impl MockDatanodeHandler for () {
37 async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
38 Ok(RegionResponse {
39 affected_rows: 0,
40 extensions: Default::default(),
41 metadata: Vec::new(),
42 })
43 }
44
45 async fn handle_query(
46 &self,
47 _peer: &Peer,
48 _request: QueryRequest,
49 ) -> Result<SendableRecordBatchStream> {
50 unreachable!()
51 }
52}
53
54type RegionRequestHandler =
55 Arc<dyn Fn(Peer, RegionRequest) -> Result<RegionResponse> + Send + Sync>;
56
57#[derive(Clone)]
58pub struct DatanodeWatcher {
59 sender: mpsc::Sender<(Peer, RegionRequest)>,
60 handler: Option<RegionRequestHandler>,
61}
62
63impl DatanodeWatcher {
64 pub fn new(sender: mpsc::Sender<(Peer, RegionRequest)>) -> Self {
65 Self {
66 sender,
67 handler: None,
68 }
69 }
70
71 pub fn with_handler(
72 mut self,
73 user_handler: impl Fn(Peer, RegionRequest) -> Result<RegionResponse> + Send + Sync + 'static,
74 ) -> Self {
75 self.handler = Some(Arc::new(user_handler));
76 self
77 }
78}
79
80#[async_trait::async_trait]
81impl MockDatanodeHandler for DatanodeWatcher {
82 async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
83 debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
84 self.sender
85 .send((peer.clone(), request.clone()))
86 .await
87 .unwrap();
88 if let Some(handler) = self.handler.as_ref() {
89 handler(peer.clone(), request)
90 } else {
91 Ok(RegionResponse::new(0))
92 }
93 }
94
95 async fn handle_query(
96 &self,
97 _peer: &Peer,
98 _request: QueryRequest,
99 ) -> Result<SendableRecordBatchStream> {
100 unreachable!()
101 }
102}
103
104#[derive(Clone)]
105pub struct RetryErrorDatanodeHandler;
106
107#[async_trait::async_trait]
108impl MockDatanodeHandler for RetryErrorDatanodeHandler {
109 async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
110 debug!("Returning retry later for request: {request:?}, peer: {peer:?}");
111 Err(Error::RetryLater {
112 source: BoxedError::new(
113 error::UnexpectedSnafu {
114 err_msg: "retry later",
115 }
116 .build(),
117 ),
118 clean_poisons: false,
119 })
120 }
121
122 async fn handle_query(
123 &self,
124 _peer: &Peer,
125 _request: QueryRequest,
126 ) -> Result<SendableRecordBatchStream> {
127 unreachable!()
128 }
129}
130
131#[derive(Clone)]
132pub struct UnexpectedErrorDatanodeHandler;
133
134#[async_trait::async_trait]
135impl MockDatanodeHandler for UnexpectedErrorDatanodeHandler {
136 async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
137 debug!("Returning mock error for request: {request:?}, peer: {peer:?}");
138 error::UnexpectedSnafu {
139 err_msg: "mock error",
140 }
141 .fail()
142 }
143
144 async fn handle_query(
145 &self,
146 _peer: &Peer,
147 _request: QueryRequest,
148 ) -> Result<SendableRecordBatchStream> {
149 unreachable!()
150 }
151}
152
153#[derive(Clone)]
154pub struct RequestOutdatedErrorDatanodeHandler;
155
156#[derive(Debug, Snafu)]
157#[snafu(display("A mock RequestOutdated error"))]
158struct MockRequestOutdatedError;
159
160impl StackError for MockRequestOutdatedError {
161 fn debug_fmt(&self, _: usize, _: &mut Vec<String>) {}
162
163 fn next(&self) -> Option<&dyn StackError> {
164 None
165 }
166}
167
168impl ErrorExt for MockRequestOutdatedError {
169 fn as_any(&self) -> &dyn std::any::Any {
170 self
171 }
172
173 fn status_code(&self) -> StatusCode {
174 StatusCode::RequestOutdated
175 }
176}
177
178#[async_trait::async_trait]
179impl MockDatanodeHandler for RequestOutdatedErrorDatanodeHandler {
180 async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
181 debug!("Returning mock error for request: {request:?}, peer: {peer:?}");
182 Err(BoxedError::new(MockRequestOutdatedError)).context(error::ExternalSnafu)
183 }
184
185 async fn handle_query(
186 &self,
187 _peer: &Peer,
188 _request: QueryRequest,
189 ) -> Result<SendableRecordBatchStream> {
190 unreachable!()
191 }
192}
193
194#[derive(Clone)]
195pub struct NaiveDatanodeHandler;
196
197#[async_trait::async_trait]
198impl MockDatanodeHandler for NaiveDatanodeHandler {
199 async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
200 debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
201 Ok(RegionResponse::new(0))
202 }
203
204 async fn handle_query(
205 &self,
206 _peer: &Peer,
207 _request: QueryRequest,
208 ) -> Result<SendableRecordBatchStream> {
209 unreachable!()
210 }
211}
212
213#[derive(Clone)]
214pub struct PartialSuccessDatanodeHandler {
215 pub retryable: bool,
216}
217
218#[async_trait::async_trait]
219impl MockDatanodeHandler for PartialSuccessDatanodeHandler {
220 async fn handle(&self, peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
221 let success = peer.id % 2 == 0;
222 if success {
223 Ok(RegionResponse::new(0))
224 } else if self.retryable {
225 Err(Error::RetryLater {
226 source: BoxedError::new(
227 error::UnexpectedSnafu {
228 err_msg: "retry later",
229 }
230 .build(),
231 ),
232 clean_poisons: false,
233 })
234 } else {
235 error::UnexpectedSnafu {
236 err_msg: "mock error",
237 }
238 .fail()
239 }
240 }
241
242 async fn handle_query(
243 &self,
244 _peer: &Peer,
245 _request: QueryRequest,
246 ) -> Result<SendableRecordBatchStream> {
247 unreachable!()
248 }
249}
250
251#[derive(Clone)]
252pub struct AllFailureDatanodeHandler {
253 pub retryable: bool,
254}
255
256#[async_trait::async_trait]
257impl MockDatanodeHandler for AllFailureDatanodeHandler {
258 async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
259 if self.retryable {
260 Err(Error::RetryLater {
261 source: BoxedError::new(
262 error::UnexpectedSnafu {
263 err_msg: "retry later",
264 }
265 .build(),
266 ),
267 clean_poisons: false,
268 })
269 } else {
270 error::UnexpectedSnafu {
271 err_msg: "mock error",
272 }
273 .fail()
274 }
275 }
276
277 async fn handle_query(
278 &self,
279 _peer: &Peer,
280 _request: QueryRequest,
281 ) -> Result<SendableRecordBatchStream> {
282 unreachable!()
283 }
284}
285
286#[derive(Clone)]
287pub struct ListMetadataDatanodeHandler {
288 pub region_metadatas: HashMap<RegionId, Option<RegionMetadata>>,
289}
290
291impl ListMetadataDatanodeHandler {
292 pub fn new(region_metadatas: HashMap<RegionId, Option<RegionMetadata>>) -> Self {
293 Self { region_metadatas }
294 }
295}
296
297#[async_trait::async_trait]
298impl MockDatanodeHandler for ListMetadataDatanodeHandler {
299 async fn handle(&self, _peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
300 let Some(Body::ListMetadata(req)) = request.body else {
301 unreachable!()
302 };
303 let mut response = RegionResponse::new(0);
304
305 let mut output = Vec::with_capacity(req.region_ids.len());
306 for region_id in req.region_ids {
307 match self.region_metadatas.get(&RegionId::from_u64(region_id)) {
308 Some(metadata) => {
309 output.push(metadata.clone());
310 }
311 None => {
312 output.push(None);
313 }
314 }
315 }
316
317 response.metadata = serde_json::to_vec(&output).unwrap();
318 Ok(response)
319 }
320
321 async fn handle_query(
322 &self,
323 _peer: &Peer,
324 _request: QueryRequest,
325 ) -> Result<SendableRecordBatchStream> {
326 unreachable!()
327 }
328}