common_meta/ddl/test_util/
datanode_handler.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use api::region::RegionResponse;
16use api::v1::region::RegionRequest;
17use common_error::ext::{BoxedError, ErrorExt, StackError};
18use common_error::status_code::StatusCode;
19use common_query::request::QueryRequest;
20use common_recordbatch::SendableRecordBatchStream;
21use common_telemetry::debug;
22use snafu::{ResultExt, Snafu};
23use tokio::sync::mpsc;
24
25use crate::error::{self, Error, Result};
26use crate::peer::Peer;
27use crate::test_util::MockDatanodeHandler;
28
29#[async_trait::async_trait]
30impl MockDatanodeHandler for () {
31    async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
32        Ok(RegionResponse {
33            affected_rows: 0,
34            extensions: Default::default(),
35            metadata: Vec::new(),
36        })
37    }
38
39    async fn handle_query(
40        &self,
41        _peer: &Peer,
42        _request: QueryRequest,
43    ) -> Result<SendableRecordBatchStream> {
44        unreachable!()
45    }
46}
47
48#[derive(Clone)]
49pub struct DatanodeWatcher {
50    sender: mpsc::Sender<(Peer, RegionRequest)>,
51    handler: Option<fn(Peer, RegionRequest) -> Result<RegionResponse>>,
52}
53
54impl DatanodeWatcher {
55    pub fn new(sender: mpsc::Sender<(Peer, RegionRequest)>) -> Self {
56        Self {
57            sender,
58            handler: None,
59        }
60    }
61
62    pub fn with_handler(
63        mut self,
64        user_handler: fn(Peer, RegionRequest) -> Result<RegionResponse>,
65    ) -> Self {
66        self.handler = Some(user_handler);
67        self
68    }
69}
70
71#[async_trait::async_trait]
72impl MockDatanodeHandler for DatanodeWatcher {
73    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
74        debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
75        self.sender
76            .send((peer.clone(), request.clone()))
77            .await
78            .unwrap();
79        if let Some(handler) = self.handler {
80            handler(peer.clone(), request)
81        } else {
82            Ok(RegionResponse::new(0))
83        }
84    }
85
86    async fn handle_query(
87        &self,
88        _peer: &Peer,
89        _request: QueryRequest,
90    ) -> Result<SendableRecordBatchStream> {
91        unreachable!()
92    }
93}
94
/// Mock datanode handler whose `handle` always fails with `Error::RetryLater`.
#[derive(Clone, Debug)]
pub struct RetryErrorDatanodeHandler;
97
98#[async_trait::async_trait]
99impl MockDatanodeHandler for RetryErrorDatanodeHandler {
100    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
101        debug!("Returning retry later for request: {request:?}, peer: {peer:?}");
102        Err(Error::RetryLater {
103            source: BoxedError::new(
104                error::UnexpectedSnafu {
105                    err_msg: "retry later",
106                }
107                .build(),
108            ),
109            clean_poisons: false,
110        })
111    }
112
113    async fn handle_query(
114        &self,
115        _peer: &Peer,
116        _request: QueryRequest,
117    ) -> Result<SendableRecordBatchStream> {
118        unreachable!()
119    }
120}
121
/// Mock datanode handler whose `handle` always fails with an unexpected
/// "mock error".
#[derive(Clone, Debug)]
pub struct UnexpectedErrorDatanodeHandler;
124
125#[async_trait::async_trait]
126impl MockDatanodeHandler for UnexpectedErrorDatanodeHandler {
127    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
128        debug!("Returning mock error for request: {request:?}, peer: {peer:?}");
129        error::UnexpectedSnafu {
130            err_msg: "mock error",
131        }
132        .fail()
133    }
134
135    async fn handle_query(
136        &self,
137        _peer: &Peer,
138        _request: QueryRequest,
139    ) -> Result<SendableRecordBatchStream> {
140        unreachable!()
141    }
142}
143
/// Mock datanode handler whose `handle` always fails with an external error
/// carrying the `StatusCode::RequestOutdated` status code.
#[derive(Clone, Debug)]
pub struct RequestOutdatedErrorDatanodeHandler;
146
147#[derive(Debug, Snafu)]
148#[snafu(display("A mock RequestOutdated error"))]
149struct MockRequestOutdatedError;
150
151impl StackError for MockRequestOutdatedError {
152    fn debug_fmt(&self, _: usize, _: &mut Vec<String>) {}
153
154    fn next(&self) -> Option<&dyn StackError> {
155        None
156    }
157}
158
159impl ErrorExt for MockRequestOutdatedError {
160    fn as_any(&self) -> &dyn std::any::Any {
161        self
162    }
163
164    fn status_code(&self) -> StatusCode {
165        StatusCode::RequestOutdated
166    }
167}
168
169#[async_trait::async_trait]
170impl MockDatanodeHandler for RequestOutdatedErrorDatanodeHandler {
171    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
172        debug!("Returning mock error for request: {request:?}, peer: {peer:?}");
173        Err(BoxedError::new(MockRequestOutdatedError)).context(error::ExternalSnafu)
174    }
175
176    async fn handle_query(
177        &self,
178        _peer: &Peer,
179        _request: QueryRequest,
180    ) -> Result<SendableRecordBatchStream> {
181        unreachable!()
182    }
183}
184
/// Mock datanode handler whose `handle` always succeeds with
/// `RegionResponse::new(0)`.
#[derive(Clone, Debug)]
pub struct NaiveDatanodeHandler;
187
188#[async_trait::async_trait]
189impl MockDatanodeHandler for NaiveDatanodeHandler {
190    async fn handle(&self, peer: &Peer, request: RegionRequest) -> Result<RegionResponse> {
191        debug!("Returning Ok(0) for request: {request:?}, peer: {peer:?}");
192        Ok(RegionResponse::new(0))
193    }
194
195    async fn handle_query(
196        &self,
197        _peer: &Peer,
198        _request: QueryRequest,
199    ) -> Result<SendableRecordBatchStream> {
200        unreachable!()
201    }
202}
203
/// Mock datanode handler that succeeds for peers with an even id and fails
/// for all other peers.
#[derive(Clone, Debug)]
pub struct PartialSuccessDatanodeHandler {
    /// When `true`, failures are reported as `Error::RetryLater`; otherwise
    /// they are reported as an unexpected "mock error".
    pub retryable: bool,
}
208
209#[async_trait::async_trait]
210impl MockDatanodeHandler for PartialSuccessDatanodeHandler {
211    async fn handle(&self, peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
212        let success = peer.id % 2 == 0;
213        if success {
214            Ok(RegionResponse::new(0))
215        } else if self.retryable {
216            Err(Error::RetryLater {
217                source: BoxedError::new(
218                    error::UnexpectedSnafu {
219                        err_msg: "retry later",
220                    }
221                    .build(),
222                ),
223                clean_poisons: false,
224            })
225        } else {
226            error::UnexpectedSnafu {
227                err_msg: "mock error",
228            }
229            .fail()
230        }
231    }
232
233    async fn handle_query(
234        &self,
235        _peer: &Peer,
236        _request: QueryRequest,
237    ) -> Result<SendableRecordBatchStream> {
238        unreachable!()
239    }
240}
241
/// Mock datanode handler whose `handle` fails for every peer.
#[derive(Clone, Debug)]
pub struct AllFailureDatanodeHandler {
    /// When `true`, failures are reported as `Error::RetryLater`; otherwise
    /// they are reported as an unexpected "mock error".
    pub retryable: bool,
}
246
247#[async_trait::async_trait]
248impl MockDatanodeHandler for AllFailureDatanodeHandler {
249    async fn handle(&self, _peer: &Peer, _request: RegionRequest) -> Result<RegionResponse> {
250        if self.retryable {
251            Err(Error::RetryLater {
252                source: BoxedError::new(
253                    error::UnexpectedSnafu {
254                        err_msg: "retry later",
255                    }
256                    .build(),
257                ),
258                clean_poisons: false,
259            })
260        } else {
261            error::UnexpectedSnafu {
262                err_msg: "mock error",
263            }
264            .fail()
265        }
266    }
267
268    async fn handle_query(
269        &self,
270        _peer: &Peer,
271        _request: QueryRequest,
272    ) -> Result<SendableRecordBatchStream> {
273        unreachable!()
274    }
275}