1use common_error::ext::{BoxedError, ErrorExt};
16use common_error::status_code::StatusCode;
17use common_meta::instruction::{
18 InstructionReply, UpgradeRegion, UpgradeRegionReply, UpgradeRegionsReply,
19};
20use common_telemetry::{debug, info, warn};
21use store_api::region_request::{RegionCatchupRequest, ReplayCheckpoint};
22use store_api::storage::RegionId;
23
24use crate::error::Result;
25use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
26
/// Instruction handler that upgrades a batch of regions.
#[derive(Default, Clone, Copy, Debug)]
pub struct UpgradeRegionsHandler {
    /// Parallelism value handed to the region server's batch catchup call.
    pub upgrade_region_parallelism: usize,
}
31
#[cfg(test)]
impl UpgradeRegionsHandler {
    /// Test-only constructor: a handler with `upgrade_region_parallelism` set to 8.
    fn new_test() -> Self {
        Self {
            upgrade_region_parallelism: 8,
        }
    }
}
40
41impl UpgradeRegionsHandler {
42 fn convert_responses_to_replies(
43 responses: Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>>,
44 catchup_regions: &[RegionId],
45 ) -> Vec<UpgradeRegionReply> {
46 match responses {
47 Ok(responses) => responses
48 .into_iter()
49 .map(|(region_id, result)| match result {
50 Ok(()) => UpgradeRegionReply {
51 region_id,
52 ready: true,
53 exists: true,
54 error: None,
55 },
56 Err(err) => {
57 if err.status_code() == StatusCode::RegionNotFound {
58 UpgradeRegionReply {
59 region_id,
60 ready: false,
61 exists: false,
62 error: Some(format!("{err:?}")),
63 }
64 } else {
65 UpgradeRegionReply {
66 region_id,
67 ready: false,
68 exists: true,
69 error: Some(format!("{err:?}")),
70 }
71 }
72 }
73 })
74 .collect::<Vec<_>>(),
75 Err(err) => catchup_regions
76 .iter()
77 .map(|region_id| UpgradeRegionReply {
78 region_id: *region_id,
79 ready: false,
80 exists: true,
81 error: Some(format!("{err:?}")),
82 })
83 .collect::<Vec<_>>(),
84 }
85 }
86}
87
88impl UpgradeRegionsHandler {
89 async fn handle_upgrade_regions(
93 &self,
94 ctx: &HandlerContext,
95 upgrade_regions: Vec<UpgradeRegion>,
96 ) -> Vec<UpgradeRegionReply> {
97 let num_upgrade_regions = upgrade_regions.len();
98 let mut replies = Vec::with_capacity(num_upgrade_regions);
99 let mut catchup_requests = Vec::with_capacity(num_upgrade_regions);
100 let mut catchup_regions = Vec::with_capacity(num_upgrade_regions);
101 let mut timeout = None;
102
103 for upgrade_region in upgrade_regions {
104 let Some(writable) = ctx.region_server.is_region_leader(upgrade_region.region_id)
105 else {
106 debug!("Region {} is not found", upgrade_region.region_id);
108 replies.push(UpgradeRegionReply {
109 region_id: upgrade_region.region_id,
110 ready: false,
111 exists: false,
112 error: None,
113 });
114 continue;
115 };
116
117 if writable {
119 warn!(
120 "Region {} is writable, ignores the catchup request",
121 upgrade_region.region_id
122 );
123 replies.push(UpgradeRegionReply {
124 region_id: upgrade_region.region_id,
125 ready: true,
126 exists: true,
127 error: None,
128 });
129 } else {
130 let UpgradeRegion {
131 last_entry_id,
132 metadata_last_entry_id,
133 location_id,
134 replay_entry_id,
135 metadata_replay_entry_id,
136 replay_timeout,
137 ..
138 } = upgrade_region;
139 match timeout {
140 Some(timeout) => {
141 debug_assert_eq!(timeout, replay_timeout);
142 }
143 None => {
144 timeout = Some(replay_timeout);
146 }
147 }
148
149 let checkpoint = match (replay_entry_id, metadata_replay_entry_id) {
150 (Some(entry_id), metadata_entry_id) => Some(ReplayCheckpoint {
151 entry_id,
152 metadata_entry_id,
153 }),
154 _ => None,
155 };
156
157 catchup_regions.push(upgrade_region.region_id);
158 catchup_requests.push((
159 upgrade_region.region_id,
160 RegionCatchupRequest {
161 set_writable: true,
162 entry_id: last_entry_id,
163 metadata_entry_id: metadata_last_entry_id,
164 location_id,
165 checkpoint,
166 },
167 ));
168 }
169 }
170
171 let Some(timeout) = timeout else {
172 info!("All regions are writable, no need to catchup");
174 debug_assert_eq!(replies.len(), num_upgrade_regions);
175 return replies;
176 };
177
178 match tokio::time::timeout(
179 timeout,
180 ctx.region_server
181 .handle_batch_catchup_requests(self.upgrade_region_parallelism, catchup_requests),
182 )
183 .await
184 {
185 Ok(responses) => {
186 replies.extend(
187 Self::convert_responses_to_replies(responses, &catchup_regions).into_iter(),
188 );
189 }
190 Err(_) => {
191 replies.extend(catchup_regions.iter().map(|region_id| UpgradeRegionReply {
192 region_id: *region_id,
193 ready: false,
194 exists: true,
195 error: None,
196 }));
197 }
198 }
199
200 replies
201 }
202}
203
204#[async_trait::async_trait]
205impl InstructionHandler for UpgradeRegionsHandler {
206 type Instruction = Vec<UpgradeRegion>;
207
208 async fn handle(
209 &self,
210 ctx: &HandlerContext,
211 upgrade_regions: Self::Instruction,
212 ) -> Option<InstructionReply> {
213 let replies = self.handle_upgrade_regions(ctx, upgrade_regions).await;
214
215 Some(InstructionReply::UpgradeRegions(UpgradeRegionsReply::new(
216 replies,
217 )))
218 }
219}
220
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use common_meta::instruction::UpgradeRegion;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;

    use crate::error;
    use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
    use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
    use crate::tests::{MockRegionEngine, mock_region_server};

    /// Builds an upgrade instruction for `region_id` with the given replay timeout; every
    /// other field keeps its default value.
    fn upgrade_request(region_id: RegionId, replay_timeout: Duration) -> UpgradeRegion {
        UpgradeRegion {
            region_id,
            replay_timeout,
            ..Default::default()
        }
    }

    /// Regions that were never registered are reported as not existing, without an error.
    #[tokio::test]
    async fn test_region_not_exist() {
        // Only an engine is registered; no region is.
        let mut region_server = mock_region_server();
        let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
        region_server.register_engine(engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(region_server, kv_backend);

        let first = RegionId::new(1024, 1);
        let second = RegionId::new(1024, 2);
        let replay_timeout = Duration::from_millis(100u64);
        let instruction = vec![
            upgrade_request(first, replay_timeout),
            upgrade_request(second, replay_timeout),
        ];
        let reply = UpgradeRegionsHandler::new_test()
            .handle(&handler_context, instruction)
            .await;

        let replies = reply.unwrap().expect_upgrade_regions_reply();
        assert_eq!(replies[0].region_id, first);
        assert_eq!(replies[1].region_id, second);
        for reply in &replies {
            assert!(!reply.exists);
            assert!(reply.error.is_none());
        }
    }

    /// Regions whose mock engine has the `Leader` role are reported ready, and the engine's
    /// request handler is never invoked.
    #[tokio::test]
    async fn test_region_writable() {
        let region_server = mock_region_server();
        let first = RegionId::new(1024, 1);
        let second = RegionId::new(1024, 2);

        let (engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                // Reaching the request handler would mean a catchup was issued; fail loudly.
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
                    unreachable!();
                }));
            });
        region_server.register_test_region(first, engine.clone());
        region_server.register_test_region(second, engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(region_server, kv_backend);

        let replay_timeout = Duration::from_millis(100u64);
        let instruction = vec![
            upgrade_request(first, replay_timeout),
            upgrade_request(second, replay_timeout),
        ];
        let reply = UpgradeRegionsHandler::new_test()
            .handle(&handler_context, instruction)
            .await;

        let replies = reply.unwrap().expect_upgrade_regions_reply();
        assert_eq!(replies[0].region_id, first);
        assert_eq!(replies[1].region_id, second);
        for reply in &replies {
            assert!(reply.ready);
            assert!(reply.exists);
            assert!(reply.error.is_none());
        }
    }

    /// A follower whose request handling is delayed far beyond the replay timeout is
    /// reported as existing but not ready, with no error.
    #[tokio::test]
    async fn test_region_not_ready() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
                // 100s delay versus the 100ms replay timeout used below.
                region_engine.handle_request_delay = Some(Duration::from_secs(100));
            });
        region_server.register_test_region(region_id, engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(region_server, kv_backend);

        let replay_timeout = Duration::from_millis(100u64);
        let reply = UpgradeRegionsHandler::new_test()
            .handle(
                &handler_context,
                vec![upgrade_request(region_id, replay_timeout)],
            )
            .await;

        let replies = reply.unwrap().expect_upgrade_regions_reply();
        let reply = &replies[0];
        assert!(!reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_none(), "error: {:?}", reply.error);
    }

    /// With a 300ms handling delay, two attempts bounded by 100ms are not ready, while a
    /// later attempt bounded by 500ms is ready.
    #[tokio::test]
    async fn test_region_not_ready_with_retry() {
        common_telemetry::init_default_ut_logging();
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
                region_engine.handle_request_delay = Some(Duration::from_millis(300));
            });
        region_server.register_test_region(region_id, engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(region_server, kv_backend);

        // Two attempts whose timeout is shorter than the handling delay.
        for replay_timeout in [Duration::from_millis(100u64); 2] {
            let reply = UpgradeRegionsHandler::new_test()
                .handle(
                    &handler_context,
                    vec![upgrade_request(region_id, replay_timeout)],
                )
                .await;

            let replies = reply.unwrap().expect_upgrade_regions_reply();
            let reply = &replies[0];
            assert!(!reply.ready);
            assert!(reply.exists);
            assert!(reply.error.is_none(), "error: {:?}", reply.error);
        }

        // One attempt whose timeout exceeds the handling delay.
        let reply = UpgradeRegionsHandler::new_test()
            .handle(
                &handler_context,
                vec![upgrade_request(region_id, Duration::from_millis(500))],
            )
            .await;
        let replies = reply.unwrap().expect_upgrade_regions_reply();
        let reply = &replies[0];
        assert!(reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_none(), "error: {:?}", reply.error);
    }

    /// An engine error is surfaced in the reply only when the request finishes inside the
    /// replay timeout.
    #[tokio::test]
    async fn test_region_error() {
        common_telemetry::init_default_ut_logging();
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
                    error::UnexpectedSnafu {
                        violated: "mock_error".to_string(),
                    }
                    .fail()
                }));
                region_engine.handle_request_delay = Some(Duration::from_millis(100));
            });
        region_server.register_test_region(region_id, engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(region_server, kv_backend);

        // The replay timeout is left at its default here; the reply is asserted to be
        // "not ready" with no error attached.
        let reply = UpgradeRegionsHandler::new_test()
            .handle(
                &handler_context,
                vec![UpgradeRegion {
                    region_id,
                    ..Default::default()
                }],
            )
            .await;

        let replies = reply.unwrap().expect_upgrade_regions_reply();
        let reply = &replies[0];
        assert!(!reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_none());

        // 200ms timeout versus the 100ms handling delay: the mock error must be reported.
        let reply = UpgradeRegionsHandler::new_test()
            .handle(
                &handler_context,
                vec![upgrade_request(region_id, Duration::from_millis(200))],
            )
            .await;

        let replies = reply.unwrap().expect_upgrade_regions_reply();
        let reply = &replies[0];
        assert!(!reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_some());
        assert!(reply.error.as_ref().unwrap().contains("mock_error"));
    }
}