datanode/heartbeat/handler/
upgrade_region.rs1use common_meta::instruction::{InstructionReply, UpgradeRegion, UpgradeRegionReply};
16use common_telemetry::{info, warn};
17use futures_util::future::BoxFuture;
18use store_api::region_request::{RegionCatchupRequest, RegionRequest};
19
20use crate::heartbeat::handler::HandlerContext;
21use crate::heartbeat::task_tracker::WaitResult;
22
23impl HandlerContext {
24 pub(crate) fn handle_upgrade_region_instruction(
25 self,
26 UpgradeRegion {
27 region_id,
28 last_entry_id,
29 metadata_last_entry_id,
30 replay_timeout,
31 location_id,
32 }: UpgradeRegion,
33 ) -> BoxFuture<'static, Option<InstructionReply>> {
34 Box::pin(async move {
35 let Some(writable) = self.region_server.is_region_leader(region_id) else {
36 return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
37 ready: false,
38 exists: false,
39 error: None,
40 }));
41 };
42
43 if writable {
44 return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
45 ready: true,
46 exists: true,
47 error: None,
48 }));
49 }
50
51 let region_server_moved = self.region_server.clone();
52
53 let register_result = self
56 .catchup_tasks
57 .try_register(
58 region_id,
59 Box::pin(async move {
60 info!("Executing region: {region_id} catchup to: last entry id {last_entry_id:?}");
61 region_server_moved
62 .handle_request(
63 region_id,
64 RegionRequest::Catchup(RegionCatchupRequest {
65 set_writable: true,
66 entry_id: last_entry_id,
67 metadata_entry_id: metadata_last_entry_id,
68 location_id,
69 }),
70 )
71 .await?;
72
73 Ok(())
74 }),
75 )
76 .await;
77
78 if register_result.is_busy() {
79 warn!("Another catchup task is running for the region: {region_id}");
80 }
81
82 let Some(replay_timeout) = replay_timeout else {
84 return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
85 ready: false,
86 exists: true,
87 error: None,
88 }));
89 };
90
91 let mut watcher = register_result.into_watcher();
93 let result = self.catchup_tasks.wait(&mut watcher, replay_timeout).await;
94
95 match result {
96 WaitResult::Timeout => Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
97 ready: false,
98 exists: true,
99 error: None,
100 })),
101 WaitResult::Finish(Ok(_)) => {
102 Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
103 ready: true,
104 exists: true,
105 error: None,
106 }))
107 }
108 WaitResult::Finish(Err(err)) => {
109 Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
110 ready: false,
111 exists: true,
112 error: Some(format!("{err:?}")),
113 }))
114 }
115 }
116 })
117 }
118}
119
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::time::Duration;

    use common_meta::instruction::{InstructionReply, UpgradeRegion};
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;
    use tokio::time::Instant;

    use crate::error;
    use crate::heartbeat::handler::HandlerContext;
    use crate::tests::{mock_region_server, MockRegionEngine};

    // Builds the instruction used by every test: only the region and the
    // replay timeout vary, all entry/location ids are left unset.
    fn upgrade_instruction(
        region_id: RegionId,
        replay_timeout: Option<Duration>,
    ) -> UpgradeRegion {
        UpgradeRegion {
            region_id,
            last_entry_id: None,
            metadata_last_entry_id: None,
            replay_timeout,
            location_id: None,
        }
    }

    // A region that was never registered must be reported as non-existent,
    // with or without a replay timeout.
    #[tokio::test]
    async fn test_region_not_exist() {
        let mut region_server = mock_region_server();
        let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
        region_server.register_engine(engine);

        let ctx = HandlerContext::new_for_test(region_server);
        let region_id = RegionId::new(1024, 1);

        for replay_timeout in [None, Some(Duration::from_millis(100))] {
            let reply = ctx
                .clone()
                .handle_upgrade_region_instruction(upgrade_instruction(region_id, replay_timeout))
                .await;
            assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));

            if let Some(InstructionReply::UpgradeRegion(upgrade)) = reply {
                assert!(!upgrade.exists);
                assert!(upgrade.error.is_none());
            }
        }
    }

    // A region that is already a leader is reported ready without any request
    // reaching the engine (the mocked request handler panics if called).
    #[tokio::test]
    async fn test_region_writable() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (engine, _) = MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |mock| {
            mock.mock_role = Some(Some(RegionRole::Leader));
            mock.handle_request_mock_fn = Some(Box::new(|_, _| {
                unreachable!();
            }));
        });
        region_server.register_test_region(region_id, engine);

        let ctx = HandlerContext::new_for_test(region_server);

        for replay_timeout in [None, Some(Duration::from_millis(100))] {
            let reply = ctx
                .clone()
                .handle_upgrade_region_instruction(upgrade_instruction(region_id, replay_timeout))
                .await;
            assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));

            if let Some(InstructionReply::UpgradeRegion(upgrade)) = reply {
                assert!(upgrade.ready);
                assert!(upgrade.exists);
                assert!(upgrade.error.is_none());
            }
        }
    }

    // A follower whose catchup takes 100 s cannot become ready within the
    // 100 ms timeout (or with no timeout at all).
    #[tokio::test]
    async fn test_region_not_ready() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (engine, _) = MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |mock| {
            mock.mock_role = Some(Some(RegionRole::Follower));
            mock.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
            mock.handle_request_delay = Some(Duration::from_secs(100));
        });
        region_server.register_test_region(region_id, engine);

        let ctx = HandlerContext::new_for_test(region_server);

        for replay_timeout in [None, Some(Duration::from_millis(100))] {
            let reply = ctx
                .clone()
                .handle_upgrade_region_instruction(upgrade_instruction(region_id, replay_timeout))
                .await;
            assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));

            if let Some(InstructionReply::UpgradeRegion(upgrade)) = reply {
                assert!(!upgrade.ready);
                assert!(upgrade.exists);
                assert!(upgrade.error.is_none());
            }
        }
    }

    // The engine needs 300 ms per request. Two 100 ms attempts time out, then
    // a third attempt with a 500 ms budget must report ready in under 300 ms.
    #[tokio::test]
    async fn test_region_not_ready_with_retry() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (engine, _) = MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |mock| {
            mock.mock_role = Some(Some(RegionRole::Follower));
            mock.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
            mock.handle_request_delay = Some(Duration::from_millis(300));
        });
        region_server.register_test_region(region_id, engine);

        let ctx = HandlerContext::new_for_test(region_server);

        for replay_timeout in [Some(Duration::from_millis(100)); 2] {
            let reply = ctx
                .clone()
                .handle_upgrade_region_instruction(upgrade_instruction(region_id, replay_timeout))
                .await;
            assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));

            if let Some(InstructionReply::UpgradeRegion(upgrade)) = reply {
                assert!(!upgrade.ready);
                assert!(upgrade.exists);
                assert!(upgrade.error.is_none());
            }
        }

        let started = Instant::now();
        let reply = ctx
            .handle_upgrade_region_instruction(upgrade_instruction(
                region_id,
                Some(Duration::from_millis(500)),
            ))
            .await;
        assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));
        // Finishing in under 300 ms means this call did not wait out a full
        // 300 ms request of its own.
        assert!(started.elapsed().as_millis() < 300);

        if let Some(InstructionReply::UpgradeRegion(upgrade)) = reply {
            assert!(upgrade.ready);
            assert!(upgrade.exists);
            assert!(upgrade.error.is_none());
        }
    }

    // The engine fails after 100 ms. Without a timeout the failure is not
    // observed; with a 200 ms timeout the reply carries the error text.
    #[tokio::test]
    async fn test_region_error() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (engine, _) = MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |mock| {
            mock.mock_role = Some(Some(RegionRole::Follower));
            mock.handle_request_mock_fn = Some(Box::new(|_, _| {
                error::UnexpectedSnafu {
                    violated: "mock_error".to_string(),
                }
                .fail()
            }));
            mock.handle_request_delay = Some(Duration::from_millis(100));
        });
        region_server.register_test_region(region_id, engine);

        let ctx = HandlerContext::new_for_test(region_server);

        // No timeout: the handler replies without waiting for the task.
        let reply = ctx
            .clone()
            .handle_upgrade_region_instruction(upgrade_instruction(region_id, None))
            .await;
        assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));

        if let Some(InstructionReply::UpgradeRegion(upgrade)) = reply {
            assert!(!upgrade.ready);
            assert!(upgrade.exists);
            assert!(upgrade.error.is_none());
        }

        // 200 ms timeout: long enough to see the task fail.
        let reply = ctx
            .clone()
            .handle_upgrade_region_instruction(upgrade_instruction(
                region_id,
                Some(Duration::from_millis(200)),
            ))
            .await;
        assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));

        if let Some(InstructionReply::UpgradeRegion(upgrade)) = reply {
            assert!(!upgrade.ready);
            assert!(upgrade.exists);
            assert!(upgrade.error.is_some());
            assert!(upgrade.error.unwrap().contains("mock_error"));
        }
    }
}