datanode/heartbeat/handler/
upgrade_region.rs1use common_meta::instruction::{Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply};
16use common_telemetry::{info, warn};
17use store_api::region_request::{RegionCatchupRequest, RegionRequest, ReplayCheckpoint};
18
19use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
20use crate::heartbeat::task_tracker::WaitResult;
21
/// Handler for upgrade-region instructions received via heartbeat.
///
/// This is a unit struct: it carries no state of its own, and everything it needs is
/// passed to `handle` through the `HandlerContext` argument.
#[derive(Debug, Clone, Copy, Default)]
pub struct UpgradeRegionsHandler;
24
25#[async_trait::async_trait]
26impl InstructionHandler for UpgradeRegionsHandler {
27 async fn handle(
28 &self,
29 ctx: &HandlerContext,
30 instruction: Instruction,
31 ) -> Option<InstructionReply> {
32 let UpgradeRegion {
33 region_id,
34 last_entry_id,
35 metadata_last_entry_id,
36 replay_timeout,
37 location_id,
38 replay_entry_id,
39 metadata_replay_entry_id,
40 } = instruction.into_upgrade_regions().unwrap();
41
42 let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
43 return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
44 ready: false,
45 exists: false,
46 error: None,
47 }));
48 };
49
50 if writable {
51 return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
52 ready: true,
53 exists: true,
54 error: None,
55 }));
56 }
57
58 let region_server_moved = ctx.region_server.clone();
59
60 let checkpoint = match (replay_entry_id, metadata_replay_entry_id) {
61 (Some(entry_id), metadata_entry_id) => Some(ReplayCheckpoint {
62 entry_id,
63 metadata_entry_id,
64 }),
65 _ => None,
66 };
67
68 let register_result = ctx
71 .catchup_tasks
72 .try_register(
73 region_id,
74 Box::pin(async move {
75 info!(
76 "Executing region: {region_id} catchup to: last entry id {last_entry_id:?}"
77 );
78 region_server_moved
79 .handle_request(
80 region_id,
81 RegionRequest::Catchup(RegionCatchupRequest {
82 set_writable: true,
83 entry_id: last_entry_id,
84 metadata_entry_id: metadata_last_entry_id,
85 location_id,
86 checkpoint,
87 }),
88 )
89 .await?;
90
91 Ok(())
92 }),
93 )
94 .await;
95
96 if register_result.is_busy() {
97 warn!("Another catchup task is running for the region: {region_id}");
98 }
99
100 let Some(replay_timeout) = replay_timeout else {
102 return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
103 ready: false,
104 exists: true,
105 error: None,
106 }));
107 };
108
109 let mut watcher = register_result.into_watcher();
111 let result = ctx.catchup_tasks.wait(&mut watcher, replay_timeout).await;
112
113 match result {
114 WaitResult::Timeout => Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
115 ready: false,
116 exists: true,
117 error: None,
118 })),
119 WaitResult::Finish(Ok(_)) => {
120 Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
121 ready: true,
122 exists: true,
123 error: None,
124 }))
125 }
126 WaitResult::Finish(Err(err)) => {
127 Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
128 ready: false,
129 exists: true,
130 error: Some(format!("{err:?}")),
131 }))
132 }
133 }
134 }
135}
136
#[cfg(test)]
mod tests {
    //! Tests for `UpgradeRegionsHandler::handle` against a mocked region engine.

    use std::time::Duration;

    use common_meta::instruction::{Instruction, UpgradeRegion};
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;
    use tokio::time::Instant;

    use crate::error;
    use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
    use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
    use crate::tests::{MockRegionEngine, mock_region_server};

    /// Builds an upgrade instruction for `region_id` with an explicit `replay_timeout`;
    /// every other field keeps its `Default` value.
    fn upgrade_instruction(region_id: RegionId, replay_timeout: Option<Duration>) -> Instruction {
        Instruction::UpgradeRegion(UpgradeRegion {
            region_id,
            replay_timeout,
            ..Default::default()
        })
    }

    /// A region that was never registered is reported as non-existent and not ready.
    #[tokio::test]
    async fn test_region_not_exist() {
        let mut mock_region_server = mock_region_server();
        let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
        mock_region_server.register_engine(mock_engine);

        let handler_context = HandlerContext::new_for_test(mock_region_server);
        let region_id = RegionId::new(1024, 1);

        for replay_timeout in [None, Some(Duration::from_millis(100))] {
            let reply = UpgradeRegionsHandler
                .handle(
                    &handler_context,
                    upgrade_instruction(region_id, replay_timeout),
                )
                .await;

            let reply = reply.unwrap().expect_upgrade_region_reply();
            assert!(!reply.ready);
            assert!(!reply.exists);
            assert!(reply.error.is_none());
        }
    }

    /// A region that is already a leader is ready without any request reaching the engine.
    #[tokio::test]
    async fn test_region_writable() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                // The handler must short-circuit before issuing a request.
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
                    unreachable!();
                }));
            });
        mock_region_server.register_test_region(region_id, mock_engine);

        let handler_context = HandlerContext::new_for_test(mock_region_server);

        for replay_timeout in [None, Some(Duration::from_millis(100))] {
            let reply = UpgradeRegionsHandler
                .handle(
                    &handler_context,
                    upgrade_instruction(region_id, replay_timeout),
                )
                .await;

            let reply = reply.unwrap().expect_upgrade_region_reply();
            assert!(reply.ready);
            assert!(reply.exists);
            assert!(reply.error.is_none());
        }
    }

    /// A follower whose mocked request takes far longer than the timeout stays not ready.
    #[tokio::test]
    async fn test_region_not_ready() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
                region_engine.handle_request_delay = Some(Duration::from_secs(100));
            });
        mock_region_server.register_test_region(region_id, mock_engine);

        let handler_context = HandlerContext::new_for_test(mock_region_server);

        for replay_timeout in [None, Some(Duration::from_millis(100))] {
            let reply = UpgradeRegionsHandler
                .handle(
                    &handler_context,
                    upgrade_instruction(region_id, replay_timeout),
                )
                .await;

            let reply = reply.unwrap().expect_upgrade_region_reply();
            assert!(!reply.ready);
            assert!(reply.exists);
            assert!(reply.error.is_none());
        }
    }

    /// Repeated instructions eventually observe a successful catchup.
    #[tokio::test]
    async fn test_region_not_ready_with_retry() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
                region_engine.handle_request_delay = Some(Duration::from_millis(300));
            });
        mock_region_server.register_test_region(region_id, mock_engine);

        let handler_context = HandlerContext::new_for_test(mock_region_server);

        // Two waits of 100ms each are shorter than the 300ms mocked request delay.
        for replay_timeout in [
            Some(Duration::from_millis(100)),
            Some(Duration::from_millis(100)),
        ] {
            let reply = UpgradeRegionsHandler
                .handle(
                    &handler_context,
                    upgrade_instruction(region_id, replay_timeout),
                )
                .await;

            let reply = reply.unwrap().expect_upgrade_region_reply();
            assert!(!reply.ready);
            assert!(reply.exists);
            assert!(reply.error.is_none());
        }

        let timer = Instant::now();
        let reply = UpgradeRegionsHandler
            .handle(
                &handler_context,
                upgrade_instruction(region_id, Some(Duration::from_millis(500))),
            )
            .await;
        // NOTE(review): presumably the task registered by the first call is still running,
        // so this wait should finish sooner than a fresh 300ms request would.
        assert!(timer.elapsed().as_millis() < 300);

        let reply = reply.unwrap().expect_upgrade_region_reply();
        assert!(reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_none());
    }

    /// A failing catchup is surfaced through `error` once the caller waits for it.
    #[tokio::test]
    async fn test_region_error() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
                    error::UnexpectedSnafu {
                        violated: "mock_error".to_string(),
                    }
                    .fail()
                }));
                region_engine.handle_request_delay = Some(Duration::from_millis(100));
            });
        mock_region_server.register_test_region(region_id, mock_engine);

        let handler_context = HandlerContext::new_for_test(mock_region_server);

        // `replay_timeout` is deliberately left at `UpgradeRegion`'s default here.
        let reply = UpgradeRegionsHandler
            .handle(
                &handler_context,
                Instruction::UpgradeRegion(UpgradeRegion {
                    region_id,
                    ..Default::default()
                }),
            )
            .await;

        let reply = reply.unwrap().expect_upgrade_region_reply();
        assert!(!reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_none());

        // Waiting longer than the 100ms mocked delay lets the failure be observed.
        let reply = UpgradeRegionsHandler
            .handle(
                &handler_context,
                upgrade_instruction(region_id, Some(Duration::from_millis(200))),
            )
            .await;

        let reply = reply.unwrap().expect_upgrade_region_reply();
        assert!(!reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_some());
        assert!(reply.error.unwrap().contains("mock_error"));
    }
}