common_meta/ddl/test_util/
datanode_handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use api::region::RegionResponse;
19use api::v1::region::region_request::Body;
20use api::v1::region::RegionRequest;
21use common_error::ext::{BoxedError, ErrorExt, StackError};
22use common_error::status_code::StatusCode;
23use common_query::request::QueryRequest;
24use common_recordbatch::SendableRecordBatchStream;
25use common_telemetry::debug;
26use snafu::{ResultExt, Snafu};
27use store_api::metadata::RegionMetadata;
28use store_api::storage::RegionId;
29use tokio::sync::mpsc;
30
31use crate::error::{self, Error, Result};
32use crate::peer::Peer;
33use crate::test_util::MockDatanodeHandler;
34
35#[async_trait::async_trait]
36impl MockDatanodeHandler for () {
37    async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
38        Ok(RegionResponse {
39            affected_rows: 0,
40            extensions: Default::default(),
41            metadata: Vec::new(),
42        })
43    }
44
45    async fn handle_query(
46        &self,
47        _peer: &Peer,
48        _request: QueryRequest,
49    ) -> Result<SendableRecordBatchStream> {
50        unreachable!()
51    }
52}
53
54type RegionRequestHandler =
55    Arc<dyn Fn(Peer, RegionRequest) -> Result<RegionResponse> + Send + Sync>;
56
57#[derive(Clone)]
58pub struct DatanodeWatcher {
59    sender: mpsc::Sender<(Peer, RegionRequest)>,
60    handler: Option<RegionRequestHandler>,
61}
62
63impl DatanodeWatcher {
64    pub fn new(sender: mpsc::Sender<(Peer, RegionRequest)>) -> Self {
65        Self {
66            sender,
67            handler: None,
68        }
69    }
70
71    pub fn with_handler(
72        mut self,
73        user_handler: impl Fn(Peer, RegionRequest) -> Result<RegionResponse> + Send + Sync + 'static,
74    ) -> Self {
75        self.handler = Some(Arc::new(user_handler));
76        self
77    }
78}
79
80#[async_trait::async_trait]
81impl MockDatanodeHandler for DatanodeWatcher {
82    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
83        debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
84        self.sender
85            .send((peer.clone(), request.clone()))
86            .await
87            .unwrap();
88        if let Some(handler) = self.handler.as_ref() {
89            handler(peer.clone(), request)
90        } else {
91            Ok(RegionResponse::new(0))
92        }
93    }
94
95    async fn handle_query(
96        &self,
97        _peer: &Peer,
98        _request: QueryRequest,
99    ) -> Result<SendableRecordBatchStream> {
100        unreachable!()
101    }
102}
103
104#[derive(Clone)]
105pub struct RetryErrorDatanodeHandler;
106
107#[async_trait::async_trait]
108impl MockDatanodeHandler for RetryErrorDatanodeHandler {
109    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
110        debug!("Returning retry later for request: {request:?}, peer: {peer:?}");
111        Err(Error::RetryLater {
112            source: BoxedError::new(
113                error::UnexpectedSnafu {
114                    err_msg: "retry later",
115                }
116                .build(),
117            ),
118            clean_poisons: false,
119        })
120    }
121
122    async fn handle_query(
123        &self,
124        _peer: &Peer,
125        _request: QueryRequest,
126    ) -> Result<SendableRecordBatchStream> {
127        unreachable!()
128    }
129}
130
131#[derive(Clone)]
132pub struct UnexpectedErrorDatanodeHandler;
133
134#[async_trait::async_trait]
135impl MockDatanodeHandler for UnexpectedErrorDatanodeHandler {
136    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
137        debug!("Returning mock error for request: {request:?}, peer: {peer:?}");
138        error::UnexpectedSnafu {
139            err_msg: "mock error",
140        }
141        .fail()
142    }
143
144    async fn handle_query(
145        &self,
146        _peer: &Peer,
147        _request: QueryRequest,
148    ) -> Result<SendableRecordBatchStream> {
149        unreachable!()
150    }
151}
152
153#[derive(Clone)]
154pub struct RequestOutdatedErrorDatanodeHandler;
155
156#[derive(Debug, Snafu)]
157#[snafu(display("A mock RequestOutdated error"))]
158struct MockRequestOutdatedError;
159
160impl StackError for MockRequestOutdatedError {
161    fn debug_fmt(&self, _: usize, _: &mut Vec<String>) {}
162
163    fn next(&self) -> Option<&dyn StackError> {
164        None
165    }
166}
167
168impl ErrorExt for MockRequestOutdatedError {
169    fn as_any(&self) -> &dyn std::any::Any {
170        self
171    }
172
173    fn status_code(&self) -> StatusCode {
174        StatusCode::RequestOutdated
175    }
176}
177
178#[async_trait::async_trait]
179impl MockDatanodeHandler for RequestOutdatedErrorDatanodeHandler {
180    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
181        debug!("Returning mock error for request: {request:?}, peer: {peer:?}");
182        Err(BoxedError::new(MockRequestOutdatedError)).context(error::ExternalSnafu)
183    }
184
185    async fn handle_query(
186        &self,
187        _peer: &Peer,
188        _request: QueryRequest,
189    ) -> Result<SendableRecordBatchStream> {
190        unreachable!()
191    }
192}
193
194#[derive(Clone)]
195pub struct NaiveDatanodeHandler;
196
197#[async_trait::async_trait]
198impl MockDatanodeHandler for NaiveDatanodeHandler {
199    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
200        debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
201        Ok(RegionResponse::new(0))
202    }
203
204    async fn handle_query(
205        &self,
206        _peer: &Peer,
207        _request: QueryRequest,
208    ) -> Result<SendableRecordBatchStream> {
209        unreachable!()
210    }
211}
212
213#[derive(Clone)]
214pub struct PartialSuccessDatanodeHandler {
215    pub retryable: bool,
216}
217
218#[async_trait::async_trait]
219impl MockDatanodeHandler for PartialSuccessDatanodeHandler {
220    async fn handle(&self, peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
221        let success = peer.id % 2 == 0;
222        if success {
223            Ok(RegionResponse::new(0))
224        } else if self.retryable {
225            Err(Error::RetryLater {
226                source: BoxedError::new(
227                    error::UnexpectedSnafu {
228                        err_msg: "retry later",
229                    }
230                    .build(),
231                ),
232                clean_poisons: false,
233            })
234        } else {
235            error::UnexpectedSnafu {
236                err_msg: "mock error",
237            }
238            .fail()
239        }
240    }
241
242    async fn handle_query(
243        &self,
244        _peer: &Peer,
245        _request: QueryRequest,
246    ) -> Result<SendableRecordBatchStream> {
247        unreachable!()
248    }
249}
250
251#[derive(Clone)]
252pub struct AllFailureDatanodeHandler {
253    pub retryable: bool,
254}
255
256#[async_trait::async_trait]
257impl MockDatanodeHandler for AllFailureDatanodeHandler {
258    async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
259        if self.retryable {
260            Err(Error::RetryLater {
261                source: BoxedError::new(
262                    error::UnexpectedSnafu {
263                        err_msg: "retry later",
264                    }
265                    .build(),
266                ),
267                clean_poisons: false,
268            })
269        } else {
270            error::UnexpectedSnafu {
271                err_msg: "mock error",
272            }
273            .fail()
274        }
275    }
276
277    async fn handle_query(
278        &self,
279        _peer: &Peer,
280        _request: QueryRequest,
281    ) -> Result<SendableRecordBatchStream> {
282        unreachable!()
283    }
284}
285
286#[derive(Clone)]
287pub struct ListMetadataDatanodeHandler {
288    pub region_metadatas: HashMap<RegionId, Option<RegionMetadata>>,
289}
290
291impl ListMetadataDatanodeHandler {
292    pub fn new(region_metadatas: HashMap<RegionId, Option<RegionMetadata>>) -> Self {
293        Self { region_metadatas }
294    }
295}
296
297#[async_trait::async_trait]
298impl MockDatanodeHandler for ListMetadataDatanodeHandler {
299    async fn handle(&self, _peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
300        let Some(Body::ListMetadata(req)) = request.body else {
301            unreachable!()
302        };
303        let mut response = RegionResponse::new(0);
304
305        let mut output = Vec::with_capacity(req.region_ids.len());
306        for region_id in req.region_ids {
307            match self.region_metadatas.get(&RegionId::from_u64(region_id)) {
308                Some(metadata) => {
309                    output.push(metadata.clone());
310                }
311                None => {
312                    output.push(None);
313                }
314            }
315        }
316
317        response.metadata = serde_json::to_vec(&output).unwrap();
318        Ok(response)
319    }
320
321    async fn handle_query(
322        &self,
323        _peer: &Peer,
324        _request: QueryRequest,
325    ) -> Result<SendableRecordBatchStream> {
326        unreachable!()
327    }
328}