common_meta/ddl/test_util/
datanode_handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::region::RegionResponse;
16use api::v1::region::RegionRequest;
17use common_error::ext::{BoxedError, ErrorExt, StackError};
18use common_error::status_code::StatusCode;
19use common_query::request::QueryRequest;
20use common_recordbatch::SendableRecordBatchStream;
21use common_telemetry::debug;
22use snafu::{ResultExt, Snafu};
23use tokio::sync::mpsc;
24
25use crate::error::{self, Error, Result};
26use crate::peer::Peer;
27use crate::test_util::MockDatanodeHandler;
28
29#[async_trait::async_trait]
30impl MockDatanodeHandler for () {
31    async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
32        Ok(RegionResponse {
33            affected_rows: 0,
34            extensions: Default::default(),
35        })
36    }
37
38    async fn handle_query(
39        &self,
40        _peer: &Peer,
41        _request: QueryRequest,
42    ) -> Result<SendableRecordBatchStream> {
43        unreachable!()
44    }
45}
46
47#[derive(Clone)]
48pub struct DatanodeWatcher {
49    sender: mpsc::Sender<(Peer, RegionRequest)>,
50    handler: Option<fn(Peer, RegionRequest) -> Result<RegionResponse>>,
51}
52
53impl DatanodeWatcher {
54    pub fn new(sender: mpsc::Sender<(Peer, RegionRequest)>) -> Self {
55        Self {
56            sender,
57            handler: None,
58        }
59    }
60
61    pub fn with_handler(
62        mut self,
63        user_handler: fn(Peer, RegionRequest) -> Result<RegionResponse>,
64    ) -> Self {
65        self.handler = Some(user_handler);
66        self
67    }
68}
69
70#[async_trait::async_trait]
71impl MockDatanodeHandler for DatanodeWatcher {
72    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
73        debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
74        self.sender
75            .send((peer.clone(), request.clone()))
76            .await
77            .unwrap();
78        if let Some(handler) = self.handler {
79            handler(peer.clone(), request)
80        } else {
81            Ok(RegionResponse::new(0))
82        }
83    }
84
85    async fn handle_query(
86        &self,
87        _peer: &Peer,
88        _request: QueryRequest,
89    ) -> Result<SendableRecordBatchStream> {
90        unreachable!()
91    }
92}
93
94#[derive(Clone)]
95pub struct RetryErrorDatanodeHandler;
96
97#[async_trait::async_trait]
98impl MockDatanodeHandler for RetryErrorDatanodeHandler {
99    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
100        debug!("Returning retry later for request: {request:?}, peer: {peer:?}");
101        Err(Error::RetryLater {
102            source: BoxedError::new(
103                error::UnexpectedSnafu {
104                    err_msg: "retry later",
105                }
106                .build(),
107            ),
108            clean_poisons: false,
109        })
110    }
111
112    async fn handle_query(
113        &self,
114        _peer: &Peer,
115        _request: QueryRequest,
116    ) -> Result<SendableRecordBatchStream> {
117        unreachable!()
118    }
119}
120
121#[derive(Clone)]
122pub struct UnexpectedErrorDatanodeHandler;
123
124#[async_trait::async_trait]
125impl MockDatanodeHandler for UnexpectedErrorDatanodeHandler {
126    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
127        debug!("Returning mock error for request: {request:?}, peer: {peer:?}");
128        error::UnexpectedSnafu {
129            err_msg: "mock error",
130        }
131        .fail()
132    }
133
134    async fn handle_query(
135        &self,
136        _peer: &Peer,
137        _request: QueryRequest,
138    ) -> Result<SendableRecordBatchStream> {
139        unreachable!()
140    }
141}
142
143#[derive(Clone)]
144pub struct RequestOutdatedErrorDatanodeHandler;
145
146#[derive(Debug, Snafu)]
147#[snafu(display("A mock RequestOutdated error"))]
148struct MockRequestOutdatedError;
149
150impl StackError for MockRequestOutdatedError {
151    fn debug_fmt(&self, _: usize, _: &mut Vec<String>) {}
152
153    fn next(&self) -> Option<&dyn StackError> {
154        None
155    }
156}
157
158impl ErrorExt for MockRequestOutdatedError {
159    fn as_any(&self) -> &dyn std::any::Any {
160        self
161    }
162
163    fn status_code(&self) -> StatusCode {
164        StatusCode::RequestOutdated
165    }
166}
167
168#[async_trait::async_trait]
169impl MockDatanodeHandler for RequestOutdatedErrorDatanodeHandler {
170    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
171        debug!("Returning mock error for request: {request:?}, peer: {peer:?}");
172        Err(BoxedError::new(MockRequestOutdatedError)).context(error::ExternalSnafu)
173    }
174
175    async fn handle_query(
176        &self,
177        _peer: &Peer,
178        _request: QueryRequest,
179    ) -> Result<SendableRecordBatchStream> {
180        unreachable!()
181    }
182}
183
184#[derive(Clone)]
185pub struct NaiveDatanodeHandler;
186
187#[async_trait::async_trait]
188impl MockDatanodeHandler for NaiveDatanodeHandler {
189    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
190        debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
191        Ok(RegionResponse::new(0))
192    }
193
194    async fn handle_query(
195        &self,
196        _peer: &Peer,
197        _request: QueryRequest,
198    ) -> Result<SendableRecordBatchStream> {
199        unreachable!()
200    }
201}
202
203#[derive(Clone)]
204pub struct PartialSuccessDatanodeHandler {
205    pub retryable: bool,
206}
207
208#[async_trait::async_trait]
209impl MockDatanodeHandler for PartialSuccessDatanodeHandler {
210    async fn handle(&self, peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
211        let success = peer.id % 2 == 0;
212        if success {
213            Ok(RegionResponse::new(0))
214        } else if self.retryable {
215            Err(Error::RetryLater {
216                source: BoxedError::new(
217                    error::UnexpectedSnafu {
218                        err_msg: "retry later",
219                    }
220                    .build(),
221                ),
222                clean_poisons: false,
223            })
224        } else {
225            error::UnexpectedSnafu {
226                err_msg: "mock error",
227            }
228            .fail()
229        }
230    }
231
232    async fn handle_query(
233        &self,
234        _peer: &Peer,
235        _request: QueryRequest,
236    ) -> Result<SendableRecordBatchStream> {
237        unreachable!()
238    }
239}
240
241#[derive(Clone)]
242pub struct AllFailureDatanodeHandler {
243    pub retryable: bool,
244}
245
246#[async_trait::async_trait]
247impl MockDatanodeHandler for AllFailureDatanodeHandler {
248    async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
249        if self.retryable {
250            Err(Error::RetryLater {
251                source: BoxedError::new(
252                    error::UnexpectedSnafu {
253                        err_msg: "retry later",
254                    }
255                    .build(),
256                ),
257                clean_poisons: false,
258            })
259        } else {
260            error::UnexpectedSnafu {
261                err_msg: "mock error",
262            }
263            .fail()
264        }
265    }
266
267    async fn handle_query(
268        &self,
269        _peer: &Peer,
270        _request: QueryRequest,
271    ) -> Result<SendableRecordBatchStream> {
272        unreachable!()
273    }
274}