1use common_meta::instruction::{
16 DowngradeRegion, DowngradeRegionReply, DowngradeRegionsReply, Instruction, InstructionReply,
17};
18use common_telemetry::tracing::info;
19use common_telemetry::{error, warn};
20use futures::future::join_all;
21use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
22use store_api::region_request::{RegionFlushRequest, RegionRequest};
23use store_api::storage::RegionId;
24
25use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
26use crate::heartbeat::task_tracker::WaitResult;
27
/// Stateless handler for the downgrade-regions heartbeat instruction.
///
/// The type has no fields; everything it operates on is reached through the
/// `HandlerContext` handed to each call.
#[derive(Clone, Copy, Debug, Default)]
pub struct DowngradeRegionsHandler;
30
31impl DowngradeRegionsHandler {
32 async fn handle_downgrade_region(
33 ctx: &HandlerContext,
34 DowngradeRegion {
35 region_id,
36 flush_timeout,
37 }: DowngradeRegion,
38 ) -> DowngradeRegionReply {
39 let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
40 warn!("Region: {region_id} is not found");
41 return DowngradeRegionReply {
42 region_id,
43 last_entry_id: None,
44 metadata_last_entry_id: None,
45 exists: false,
46 error: None,
47 };
48 };
49
50 let region_server_moved = ctx.region_server.clone();
51
52 if !writable {
54 warn!(
55 "Region: {region_id} is not writable, flush_timeout: {:?}",
56 flush_timeout
57 );
58 return ctx.downgrade_to_follower_gracefully(region_id).await;
59 }
60
61 let Some(flush_timeout) = flush_timeout else {
63 return ctx.downgrade_to_follower_gracefully(region_id).await;
64 };
65
66 match ctx
70 .region_server
71 .set_region_role_state_gracefully(region_id, SettableRegionRoleState::DowngradingLeader)
72 .await
73 {
74 Ok(SetRegionRoleStateResponse::Success { .. }) => {}
75 Ok(SetRegionRoleStateResponse::NotFound) => {
76 warn!("Region: {region_id} is not found");
77 return DowngradeRegionReply {
78 region_id,
79 last_entry_id: None,
80 metadata_last_entry_id: None,
81 exists: false,
82 error: None,
83 };
84 }
85 Ok(SetRegionRoleStateResponse::InvalidTransition(err)) => {
86 error!(err; "Failed to convert region to downgrading leader - invalid transition");
87 return DowngradeRegionReply {
88 region_id,
89 last_entry_id: None,
90 metadata_last_entry_id: None,
91 exists: true,
92 error: Some(format!("{err:?}")),
93 };
94 }
95 Err(err) => {
96 error!(err; "Failed to convert region to downgrading leader");
97 return DowngradeRegionReply {
98 region_id,
99 last_entry_id: None,
100 metadata_last_entry_id: None,
101 exists: true,
102 error: Some(format!("{err:?}")),
103 };
104 }
105 }
106
107 let register_result = ctx
108 .downgrade_tasks
109 .try_register(
110 region_id,
111 Box::pin(async move {
112 info!("Flush region: {region_id} before converting region to follower");
113 region_server_moved
114 .handle_request(
115 region_id,
116 RegionRequest::Flush(RegionFlushRequest {
117 row_group_size: None,
118 }),
119 )
120 .await?;
121
122 Ok(())
123 }),
124 )
125 .await;
126
127 if register_result.is_busy() {
128 warn!("Another flush task is running for the region: {region_id}");
129 }
130
131 let mut watcher = register_result.into_watcher();
132 let result = ctx.downgrade_tasks.wait(&mut watcher, flush_timeout).await;
133
134 match result {
135 WaitResult::Timeout => DowngradeRegionReply {
136 region_id,
137 last_entry_id: None,
138 metadata_last_entry_id: None,
139 exists: true,
140 error: Some(format!(
141 "Flush region timeout, region: {region_id}, timeout: {:?}",
142 flush_timeout
143 )),
144 },
145 WaitResult::Finish(Ok(_)) => ctx.downgrade_to_follower_gracefully(region_id).await,
146 WaitResult::Finish(Err(err)) => DowngradeRegionReply {
147 region_id,
148 last_entry_id: None,
149 metadata_last_entry_id: None,
150 exists: true,
151 error: Some(format!("{err:?}")),
152 },
153 }
154 }
155}
156
157#[async_trait::async_trait]
158impl InstructionHandler for DowngradeRegionsHandler {
159 async fn handle(
160 &self,
161 ctx: &HandlerContext,
162 instruction: Instruction,
163 ) -> Option<InstructionReply> {
164 let downgrade_regions = instruction.into_downgrade_regions().unwrap();
166 let futures = downgrade_regions
167 .into_iter()
168 .map(|downgrade_region| Self::handle_downgrade_region(ctx, downgrade_region));
169 let results = join_all(futures).await;
171
172 Some(InstructionReply::DowngradeRegions(
173 DowngradeRegionsReply::new(results),
174 ))
175 }
176}
177
178impl HandlerContext {
179 async fn downgrade_to_follower_gracefully(&self, region_id: RegionId) -> DowngradeRegionReply {
180 match self
181 .region_server
182 .set_region_role_state_gracefully(region_id, SettableRegionRoleState::Follower)
183 .await
184 {
185 Ok(SetRegionRoleStateResponse::Success(success)) => DowngradeRegionReply {
186 region_id,
187 last_entry_id: success.last_entry_id(),
188 metadata_last_entry_id: success.metadata_last_entry_id(),
189 exists: true,
190 error: None,
191 },
192 Ok(SetRegionRoleStateResponse::NotFound) => {
193 warn!("Region: {region_id} is not found");
194 DowngradeRegionReply {
195 region_id,
196 last_entry_id: None,
197 metadata_last_entry_id: None,
198 exists: false,
199 error: None,
200 }
201 }
202 Ok(SetRegionRoleStateResponse::InvalidTransition(err)) => {
203 error!(err; "Failed to convert region to follower - invalid transition");
204 DowngradeRegionReply {
205 region_id,
206 last_entry_id: None,
207 metadata_last_entry_id: None,
208 exists: true,
209 error: Some(format!("{err:?}")),
210 }
211 }
212 Err(err) => {
213 error!(err; "Failed to convert region to {}", SettableRegionRoleState::Follower);
214 DowngradeRegionReply {
215 region_id,
216 last_entry_id: None,
217 metadata_last_entry_id: None,
218 exists: true,
219 error: Some(format!("{err:?}")),
220 }
221 }
222 }
223 }
224}
225
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;
    use std::time::Duration;

    use common_meta::heartbeat::handler::{HandleControl, HeartbeatResponseHandler};
    use common_meta::heartbeat::mailbox::MessageMeta;
    use common_meta::instruction::{DowngradeRegion, Instruction, InstructionReply};
    use mito2::config::MitoConfig;
    use mito2::engine::MITO_ENGINE_NAME;
    use mito2::test_util::{CreateRequestBuilder, TestEnv};
    use store_api::region_engine::{
        RegionEngine, RegionRole, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
    };
    use store_api::region_request::RegionRequest;
    use store_api::storage::RegionId;
    use tokio::time::Instant;

    use crate::error;
    use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler;
    use crate::heartbeat::handler::tests::HeartbeatResponseTestEnv;
    use crate::heartbeat::handler::{
        HandlerContext, InstructionHandler, RegionHeartbeatResponseHandler,
    };
    use crate::tests::{MockRegionEngine, mock_region_server};

    /// Sends a downgrade instruction for exactly one region straight to
    /// `DowngradeRegionsHandler` and returns the handler's reply untouched.
    async fn downgrade_one(
        ctx: &HandlerContext,
        region_id: RegionId,
        flush_timeout: Option<Duration>,
    ) -> Option<InstructionReply> {
        let instruction = Instruction::DowngradeRegions(vec![DowngradeRegion {
            region_id,
            flush_timeout,
        }]);
        DowngradeRegionsHandler.handle(ctx, instruction).await
    }

    /// A region that was never registered is reported as non-existent, with
    /// or without a flush timeout.
    #[tokio::test]
    async fn test_region_not_exist() {
        let mut mock_region_server = mock_region_server();
        let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
        mock_region_server.register_engine(mock_engine);
        let handler_context = HandlerContext::new_for_test(mock_region_server);
        let region_id = RegionId::new(1024, 1);

        for flush_timeout in [None, Some(Duration::from_millis(100u64))] {
            let reply = downgrade_one(&handler_context, region_id, flush_timeout).await;

            let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
            assert!(!reply.exists);
            assert!(reply.error.is_none());
            assert!(reply.last_entry_id.is_none());
        }
    }

    /// A follower region is downgraded without a flush: the mock panics via
    /// `unreachable!` if it ever receives a flush request.
    #[tokio::test]
    async fn test_region_readonly() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, req| {
                    if let RegionRequest::Flush(_) = req {
                        unreachable!();
                    };

                    Ok(0)
                }));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    Ok(SetRegionRoleStateResponse::success(
                        SetRegionRoleStateSuccess::mito(1024),
                    ))
                }))
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let handler_context = HandlerContext::new_for_test(mock_region_server);

        for flush_timeout in [None, Some(Duration::from_millis(100u64))] {
            let reply = downgrade_one(&handler_context, region_id, flush_timeout).await;

            let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
            assert!(reply.exists);
            assert!(reply.error.is_none());
            assert_eq!(reply.last_entry_id.unwrap(), 1024);
        }
    }

    /// With a 100 s request delay and a 100 ms flush timeout the reply carries
    /// a timeout error and no entry id.
    #[tokio::test]
    async fn test_region_flush_timeout() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_request_delay = Some(Duration::from_secs(100));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    Ok(SetRegionRoleStateResponse::success(
                        SetRegionRoleStateSuccess::mito(1024),
                    ))
                }))
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let handler_context = HandlerContext::new_for_test(mock_region_server);

        let flush_timeout = Duration::from_millis(100);
        let reply = downgrade_one(&handler_context, region_id, Some(flush_timeout)).await;

        let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
        assert!(reply.exists);
        assert!(reply.error.as_ref().unwrap().contains("timeout"));
        assert!(reply.last_entry_id.is_none());
    }

    /// Two 100 ms attempts against a 300 ms request delay both time out; a
    /// third attempt with a 500 ms budget then succeeds in under 300 ms.
    #[tokio::test]
    async fn test_region_flush_timeout_and_retry() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_request_delay = Some(Duration::from_millis(300));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    Ok(SetRegionRoleStateResponse::success(
                        SetRegionRoleStateSuccess::mito(1024),
                    ))
                }))
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let handler_context = HandlerContext::new_for_test(mock_region_server);

        for flush_timeout in [Duration::from_millis(100u64); 2] {
            let reply = downgrade_one(&handler_context, region_id, Some(flush_timeout)).await;

            let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
            assert!(reply.exists);
            assert!(reply.error.as_ref().unwrap().contains("timeout"));
            assert!(reply.last_entry_id.is_none());
        }

        // NOTE(review): presumably the third call joins the flush task started
        // by the first one, which is why it must return in less than the full
        // 300 ms delay - confirm against the task tracker.
        let timer = Instant::now();
        let reply =
            downgrade_one(&handler_context, region_id, Some(Duration::from_millis(500))).await;
        assert!(timer.elapsed().as_millis() < 300);

        let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
        assert!(reply.exists);
        assert!(reply.error.is_none());
        assert_eq!(reply.last_entry_id.unwrap(), 1024);
    }

    /// Same schedule as the retry test, but the mocked request fails: the
    /// third attempt must surface the "flush failed" error.
    #[tokio::test]
    async fn test_region_flush_timeout_and_retry_error() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_request_delay = Some(Duration::from_millis(300));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
                    error::UnexpectedSnafu {
                        violated: "mock flush failed",
                    }
                    .fail()
                }));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    Ok(SetRegionRoleStateResponse::success(
                        SetRegionRoleStateSuccess::mito(1024),
                    ))
                }))
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let handler_context = HandlerContext::new_for_test(mock_region_server);

        for flush_timeout in [Duration::from_millis(100u64); 2] {
            let reply = downgrade_one(&handler_context, region_id, Some(flush_timeout)).await;

            let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
            assert!(reply.exists);
            assert!(reply.error.as_ref().unwrap().contains("timeout"));
            assert!(reply.last_entry_id.is_none());
        }

        let timer = Instant::now();
        let reply =
            downgrade_one(&handler_context, region_id, Some(Duration::from_millis(500))).await;
        assert!(timer.elapsed().as_millis() < 300);

        let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
        assert!(reply.exists);
        assert!(reply.error.as_ref().unwrap().contains("flush failed"));
        assert!(reply.last_entry_id.is_none());
    }

    /// A `NotFound` response from the role-state change is reported as a
    /// non-existent region without an error.
    #[tokio::test]
    async fn test_set_region_readonly_not_found() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_set_readonly_gracefully_mock_fn =
                    Some(Box::new(|_| Ok(SetRegionRoleStateResponse::NotFound)));
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let handler_context = HandlerContext::new_for_test(mock_region_server);

        let reply = downgrade_one(&handler_context, region_id, None).await;

        let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
        assert!(!reply.exists);
        assert!(reply.error.is_none());
        assert!(reply.last_entry_id.is_none());
    }

    /// An error from the role-state change is passed through in the reply's
    /// `error` field while the region is still reported as existing.
    #[tokio::test]
    async fn test_set_region_readonly_error() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    error::UnexpectedSnafu {
                        violated: "Failed to set region to readonly",
                    }
                    .fail()
                }));
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let handler_context = HandlerContext::new_for_test(mock_region_server);

        let reply = downgrade_one(&handler_context, region_id, None).await;

        let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
        assert!(reply.exists);
        assert!(
            reply
                .error
                .as_ref()
                .unwrap()
                .contains("Failed to set region to readonly")
        );
        assert!(reply.last_entry_id.is_none());
    }

    /// End-to-end check through `RegionHeartbeatResponseHandler` with a real
    /// mito engine: two freshly created regions are downgraded in one
    /// instruction and both end up as followers.
    #[tokio::test]
    async fn test_downgrade_regions() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
        let mut engine_env = TestEnv::with_prefix("downgrade-regions").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine.clone()));

        let region_id = RegionId::new(1024, 1);
        let region_id1 = RegionId::new(1024, 2);
        let builder = CreateRequestBuilder::new();
        let create_req = builder.build();
        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();
        let create_req1 = builder.build();
        region_server
            .handle_request(region_id1, RegionRequest::Create(create_req1))
            .await
            .unwrap();

        let meta = MessageMeta::new_test(1, "test", "dn-1", "meta-0");
        let instruction = Instruction::DowngradeRegions(vec![
            DowngradeRegion {
                region_id,
                flush_timeout: Some(Duration::from_secs(1)),
            },
            DowngradeRegion {
                region_id: region_id1,
                flush_timeout: Some(Duration::from_secs(1)),
            },
        ]);
        let mut heartbeat_env = HeartbeatResponseTestEnv::new();
        let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
        let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
        assert_matches!(control, HandleControl::Continue);

        let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
        let reply = reply.expect_downgrade_regions_reply();
        assert_eq!(reply[0].region_id, region_id);
        assert!(reply[0].exists);
        assert!(reply[0].error.is_none());
        assert_eq!(reply[0].last_entry_id, Some(0));
        assert_eq!(reply[1].region_id, region_id1);
        assert!(reply[1].exists);
        assert!(reply[1].error.is_none());
        assert_eq!(reply[1].last_entry_id, Some(0));

        assert_eq!(engine.role(region_id).unwrap(), RegionRole::Follower);
        assert_eq!(engine.role(region_id1).unwrap(), RegionRole::Follower);
    }
}