1use common_error::ext::{BoxedError, ErrorExt};
16use common_error::status_code::StatusCode;
17use common_meta::instruction::{
18 InstructionReply, UpgradeRegion, UpgradeRegionReply, UpgradeRegionsReply,
19};
20use common_telemetry::{debug, info, warn};
21use store_api::region_request::{RegionCatchupRequest, ReplayCheckpoint};
22use store_api::storage::RegionId;
23
24use crate::error::Result;
25use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
26
/// Heartbeat instruction handler that catches up a batch of regions and makes them writable.
#[derive(Clone, Copy, Debug, Default)]
pub struct UpgradeRegionsHandler {
    /// Parallelism handed to `handle_batch_catchup_requests` when catching up regions.
    pub upgrade_region_parallelism: usize,
}
31
#[cfg(test)]
impl UpgradeRegionsHandler {
    /// Test-only constructor using a fixed catchup parallelism of 8.
    fn new_test() -> Self {
        Self {
            upgrade_region_parallelism: 8,
        }
    }
}
40
41impl UpgradeRegionsHandler {
42 fn convert_responses_to_replies(
43 responses: Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>>,
44 catchup_regions: &[RegionId],
45 ) -> Vec<UpgradeRegionReply> {
46 match responses {
47 Ok(responses) => responses
48 .into_iter()
49 .map(|(region_id, result)| match result {
50 Ok(()) => UpgradeRegionReply {
51 region_id,
52 ready: true,
53 exists: true,
54 error: None,
55 },
56 Err(err) => {
57 if err.status_code() == StatusCode::RegionNotFound {
58 UpgradeRegionReply {
59 region_id,
60 ready: false,
61 exists: false,
62 error: Some(format!("{err:?}")),
63 }
64 } else {
65 UpgradeRegionReply {
66 region_id,
67 ready: false,
68 exists: true,
69 error: Some(format!("{err:?}")),
70 }
71 }
72 }
73 })
74 .collect::<Vec<_>>(),
75 Err(err) => catchup_regions
76 .iter()
77 .map(|region_id| UpgradeRegionReply {
78 region_id: *region_id,
79 ready: false,
80 exists: true,
81 error: Some(format!("{err:?}")),
82 })
83 .collect::<Vec<_>>(),
84 }
85 }
86}
87
88impl UpgradeRegionsHandler {
89 async fn handle_upgrade_regions(
93 &self,
94 ctx: &HandlerContext,
95 upgrade_regions: Vec<UpgradeRegion>,
96 ) -> Vec<UpgradeRegionReply> {
97 let num_upgrade_regions = upgrade_regions.len();
98 let mut replies = Vec::with_capacity(num_upgrade_regions);
99 let mut catchup_requests = Vec::with_capacity(num_upgrade_regions);
100 let mut catchup_regions = Vec::with_capacity(num_upgrade_regions);
101 let mut timeout = None;
102
103 for upgrade_region in upgrade_regions {
104 let Some(writable) = ctx.region_server.is_region_leader(upgrade_region.region_id)
105 else {
106 debug!("Region {} is not found", upgrade_region.region_id);
108 replies.push(UpgradeRegionReply {
109 region_id: upgrade_region.region_id,
110 ready: false,
111 exists: false,
112 error: None,
113 });
114 continue;
115 };
116
117 if writable {
119 warn!(
120 "Region {} is writable, ignores the catchup request",
121 upgrade_region.region_id
122 );
123 replies.push(UpgradeRegionReply {
124 region_id: upgrade_region.region_id,
125 ready: true,
126 exists: true,
127 error: None,
128 });
129 } else {
130 let UpgradeRegion {
131 last_entry_id,
132 metadata_last_entry_id,
133 location_id,
134 replay_entry_id,
135 metadata_replay_entry_id,
136 replay_timeout,
137 ..
138 } = upgrade_region;
139 match timeout {
140 Some(timeout) => {
141 debug_assert_eq!(timeout, replay_timeout);
142 }
143 None => {
144 timeout = Some(replay_timeout);
146 }
147 }
148
149 let checkpoint = match (replay_entry_id, metadata_replay_entry_id) {
150 (Some(entry_id), metadata_entry_id) => Some(ReplayCheckpoint {
151 entry_id,
152 metadata_entry_id,
153 }),
154 _ => None,
155 };
156
157 catchup_regions.push(upgrade_region.region_id);
158 catchup_requests.push((
159 upgrade_region.region_id,
160 RegionCatchupRequest {
161 set_writable: true,
162 entry_id: last_entry_id,
163 metadata_entry_id: metadata_last_entry_id,
164 location_id,
165 checkpoint,
166 },
167 ));
168 }
169 }
170
171 let Some(timeout) = timeout else {
172 info!("All regions are writable, no need to catchup");
174 debug_assert_eq!(replies.len(), num_upgrade_regions);
175 return replies;
176 };
177
178 match tokio::time::timeout(
179 timeout,
180 ctx.region_server
181 .handle_batch_catchup_requests(self.upgrade_region_parallelism, catchup_requests),
182 )
183 .await
184 {
185 Ok(responses) => {
186 replies.extend(Self::convert_responses_to_replies(
187 responses,
188 &catchup_regions,
189 ));
190 }
191 Err(_) => {
192 replies.extend(catchup_regions.iter().map(|region_id| UpgradeRegionReply {
193 region_id: *region_id,
194 ready: false,
195 exists: true,
196 error: None,
197 }));
198 }
199 }
200
201 replies
202 }
203}
204
205#[async_trait::async_trait]
206impl InstructionHandler for UpgradeRegionsHandler {
207 type Instruction = Vec<UpgradeRegion>;
208
209 async fn handle(
210 &self,
211 ctx: &HandlerContext,
212 upgrade_regions: Self::Instruction,
213 ) -> Option<InstructionReply> {
214 let replies = self.handle_upgrade_regions(ctx, upgrade_regions).await;
215
216 Some(InstructionReply::UpgradeRegions(UpgradeRegionsReply::new(
217 replies,
218 )))
219 }
220}
221
#[cfg(test)]
mod tests {
    //! Unit tests for `UpgradeRegionsHandler`, driven through mock region engines.

    use std::sync::Arc;
    use std::time::Duration;

    use common_meta::instruction::UpgradeRegion;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;

    use crate::error;
    use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
    use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
    use crate::tests::{MockRegionEngine, mock_region_server};

    /// Regions unknown to the region server are reported with `exists == false` and no error.
    #[tokio::test]
    async fn test_region_not_exist() {
        let mut mock_region_server = mock_region_server();
        // Only an engine is registered, no regions, so neither region id below is found.
        let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
        mock_region_server.register_engine(mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);

        let region_id = RegionId::new(1024, 1);
        let region_id2 = RegionId::new(1024, 2);
        let replay_timeout = Duration::from_millis(100u64);
        let reply = UpgradeRegionsHandler::new_test()
            .handle(
                &handler_context,
                vec![
                    UpgradeRegion {
                        region_id,
                        replay_timeout,
                        ..Default::default()
                    },
                    UpgradeRegion {
                        region_id: region_id2,
                        replay_timeout,
                        ..Default::default()
                    },
                ],
            )
            .await;

        let replies = &reply.unwrap().expect_upgrade_regions_reply();
        assert_eq!(replies[0].region_id, region_id);
        assert_eq!(replies[1].region_id, region_id2);
        for reply in replies {
            assert!(!reply.exists);
            assert!(reply.error.is_none());
        }
    }

    /// Regions whose mock role is `Leader` are answered as ready without a catchup.
    /// The engine's request mock panics if any request reaches it.
    #[tokio::test]
    async fn test_region_writable() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let region_id2 = RegionId::new(1024, 2);

        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
                    unreachable!();
                }));
            });
        mock_region_server.register_test_region(region_id, mock_engine.clone());
        mock_region_server.register_test_region(region_id2, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
        let replay_timeout = Duration::from_millis(100u64);
        let reply = UpgradeRegionsHandler::new_test()
            .handle(
                &handler_context,
                vec![
                    UpgradeRegion {
                        region_id,
                        replay_timeout,
                        ..Default::default()
                    },
                    UpgradeRegion {
                        region_id: region_id2,
                        replay_timeout,
                        ..Default::default()
                    },
                ],
            )
            .await;

        let replies = &reply.unwrap().expect_upgrade_regions_reply();
        assert_eq!(replies[0].region_id, region_id);
        assert_eq!(replies[1].region_id, region_id2);
        for reply in replies {
            assert!(reply.ready);
            assert!(reply.exists);
            assert!(reply.error.is_none());
        }
    }

    /// A follower region whose engine has a 100s `handle_request_delay` cannot finish within
    /// the 100ms replay timeout. It is reported as existing but not ready, with no error.
    #[tokio::test]
    async fn test_region_not_ready() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
                region_engine.handle_request_delay = Some(Duration::from_secs(100));
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
        let replay_timeout = Duration::from_millis(100u64);
        let reply = UpgradeRegionsHandler::new_test()
            .handle(
                &handler_context,
                vec![UpgradeRegion {
                    region_id,
                    replay_timeout,
                    ..Default::default()
                }],
            )
            .await;

        let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
        assert!(!reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_none(), "error: {:?}", reply.error);
    }

    /// The engine has a 300ms `handle_request_delay`.
    /// - Two attempts with 100ms replay timeouts are reported as not ready.
    /// - A final attempt with a 500ms timeout is reported as ready.
    #[tokio::test]
    async fn test_region_not_ready_with_retry() {
        common_telemetry::init_default_ut_logging();
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
                region_engine.handle_request_delay = Some(Duration::from_millis(300));
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let waits = vec![Duration::from_millis(100u64), Duration::from_millis(100u64)];
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
        // Each of these attempts uses a timeout shorter than the 300ms delay.
        for replay_timeout in waits {
            let reply = UpgradeRegionsHandler::new_test()
                .handle(
                    &handler_context,
                    vec![UpgradeRegion {
                        region_id,
                        replay_timeout,
                        ..Default::default()
                    }],
                )
                .await;

            let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
            assert!(!reply.ready);
            assert!(reply.exists);
            assert!(reply.error.is_none(), "error: {:?}", reply.error);
        }

        // A timeout longer than the 300ms delay lets the catchup complete.
        let reply = UpgradeRegionsHandler::new_test()
            .handle(
                &handler_context,
                vec![UpgradeRegion {
                    region_id,
                    replay_timeout: Duration::from_millis(500),
                    ..Default::default()
                }],
            )
            .await;
        let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
        assert!(reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_none(), "error: {:?}", reply.error);
    }

    /// The engine fails catchup with an `Unexpected` ("mock_error") error after a 100ms
    /// `handle_request_delay`.
    /// - The first attempt times out, so no error is reported.
    /// - The second attempt has a 200ms timeout and surfaces the error.
    #[tokio::test]
    async fn test_region_error() {
        common_telemetry::init_default_ut_logging();
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
                    error::UnexpectedSnafu {
                        violated: "mock_error".to_string(),
                    }
                    .fail()
                }));
                region_engine.handle_request_delay = Some(Duration::from_millis(100));
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
        // NOTE(review): this relies on `UpgradeRegion::default()`'s `replay_timeout` being
        // shorter than the 100ms delay (presumably zero) — confirm.
        let reply = UpgradeRegionsHandler::new_test()
            .handle(
                &handler_context,
                vec![UpgradeRegion {
                    region_id,
                    ..Default::default()
                }],
            )
            .await;

        let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
        assert!(!reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_none());

        // A 200ms timeout outlasts the 100ms delay, so the engine error is reported. The region
        // still exists because the error is not "region not found".
        let reply = UpgradeRegionsHandler::new_test()
            .handle(
                &handler_context,
                vec![UpgradeRegion {
                    region_id,
                    replay_timeout: Duration::from_millis(200),
                    ..Default::default()
                }],
            )
            .await;

        let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
        assert!(!reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_some());
        assert!(reply.error.as_ref().unwrap().contains("mock_error"));
    }
}