datanode/heartbeat/handler/
upgrade_region.rs1use common_error::ext::{BoxedError, ErrorExt};
16use common_error::status_code::StatusCode;
17use common_meta::instruction::{
18 InstructionReply, UpgradeRegion, UpgradeRegionReply, UpgradeRegionsReply,
19};
20use common_telemetry::{debug, info, warn};
21use store_api::region_request::{RegionCatchupRequest, ReplayCheckpoint};
22use store_api::storage::RegionId;
23
24use crate::error::Result;
25use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
26
/// Heartbeat instruction handler that processes batches of upgrade-region
/// instructions.
///
/// The handler itself holds only configuration, so it is cheap to copy.
#[derive(Clone, Copy, Debug, Default)]
pub struct UpgradeRegionsHandler {
    /// Parallelism value forwarded to the region server's
    /// `handle_batch_catchup_requests` when regions have to catch up.
    // NOTE(review): presumably the maximum number of regions caught up
    // concurrently — confirm against the region server implementation.
    pub upgrade_region_parallelism: usize,
}
31
#[cfg(test)]
impl UpgradeRegionsHandler {
    /// Creates a handler for unit tests with a fixed catchup parallelism of 8.
    fn new_test() -> Self {
        Self {
            upgrade_region_parallelism: 8,
        }
    }
}
40
41impl UpgradeRegionsHandler {
42 fn convert_responses_to_replies(
43 responses: Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>>,
44 catchup_regions: &[RegionId],
45 ) -> Vec<UpgradeRegionReply> {
46 match responses {
47 Ok(responses) => responses
48 .into_iter()
49 .map(|(region_id, result)| match result {
50 Ok(()) => UpgradeRegionReply {
51 region_id,
52 ready: true,
53 exists: true,
54 error: None,
55 },
56 Err(err) => {
57 if err.status_code() == StatusCode::RegionNotFound {
58 UpgradeRegionReply {
59 region_id,
60 ready: false,
61 exists: false,
62 error: Some(format!("{err:?}")),
63 }
64 } else {
65 UpgradeRegionReply {
66 region_id,
67 ready: false,
68 exists: true,
69 error: Some(format!("{err:?}")),
70 }
71 }
72 }
73 })
74 .collect::<Vec<_>>(),
75 Err(err) => catchup_regions
76 .iter()
77 .map(|region_id| UpgradeRegionReply {
78 region_id: *region_id,
79 ready: false,
80 exists: true,
81 error: Some(format!("{err:?}")),
82 })
83 .collect::<Vec<_>>(),
84 }
85 }
86}
87
88impl UpgradeRegionsHandler {
89 async fn handle_upgrade_regions(
93 &self,
94 ctx: &HandlerContext,
95 upgrade_regions: Vec<UpgradeRegion>,
96 ) -> Vec<UpgradeRegionReply> {
97 let num_upgrade_regions = upgrade_regions.len();
98 let mut replies = Vec::with_capacity(num_upgrade_regions);
99 let mut catchup_requests = Vec::with_capacity(num_upgrade_regions);
100 let mut catchup_regions = Vec::with_capacity(num_upgrade_regions);
101 let mut timeout = None;
102
103 for upgrade_region in upgrade_regions {
104 let Some(writable) = ctx.region_server.is_region_leader(upgrade_region.region_id)
105 else {
106 debug!("Region {} is not found", upgrade_region.region_id);
108 replies.push(UpgradeRegionReply {
109 region_id: upgrade_region.region_id,
110 ready: false,
111 exists: false,
112 error: None,
113 });
114 continue;
115 };
116
117 if writable {
119 warn!(
120 "Region {} is writable, ignores the catchup request",
121 upgrade_region.region_id
122 );
123 replies.push(UpgradeRegionReply {
124 region_id: upgrade_region.region_id,
125 ready: true,
126 exists: true,
127 error: None,
128 });
129 } else {
130 let UpgradeRegion {
131 last_entry_id,
132 metadata_last_entry_id,
133 location_id,
134 replay_entry_id,
135 metadata_replay_entry_id,
136 replay_timeout,
137 ..
138 } = upgrade_region;
139 match timeout {
140 Some(timeout) => {
141 debug_assert_eq!(timeout, replay_timeout);
142 }
143 None => {
144 timeout = Some(replay_timeout);
146 }
147 }
148
149 let checkpoint = match (replay_entry_id, metadata_replay_entry_id) {
150 (Some(entry_id), metadata_entry_id) => Some(ReplayCheckpoint {
151 entry_id,
152 metadata_entry_id,
153 }),
154 _ => None,
155 };
156
157 catchup_regions.push(upgrade_region.region_id);
158 catchup_requests.push((
159 upgrade_region.region_id,
160 RegionCatchupRequest {
161 set_writable: true,
162 entry_id: last_entry_id,
163 metadata_entry_id: metadata_last_entry_id,
164 location_id,
165 checkpoint,
166 },
167 ));
168 }
169 }
170
171 let Some(timeout) = timeout else {
172 info!("All regions are writable, no need to catchup");
174 debug_assert_eq!(replies.len(), num_upgrade_regions);
175 return replies;
176 };
177
178 match tokio::time::timeout(
179 timeout,
180 ctx.region_server
181 .handle_batch_catchup_requests(self.upgrade_region_parallelism, catchup_requests),
182 )
183 .await
184 {
185 Ok(responses) => {
186 replies.extend(
187 Self::convert_responses_to_replies(responses, &catchup_regions).into_iter(),
188 );
189 }
190 Err(_) => {
191 replies.extend(catchup_regions.iter().map(|region_id| UpgradeRegionReply {
192 region_id: *region_id,
193 ready: false,
194 exists: true,
195 error: None,
196 }));
197 }
198 }
199
200 replies
201 }
202}
203
204#[async_trait::async_trait]
205impl InstructionHandler for UpgradeRegionsHandler {
206 type Instruction = Vec<UpgradeRegion>;
207
208 async fn handle(
209 &self,
210 ctx: &HandlerContext,
211 upgrade_regions: Self::Instruction,
212 ) -> Option<InstructionReply> {
213 let replies = self.handle_upgrade_regions(ctx, upgrade_regions).await;
214
215 Some(InstructionReply::UpgradeRegions(UpgradeRegionsReply::new(
216 replies,
217 )))
218 }
219}
220
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use common_meta::instruction::UpgradeRegion;
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;

    use crate::error;
    use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
    use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
    use crate::tests::{MockRegionEngine, mock_region_server};

    /// Builds an upgrade request for `region_id` with the given replay
    /// timeout, leaving every other field at its default value.
    fn upgrade_request(region_id: RegionId, replay_timeout: Duration) -> UpgradeRegion {
        UpgradeRegion {
            region_id,
            replay_timeout,
            ..Default::default()
        }
    }

    /// Regions that were never registered are reported as non-existent,
    /// without an error.
    #[tokio::test]
    async fn test_region_not_exist() {
        let mut region_server = mock_region_server();
        let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
        region_server.register_engine(engine);

        let ctx = HandlerContext::new_for_test(region_server);

        let first_region = RegionId::new(1024, 1);
        let second_region = RegionId::new(1024, 2);
        let replay_timeout = Duration::from_millis(100);
        let requests = vec![
            upgrade_request(first_region, replay_timeout),
            upgrade_request(second_region, replay_timeout),
        ];

        let replies = UpgradeRegionsHandler::new_test()
            .handle(&ctx, requests)
            .await
            .unwrap()
            .expect_upgrade_regions_reply();

        assert_eq!(replies[0].region_id, first_region);
        assert_eq!(replies[1].region_id, second_region);
        for reply in &replies {
            assert!(!reply.exists);
            assert!(reply.error.is_none());
        }
    }

    /// Regions that are already leaders are reported as ready without any
    /// request being sent to the engine.
    #[tokio::test]
    async fn test_region_writable() {
        let region_server = mock_region_server();
        let first_region = RegionId::new(1024, 1);
        let second_region = RegionId::new(1024, 2);

        let (engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                // The handler must not issue a catchup for a leader region.
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
                    unreachable!();
                }));
            });
        region_server.register_test_region(first_region, engine.clone());
        region_server.register_test_region(second_region, engine);

        let ctx = HandlerContext::new_for_test(region_server);
        let replay_timeout = Duration::from_millis(100);
        let requests = vec![
            upgrade_request(first_region, replay_timeout),
            upgrade_request(second_region, replay_timeout),
        ];

        let replies = UpgradeRegionsHandler::new_test()
            .handle(&ctx, requests)
            .await
            .unwrap()
            .expect_upgrade_regions_reply();

        assert_eq!(replies[0].region_id, first_region);
        assert_eq!(replies[1].region_id, second_region);
        for reply in &replies {
            assert!(reply.ready);
            assert!(reply.exists);
            assert!(reply.error.is_none());
        }
    }

    /// A follower whose catchup takes far longer than the replay timeout is
    /// reported as existing but not ready, without an error.
    #[tokio::test]
    async fn test_region_not_ready() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
                // Much longer than the 100ms replay timeout used below.
                region_engine.handle_request_delay = Some(Duration::from_secs(100));
            });
        region_server.register_test_region(region_id, engine);

        let ctx = HandlerContext::new_for_test(region_server);
        let replay_timeout = Duration::from_millis(100);

        let replies = UpgradeRegionsHandler::new_test()
            .handle(&ctx, vec![upgrade_request(region_id, replay_timeout)])
            .await
            .unwrap()
            .expect_upgrade_regions_reply();

        let reply = &replies[0];
        assert!(!reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_none(), "error: {:?}", reply.error);
    }

    /// Attempts whose replay timeout is shorter than the engine delay report
    /// "not ready"; a later attempt with a long enough timeout succeeds.
    #[tokio::test]
    async fn test_region_not_ready_with_retry() {
        common_telemetry::init_default_ut_logging();
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
                region_engine.handle_request_delay = Some(Duration::from_millis(300));
            });
        region_server.register_test_region(region_id, engine);

        let short_timeouts = [Duration::from_millis(100), Duration::from_millis(100)];
        let ctx = HandlerContext::new_for_test(region_server);

        // Both attempts time out before the 300ms engine delay elapses.
        for replay_timeout in short_timeouts {
            let replies = UpgradeRegionsHandler::new_test()
                .handle(&ctx, vec![upgrade_request(region_id, replay_timeout)])
                .await
                .unwrap()
                .expect_upgrade_regions_reply();

            let reply = &replies[0];
            assert!(!reply.ready);
            assert!(reply.exists);
            assert!(reply.error.is_none(), "error: {:?}", reply.error);
        }

        // A 500ms timeout leaves enough room for the engine to answer.
        let replies = UpgradeRegionsHandler::new_test()
            .handle(
                &ctx,
                vec![upgrade_request(region_id, Duration::from_millis(500))],
            )
            .await
            .unwrap()
            .expect_upgrade_regions_reply();

        let reply = &replies[0];
        assert!(reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_none(), "error: {:?}", reply.error);
    }

    /// An engine error is surfaced in the reply only when the handler waits
    /// long enough to observe it.
    #[tokio::test]
    async fn test_region_error() {
        common_telemetry::init_default_ut_logging();
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
                    error::UnexpectedSnafu {
                        violated: "mock_error".to_string(),
                    }
                    .fail()
                }));
                region_engine.handle_request_delay = Some(Duration::from_millis(100));
            });
        region_server.register_test_region(region_id, engine);

        let ctx = HandlerContext::new_for_test(region_server);

        // The replay timeout is left at its default here.
        // NOTE(review): presumably the default is shorter than the 100ms
        // engine delay, so the handler gives up before seeing the error —
        // confirm against `UpgradeRegion`'s `Default` implementation.
        let replies = UpgradeRegionsHandler::new_test()
            .handle(
                &ctx,
                vec![UpgradeRegion {
                    region_id,
                    ..Default::default()
                }],
            )
            .await
            .unwrap()
            .expect_upgrade_regions_reply();

        let reply = &replies[0];
        assert!(!reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_none());

        // With a 200ms timeout the handler waits for the engine and reports
        // its error.
        let replies = UpgradeRegionsHandler::new_test()
            .handle(
                &ctx,
                vec![upgrade_request(region_id, Duration::from_millis(200))],
            )
            .await
            .unwrap()
            .expect_upgrade_regions_reply();

        let reply = &replies[0];
        assert!(!reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_some());
        assert!(reply.error.as_ref().unwrap().contains("mock_error"));
    }
}