1use common_meta::instruction::{
16 DowngradeRegion, DowngradeRegionReply, DowngradeRegionsReply, InstructionReply,
17};
18use common_telemetry::tracing::info;
19use common_telemetry::{error, warn};
20use futures::future::join_all;
21use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
22use store_api::region_request::{RegionFlushRequest, RegionRequest};
23use store_api::storage::RegionId;
24
25use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
26use crate::heartbeat::task_tracker::WaitResult;
27
/// Heartbeat instruction handler that downgrades a batch of regions.
///
/// The handler itself is stateless: everything it operates on is reached through the
/// `HandlerContext` passed to each call.
#[derive(Clone, Copy, Debug, Default)]
pub struct DowngradeRegionsHandler;
30
31impl DowngradeRegionsHandler {
32 async fn handle_downgrade_region(
33 ctx: &HandlerContext,
34 DowngradeRegion {
35 region_id,
36 flush_timeout,
37 }: DowngradeRegion,
38 ) -> DowngradeRegionReply {
39 let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
40 warn!("Region: {region_id} is not found");
41 return DowngradeRegionReply {
42 region_id,
43 last_entry_id: None,
44 metadata_last_entry_id: None,
45 exists: false,
46 error: None,
47 };
48 };
49
50 let region_server_moved = ctx.region_server.clone();
51
52 if !writable {
54 warn!(
55 "Region: {region_id} is not writable, flush_timeout: {:?}",
56 flush_timeout
57 );
58 return ctx.downgrade_to_follower_gracefully(region_id).await;
59 }
60
61 let Some(flush_timeout) = flush_timeout else {
63 return ctx.downgrade_to_follower_gracefully(region_id).await;
64 };
65
66 match ctx
70 .region_server
71 .set_region_role_state_gracefully(region_id, SettableRegionRoleState::DowngradingLeader)
72 .await
73 {
74 Ok(SetRegionRoleStateResponse::Success { .. }) => {}
75 Ok(SetRegionRoleStateResponse::NotFound) => {
76 warn!("Region: {region_id} is not found");
77 return DowngradeRegionReply {
78 region_id,
79 last_entry_id: None,
80 metadata_last_entry_id: None,
81 exists: false,
82 error: None,
83 };
84 }
85 Ok(SetRegionRoleStateResponse::InvalidTransition(err)) => {
86 error!(err; "Failed to convert region to downgrading leader - invalid transition");
87 return DowngradeRegionReply {
88 region_id,
89 last_entry_id: None,
90 metadata_last_entry_id: None,
91 exists: true,
92 error: Some(format!("{err:?}")),
93 };
94 }
95 Err(err) => {
96 error!(err; "Failed to convert region to downgrading leader");
97 return DowngradeRegionReply {
98 region_id,
99 last_entry_id: None,
100 metadata_last_entry_id: None,
101 exists: true,
102 error: Some(format!("{err:?}")),
103 };
104 }
105 }
106
107 let register_result = ctx
108 .downgrade_tasks
109 .try_register(
110 region_id,
111 Box::pin(async move {
112 info!("Flush region: {region_id} before converting region to follower");
113 region_server_moved
114 .handle_request(
115 region_id,
116 RegionRequest::Flush(RegionFlushRequest::default()),
117 )
118 .await?;
119
120 Ok(())
121 }),
122 )
123 .await;
124
125 if register_result.is_busy() {
126 warn!("Another flush task is running for the region: {region_id}");
127 }
128
129 let mut watcher = register_result.into_watcher();
130 let result = ctx.downgrade_tasks.wait(&mut watcher, flush_timeout).await;
131
132 match result {
133 WaitResult::Timeout => DowngradeRegionReply {
134 region_id,
135 last_entry_id: None,
136 metadata_last_entry_id: None,
137 exists: true,
138 error: Some(format!(
139 "Flush region timeout, region: {region_id}, timeout: {:?}",
140 flush_timeout
141 )),
142 },
143 WaitResult::Finish(Ok(_)) => ctx.downgrade_to_follower_gracefully(region_id).await,
144 WaitResult::Finish(Err(err)) => DowngradeRegionReply {
145 region_id,
146 last_entry_id: None,
147 metadata_last_entry_id: None,
148 exists: true,
149 error: Some(format!("{err:?}")),
150 },
151 }
152 }
153}
154
155#[async_trait::async_trait]
156impl InstructionHandler for DowngradeRegionsHandler {
157 type Instruction = Vec<DowngradeRegion>;
158
159 async fn handle(
160 &self,
161 ctx: &HandlerContext,
162 downgrade_regions: Self::Instruction,
163 ) -> Option<InstructionReply> {
164 let futures = downgrade_regions
165 .into_iter()
166 .map(|downgrade_region| Self::handle_downgrade_region(ctx, downgrade_region));
167 let results = join_all(futures).await;
169
170 Some(InstructionReply::DowngradeRegions(
171 DowngradeRegionsReply::new(results),
172 ))
173 }
174}
175
176impl HandlerContext {
177 async fn downgrade_to_follower_gracefully(&self, region_id: RegionId) -> DowngradeRegionReply {
178 match self
179 .region_server
180 .set_region_role_state_gracefully(region_id, SettableRegionRoleState::Follower)
181 .await
182 {
183 Ok(SetRegionRoleStateResponse::Success(success)) => DowngradeRegionReply {
184 region_id,
185 last_entry_id: success.last_entry_id(),
186 metadata_last_entry_id: success.metadata_last_entry_id(),
187 exists: true,
188 error: None,
189 },
190 Ok(SetRegionRoleStateResponse::NotFound) => {
191 warn!("Region: {region_id} is not found");
192 DowngradeRegionReply {
193 region_id,
194 last_entry_id: None,
195 metadata_last_entry_id: None,
196 exists: false,
197 error: None,
198 }
199 }
200 Ok(SetRegionRoleStateResponse::InvalidTransition(err)) => {
201 error!(err; "Failed to convert region to follower - invalid transition");
202 DowngradeRegionReply {
203 region_id,
204 last_entry_id: None,
205 metadata_last_entry_id: None,
206 exists: true,
207 error: Some(format!("{err:?}")),
208 }
209 }
210 Err(err) => {
211 error!(err; "Failed to convert region to {}", SettableRegionRoleState::Follower);
212 DowngradeRegionReply {
213 region_id,
214 last_entry_id: None,
215 metadata_last_entry_id: None,
216 exists: true,
217 error: Some(format!("{err:?}")),
218 }
219 }
220 }
221 }
222}
223
#[cfg(test)]
mod tests {
    use std::assert_matches;
    use std::sync::Arc;
    use std::time::Duration;

    use common_meta::heartbeat::handler::{HandleControl, HeartbeatResponseHandler};
    use common_meta::heartbeat::mailbox::MessageMeta;
    use common_meta::instruction::{DowngradeRegion, Instruction, InstructionReply};
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use mito2::config::MitoConfig;
    use mito2::engine::MITO_ENGINE_NAME;
    use mito2::test_util::{CreateRequestBuilder, TestEnv};
    use store_api::region_engine::{
        RegionEngine, RegionRole, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
    };
    use store_api::region_request::RegionRequest;
    use store_api::storage::RegionId;
    use tokio::time::Instant;

    use crate::error;
    use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler;
    use crate::heartbeat::handler::tests::HeartbeatResponseTestEnv;
    use crate::heartbeat::handler::{
        HandlerContext, InstructionHandler, RegionHeartbeatResponseHandler,
    };
    use crate::tests::{MockRegionEngine, mock_region_server};

    /// Sends a downgrade instruction for exactly one region through the handler.
    async fn downgrade_one(
        ctx: &HandlerContext,
        region_id: RegionId,
        flush_timeout: Option<Duration>,
    ) -> Option<InstructionReply> {
        let request = DowngradeRegion {
            region_id,
            flush_timeout,
        };
        DowngradeRegionsHandler.handle(ctx, vec![request]).await
    }

    /// A region that was never registered is reported as missing, with or without a timeout.
    #[tokio::test]
    async fn test_region_not_exist() {
        let mut region_server = mock_region_server();
        let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
        region_server.register_engine(mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(region_server, kv_backend);
        let region_id = RegionId::new(1024, 1);

        for flush_timeout in [None, Some(Duration::from_millis(100))] {
            let reply = downgrade_one(&handler_context, region_id, flush_timeout).await;

            let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
            assert!(!reply.exists);
            assert!(reply.error.is_none());
            assert!(reply.last_entry_id.is_none());
        }
    }

    /// A follower region is downgraded directly; the mock panics if a flush is requested.
    #[tokio::test]
    async fn test_region_readonly() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, req| match req {
                    RegionRequest::Flush(_) => unreachable!(),
                    _ => Ok(0),
                }));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    Ok(SetRegionRoleStateResponse::success(
                        SetRegionRoleStateSuccess::mito(1024),
                    ))
                }))
            });
        region_server.register_test_region(region_id, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(region_server, kv_backend);

        for flush_timeout in [None, Some(Duration::from_millis(100))] {
            let reply = downgrade_one(&handler_context, region_id, flush_timeout).await;

            let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
            assert!(reply.exists);
            assert!(reply.error.is_none());
            assert_eq!(reply.last_entry_id.unwrap(), 1024);
        }
    }

    /// A flush that outlasts `flush_timeout` produces a timeout error and no entry id.
    #[tokio::test]
    async fn test_region_flush_timeout() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_request_delay = Some(Duration::from_secs(100));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    Ok(SetRegionRoleStateResponse::success(
                        SetRegionRoleStateSuccess::mito(1024),
                    ))
                }))
            });
        region_server.register_test_region(region_id, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(region_server, kv_backend);

        let reply =
            downgrade_one(&handler_context, region_id, Some(Duration::from_millis(100))).await;

        let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
        assert!(reply.exists);
        assert!(reply.error.as_ref().unwrap().contains("timeout"));
        assert!(reply.last_entry_id.is_none());
    }

    /// Two short attempts time out against a 300 ms flush; the third is expected to finish
    /// in under 300 ms, i.e. without waiting for a whole new flush.
    #[tokio::test]
    async fn test_region_flush_timeout_and_retry() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_request_delay = Some(Duration::from_millis(300));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    Ok(SetRegionRoleStateResponse::success(
                        SetRegionRoleStateSuccess::mito(1024),
                    ))
                }))
            });
        region_server.register_test_region(region_id, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(region_server, kv_backend);

        for _ in 0..2 {
            let reply =
                downgrade_one(&handler_context, region_id, Some(Duration::from_millis(100))).await;

            let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
            assert!(reply.exists);
            assert!(reply.error.as_ref().unwrap().contains("timeout"));
            assert!(reply.last_entry_id.is_none());
        }

        let timer = Instant::now();
        let reply =
            downgrade_one(&handler_context, region_id, Some(Duration::from_millis(500))).await;
        assert!(timer.elapsed().as_millis() < 300);

        let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
        assert!(reply.exists);
        assert!(reply.error.is_none());
        assert_eq!(reply.last_entry_id.unwrap(), 1024);
    }

    /// Same retry sequence as above, but the mocked flush fails: the third attempt must
    /// surface the flush error instead of an entry id.
    #[tokio::test]
    async fn test_region_flush_timeout_and_retry_error() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_request_delay = Some(Duration::from_millis(300));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
                    error::UnexpectedSnafu {
                        violated: "mock flush failed",
                    }
                    .fail()
                }));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    Ok(SetRegionRoleStateResponse::success(
                        SetRegionRoleStateSuccess::mito(1024),
                    ))
                }))
            });
        region_server.register_test_region(region_id, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(region_server, kv_backend);

        for _ in 0..2 {
            let reply =
                downgrade_one(&handler_context, region_id, Some(Duration::from_millis(100))).await;

            let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
            assert!(reply.exists);
            assert!(reply.error.as_ref().unwrap().contains("timeout"));
            assert!(reply.last_entry_id.is_none());
        }

        let timer = Instant::now();
        let reply =
            downgrade_one(&handler_context, region_id, Some(Duration::from_millis(500))).await;
        assert!(timer.elapsed().as_millis() < 300);

        let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
        assert!(reply.exists);
        assert!(reply.error.as_ref().unwrap().contains("flush failed"));
        assert!(reply.last_entry_id.is_none());
    }

    /// A `NotFound` response from the role change is reported as a missing region.
    #[tokio::test]
    async fn test_set_region_readonly_not_found() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_set_readonly_gracefully_mock_fn =
                    Some(Box::new(|_| Ok(SetRegionRoleStateResponse::NotFound)));
            });
        region_server.register_test_region(region_id, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(region_server, kv_backend);

        let reply = downgrade_one(&handler_context, region_id, None).await;

        let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
        assert!(!reply.exists);
        assert!(reply.error.is_none());
        assert!(reply.last_entry_id.is_none());
    }

    /// An error from the role change is passed through in the reply's `error` field.
    #[tokio::test]
    async fn test_set_region_readonly_error() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    error::UnexpectedSnafu {
                        violated: "Failed to set region to readonly",
                    }
                    .fail()
                }));
            });
        region_server.register_test_region(region_id, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(region_server, kv_backend);

        let reply = downgrade_one(&handler_context, region_id, None).await;

        let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
        assert!(reply.exists);
        let message = reply.error.as_ref().unwrap();
        assert!(message.contains("Failed to set region to readonly"));
        assert!(reply.last_entry_id.is_none());
    }

    /// End-to-end run against a real mito engine: two freshly created regions are
    /// downgraded through the heartbeat response handler and both end up as followers.
    #[tokio::test]
    async fn test_downgrade_regions() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let heartbeat_handler =
            RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend);
        let mut engine_env = TestEnv::with_prefix("downgrade-regions").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine.clone()));

        let regions = [RegionId::new(1024, 1), RegionId::new(1024, 2)];
        let builder = CreateRequestBuilder::new();
        for region_id in regions {
            region_server
                .handle_request(region_id, RegionRequest::Create(builder.build()))
                .await
                .unwrap();
        }

        let meta = MessageMeta::new_test(1, "test", "dn-1", "meta-0");
        let instruction = Instruction::DowngradeRegions(
            regions
                .iter()
                .map(|&region_id| DowngradeRegion {
                    region_id,
                    flush_timeout: Some(Duration::from_secs(1)),
                })
                .collect(),
        );
        let mut heartbeat_env = HeartbeatResponseTestEnv::new();
        let mut ctx = heartbeat_env.create_handler_ctx((meta, Default::default(), instruction));
        let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
        assert_matches!(control, HandleControl::Continue);

        let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
        let replies = reply.expect_downgrade_regions_reply();
        // Index explicitly so that a missing reply fails the test instead of being skipped.
        for (index, region_id) in regions.into_iter().enumerate() {
            let reply = &replies[index];
            assert_eq!(reply.region_id, region_id);
            assert!(reply.exists);
            assert!(reply.error.is_none());
            assert_eq!(reply.last_entry_id, Some(0));
        }

        for region_id in regions {
            assert_eq!(engine.role(region_id).unwrap(), RegionRole::Follower);
        }
    }
}