common_meta/ddl/test_util/
datanode_handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::region::RegionResponse;
16use api::v1::region::RegionRequest;
17use common_error::ext::{BoxedError, ErrorExt, StackError};
18use common_error::status_code::StatusCode;
19use common_query::request::QueryRequest;
20use common_recordbatch::SendableRecordBatchStream;
21use common_telemetry::debug;
22use snafu::{ResultExt, Snafu};
23use tokio::sync::mpsc;
24
25use crate::error::{self, Error, Result};
26use crate::peer::Peer;
27use crate::test_util::MockDatanodeHandler;
28
29#[async_trait::async_trait]
30impl MockDatanodeHandler for () {
31    async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
32        Ok(RegionResponse {
33            affected_rows: 0,
34            extensions: Default::default(),
35        })
36    }
37
38    async fn handle_query(
39        &self,
40        _peer: &Peer,
41        _request: QueryRequest,
42    ) -> Result<SendableRecordBatchStream> {
43        unreachable!()
44    }
45}
46
47#[derive(Clone)]
48pub struct DatanodeWatcher {
49    sender: mpsc::Sender<(Peer, RegionRequest)>,
50    handler: Option<fn(Peer, RegionRequest) -> Result<RegionResponse>>,
51}
52
53impl DatanodeWatcher {
54    pub fn new(sender: mpsc::Sender<(Peer, RegionRequest)>) -> Self {
55        Self {
56            sender,
57            handler: None,
58        }
59    }
60
61    pub fn with_handler(
62        mut self,
63        user_handler: fn(Peer, RegionRequest) -> Result<RegionResponse>,
64    ) -> Self {
65        self.handler = Some(user_handler);
66        self
67    }
68}
69
70#[async_trait::async_trait]
71impl MockDatanodeHandler for DatanodeWatcher {
72    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
73        debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
74        self.sender
75            .send((peer.clone(), request.clone()))
76            .await
77            .unwrap();
78        if let Some(handler) = self.handler {
79            handler(peer.clone(), request)
80        } else {
81            Ok(RegionResponse::new(0))
82        }
83    }
84
85    async fn handle_query(
86        &self,
87        _peer: &Peer,
88        _request: QueryRequest,
89    ) -> Result<SendableRecordBatchStream> {
90        unreachable!()
91    }
92}
93
94#[derive(Clone)]
95pub struct RetryErrorDatanodeHandler;
96
97#[async_trait::async_trait]
98impl MockDatanodeHandler for RetryErrorDatanodeHandler {
99    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
100        debug!("Returning retry later for request: {request:?}, peer: {peer:?}");
101        Err(Error::RetryLater {
102            source: BoxedError::new(
103                error::UnexpectedSnafu {
104                    err_msg: "retry later",
105                }
106                .build(),
107            ),
108        })
109    }
110
111    async fn handle_query(
112        &self,
113        _peer: &Peer,
114        _request: QueryRequest,
115    ) -> Result<SendableRecordBatchStream> {
116        unreachable!()
117    }
118}
119
120#[derive(Clone)]
121pub struct UnexpectedErrorDatanodeHandler;
122
123#[async_trait::async_trait]
124impl MockDatanodeHandler for UnexpectedErrorDatanodeHandler {
125    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
126        debug!("Returning mock error for request: {request:?}, peer: {peer:?}");
127        error::UnexpectedSnafu {
128            err_msg: "mock error",
129        }
130        .fail()
131    }
132
133    async fn handle_query(
134        &self,
135        _peer: &Peer,
136        _request: QueryRequest,
137    ) -> Result<SendableRecordBatchStream> {
138        unreachable!()
139    }
140}
141
142#[derive(Clone)]
143pub struct RequestOutdatedErrorDatanodeHandler;
144
145#[derive(Debug, Snafu)]
146#[snafu(display("A mock RequestOutdated error"))]
147struct MockRequestOutdatedError;
148
149impl StackError for MockRequestOutdatedError {
150    fn debug_fmt(&self, _: usize, _: &mut Vec<String>) {}
151
152    fn next(&self) -> Option<&dyn StackError> {
153        None
154    }
155}
156
157impl ErrorExt for MockRequestOutdatedError {
158    fn as_any(&self) -> &dyn std::any::Any {
159        self
160    }
161
162    fn status_code(&self) -> StatusCode {
163        StatusCode::RequestOutdated
164    }
165}
166
167#[async_trait::async_trait]
168impl MockDatanodeHandler for RequestOutdatedErrorDatanodeHandler {
169    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
170        debug!("Returning mock error for request: {request:?}, peer: {peer:?}");
171        Err(BoxedError::new(MockRequestOutdatedError)).context(error::ExternalSnafu)
172    }
173
174    async fn handle_query(
175        &self,
176        _peer: &Peer,
177        _request: QueryRequest,
178    ) -> Result<SendableRecordBatchStream> {
179        unreachable!()
180    }
181}
182
183#[derive(Clone)]
184pub struct NaiveDatanodeHandler;
185
186#[async_trait::async_trait]
187impl MockDatanodeHandler for NaiveDatanodeHandler {
188    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
189        debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
190        Ok(RegionResponse::new(0))
191    }
192
193    async fn handle_query(
194        &self,
195        _peer: &Peer,
196        _request: QueryRequest,
197    ) -> Result<SendableRecordBatchStream> {
198        unreachable!()
199    }
200}
201
202#[derive(Clone)]
203pub struct PartialSuccessDatanodeHandler {
204    pub retryable: bool,
205}
206
207#[async_trait::async_trait]
208impl MockDatanodeHandler for PartialSuccessDatanodeHandler {
209    async fn handle(&self, peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
210        let success = peer.id % 2 == 0;
211        if success {
212            Ok(RegionResponse::new(0))
213        } else if self.retryable {
214            Err(Error::RetryLater {
215                source: BoxedError::new(
216                    error::UnexpectedSnafu {
217                        err_msg: "retry later",
218                    }
219                    .build(),
220                ),
221            })
222        } else {
223            error::UnexpectedSnafu {
224                err_msg: "mock error",
225            }
226            .fail()
227        }
228    }
229
230    async fn handle_query(
231        &self,
232        _peer: &Peer,
233        _request: QueryRequest,
234    ) -> Result<SendableRecordBatchStream> {
235        unreachable!()
236    }
237}
238
239#[derive(Clone)]
240pub struct AllFailureDatanodeHandler {
241    pub retryable: bool,
242}
243
244#[async_trait::async_trait]
245impl MockDatanodeHandler for AllFailureDatanodeHandler {
246    async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
247        if self.retryable {
248            Err(Error::RetryLater {
249                source: BoxedError::new(
250                    error::UnexpectedSnafu {
251                        err_msg: "retry later",
252                    }
253                    .build(),
254                ),
255            })
256        } else {
257            error::UnexpectedSnafu {
258                err_msg: "mock error",
259            }
260            .fail()
261        }
262    }
263
264    async fn handle_query(
265        &self,
266        _peer: &Peer,
267        _request: QueryRequest,
268    ) -> Result<SendableRecordBatchStream> {
269        unreachable!()
270    }
271}