1use common_meta::instruction::{
16 DowngradeRegion, DowngradeRegionReply, DowngradeRegionsReply, InstructionReply,
17};
18use common_telemetry::tracing::info;
19use common_telemetry::{error, warn};
20use futures::future::join_all;
21use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
22use store_api::region_request::{RegionFlushRequest, RegionRequest};
23use store_api::storage::RegionId;
24
25use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
26use crate::heartbeat::task_tracker::WaitResult;
27
/// Heartbeat instruction handler that downgrades a batch of regions hosted by the
/// local region server.
///
/// The handler itself carries no state; everything it needs is taken from the
/// `HandlerContext` passed to `InstructionHandler::handle`.
#[derive(Clone, Copy, Debug, Default)]
pub struct DowngradeRegionsHandler;
30
31impl DowngradeRegionsHandler {
32 async fn handle_downgrade_region(
33 ctx: &HandlerContext,
34 DowngradeRegion {
35 region_id,
36 flush_timeout,
37 }: DowngradeRegion,
38 ) -> DowngradeRegionReply {
39 let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
40 warn!("Region: {region_id} is not found");
41 return DowngradeRegionReply {
42 region_id,
43 last_entry_id: None,
44 metadata_last_entry_id: None,
45 exists: false,
46 error: None,
47 };
48 };
49
50 let region_server_moved = ctx.region_server.clone();
51
52 if !writable {
54 warn!(
55 "Region: {region_id} is not writable, flush_timeout: {:?}",
56 flush_timeout
57 );
58 return ctx.downgrade_to_follower_gracefully(region_id).await;
59 }
60
61 let Some(flush_timeout) = flush_timeout else {
63 return ctx.downgrade_to_follower_gracefully(region_id).await;
64 };
65
66 match ctx
70 .region_server
71 .set_region_role_state_gracefully(region_id, SettableRegionRoleState::DowngradingLeader)
72 .await
73 {
74 Ok(SetRegionRoleStateResponse::Success { .. }) => {}
75 Ok(SetRegionRoleStateResponse::NotFound) => {
76 warn!("Region: {region_id} is not found");
77 return DowngradeRegionReply {
78 region_id,
79 last_entry_id: None,
80 metadata_last_entry_id: None,
81 exists: false,
82 error: None,
83 };
84 }
85 Ok(SetRegionRoleStateResponse::InvalidTransition(err)) => {
86 error!(err; "Failed to convert region to downgrading leader - invalid transition");
87 return DowngradeRegionReply {
88 region_id,
89 last_entry_id: None,
90 metadata_last_entry_id: None,
91 exists: true,
92 error: Some(format!("{err:?}")),
93 };
94 }
95 Err(err) => {
96 error!(err; "Failed to convert region to downgrading leader");
97 return DowngradeRegionReply {
98 region_id,
99 last_entry_id: None,
100 metadata_last_entry_id: None,
101 exists: true,
102 error: Some(format!("{err:?}")),
103 };
104 }
105 }
106
107 let register_result = ctx
108 .downgrade_tasks
109 .try_register(
110 region_id,
111 Box::pin(async move {
112 info!("Flush region: {region_id} before converting region to follower");
113 region_server_moved
114 .handle_request(
115 region_id,
116 RegionRequest::Flush(RegionFlushRequest {
117 row_group_size: None,
118 }),
119 )
120 .await?;
121
122 Ok(())
123 }),
124 )
125 .await;
126
127 if register_result.is_busy() {
128 warn!("Another flush task is running for the region: {region_id}");
129 }
130
131 let mut watcher = register_result.into_watcher();
132 let result = ctx.downgrade_tasks.wait(&mut watcher, flush_timeout).await;
133
134 match result {
135 WaitResult::Timeout => DowngradeRegionReply {
136 region_id,
137 last_entry_id: None,
138 metadata_last_entry_id: None,
139 exists: true,
140 error: Some(format!(
141 "Flush region timeout, region: {region_id}, timeout: {:?}",
142 flush_timeout
143 )),
144 },
145 WaitResult::Finish(Ok(_)) => ctx.downgrade_to_follower_gracefully(region_id).await,
146 WaitResult::Finish(Err(err)) => DowngradeRegionReply {
147 region_id,
148 last_entry_id: None,
149 metadata_last_entry_id: None,
150 exists: true,
151 error: Some(format!("{err:?}")),
152 },
153 }
154 }
155}
156
157#[async_trait::async_trait]
158impl InstructionHandler for DowngradeRegionsHandler {
159 type Instruction = Vec<DowngradeRegion>;
160
161 async fn handle(
162 &self,
163 ctx: &HandlerContext,
164 downgrade_regions: Self::Instruction,
165 ) -> Option<InstructionReply> {
166 let futures = downgrade_regions
167 .into_iter()
168 .map(|downgrade_region| Self::handle_downgrade_region(ctx, downgrade_region));
169 let results = join_all(futures).await;
171
172 Some(InstructionReply::DowngradeRegions(
173 DowngradeRegionsReply::new(results),
174 ))
175 }
176}
177
178impl HandlerContext {
179 async fn downgrade_to_follower_gracefully(&self, region_id: RegionId) -> DowngradeRegionReply {
180 match self
181 .region_server
182 .set_region_role_state_gracefully(region_id, SettableRegionRoleState::Follower)
183 .await
184 {
185 Ok(SetRegionRoleStateResponse::Success(success)) => DowngradeRegionReply {
186 region_id,
187 last_entry_id: success.last_entry_id(),
188 metadata_last_entry_id: success.metadata_last_entry_id(),
189 exists: true,
190 error: None,
191 },
192 Ok(SetRegionRoleStateResponse::NotFound) => {
193 warn!("Region: {region_id} is not found");
194 DowngradeRegionReply {
195 region_id,
196 last_entry_id: None,
197 metadata_last_entry_id: None,
198 exists: false,
199 error: None,
200 }
201 }
202 Ok(SetRegionRoleStateResponse::InvalidTransition(err)) => {
203 error!(err; "Failed to convert region to follower - invalid transition");
204 DowngradeRegionReply {
205 region_id,
206 last_entry_id: None,
207 metadata_last_entry_id: None,
208 exists: true,
209 error: Some(format!("{err:?}")),
210 }
211 }
212 Err(err) => {
213 error!(err; "Failed to convert region to {}", SettableRegionRoleState::Follower);
214 DowngradeRegionReply {
215 region_id,
216 last_entry_id: None,
217 metadata_last_entry_id: None,
218 exists: true,
219 error: Some(format!("{err:?}")),
220 }
221 }
222 }
223 }
224}
225
#[cfg(test)]
mod tests {
    //! Tests for `DowngradeRegionsHandler`.
    //!
    //! Most tests drive the handler directly against a `MockRegionEngine`. The last test
    //! goes through `RegionHeartbeatResponseHandler` with a real mito engine.

    use std::assert_matches::assert_matches;
    use std::sync::Arc;
    use std::time::Duration;

    use common_meta::heartbeat::handler::{HandleControl, HeartbeatResponseHandler};
    use common_meta::heartbeat::mailbox::MessageMeta;
    use common_meta::instruction::{DowngradeRegion, Instruction};
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use mito2::config::MitoConfig;
    use mito2::engine::MITO_ENGINE_NAME;
    use mito2::test_util::{CreateRequestBuilder, TestEnv};
    use store_api::region_engine::{
        RegionEngine, RegionRole, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
    };
    use store_api::region_request::RegionRequest;
    use store_api::storage::RegionId;
    use tokio::time::Instant;

    use crate::error;
    use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler;
    use crate::heartbeat::handler::tests::HeartbeatResponseTestEnv;
    use crate::heartbeat::handler::{
        HandlerContext, InstructionHandler, RegionHeartbeatResponseHandler,
    };
    use crate::tests::{MockRegionEngine, mock_region_server};

    /// A region that was never registered is reported with `exists: false` and no
    /// error, both with and without a flush timeout.
    #[tokio::test]
    async fn test_region_not_exist() {
        let mut mock_region_server = mock_region_server();
        let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
        // Only the engine is registered, not `region_id`.
        mock_region_server.register_engine(mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
        let region_id = RegionId::new(1024, 1);
        let waits = vec![None, Some(Duration::from_millis(100u64))];

        for flush_timeout in waits {
            let reply = DowngradeRegionsHandler
                .handle(
                    &handler_context,
                    vec![DowngradeRegion {
                        region_id,
                        flush_timeout,
                    }],
                )
                .await;

            let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
            assert!(!reply.exists);
            assert!(reply.error.is_none());
            assert!(reply.last_entry_id.is_none());
        }
    }

    /// A follower (non-writable) region is converted without flushing, whatever the
    /// flush timeout. The reply carries the last entry id returned by the role-state mock.
    #[tokio::test]
    async fn test_region_readonly() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, req| {
                    if let RegionRequest::Flush(_) = req {
                        // The handler must skip the flush for a non-writable region.
                        unreachable!();
                    };

                    Ok(0)
                }));
                // NOTE(review): this mock appears to back `set_region_role_state_gracefully`.
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    Ok(SetRegionRoleStateResponse::success(
                        SetRegionRoleStateSuccess::mito(1024),
                    ))
                }))
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);

        let waits = vec![None, Some(Duration::from_millis(100u64))];
        for flush_timeout in waits {
            let reply = DowngradeRegionsHandler
                .handle(
                    &handler_context,
                    vec![DowngradeRegion {
                        region_id,
                        flush_timeout,
                    }],
                )
                .await;

            let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
            assert!(reply.exists);
            assert!(reply.error.is_none());
            assert_eq!(reply.last_entry_id.unwrap(), 1024);
        }
    }

    /// A leader whose flush takes far longer (100s) than the flush timeout (100ms)
    /// yields a "timeout" error and no last entry id.
    #[tokio::test]
    async fn test_region_flush_timeout() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_request_delay = Some(Duration::from_secs(100));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    Ok(SetRegionRoleStateResponse::success(
                        SetRegionRoleStateSuccess::mito(1024),
                    ))
                }))
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);

        let flush_timeout = Duration::from_millis(100);
        let reply = DowngradeRegionsHandler
            .handle(
                &handler_context,
                vec![DowngradeRegion {
                    region_id,
                    flush_timeout: Some(flush_timeout),
                }],
            )
            .await;

        let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
        assert!(reply.exists);
        assert!(reply.error.as_ref().unwrap().contains("timeout"));
        assert!(reply.last_entry_id.is_none());
    }

    /// Two short (100ms) attempts time out against a 300ms flush. A third attempt with a
    /// 500ms timeout succeeds in under 300ms, which shows it waited on the flush started
    /// earlier instead of starting a new one.
    #[tokio::test]
    async fn test_region_flush_timeout_and_retry() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_request_delay = Some(Duration::from_millis(300));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    Ok(SetRegionRoleStateResponse::success(
                        SetRegionRoleStateSuccess::mito(1024),
                    ))
                }))
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);

        let waits = vec![
            Some(Duration::from_millis(100u64)),
            Some(Duration::from_millis(100u64)),
        ];

        for flush_timeout in waits {
            let reply = DowngradeRegionsHandler
                .handle(
                    &handler_context,
                    vec![DowngradeRegion {
                        region_id,
                        flush_timeout,
                    }],
                )
                .await;

            let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
            assert!(reply.exists);
            assert!(reply.error.as_ref().unwrap().contains("timeout"));
            assert!(reply.last_entry_id.is_none());
        }
        let timer = Instant::now();
        let reply = DowngradeRegionsHandler
            .handle(
                &handler_context,
                vec![DowngradeRegion {
                    region_id,
                    flush_timeout: Some(Duration::from_millis(500)),
                }],
            )
            .await;
        // Well under a full 300ms flush: the earlier flush task was reused.
        assert!(timer.elapsed().as_millis() < 300);

        let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
        assert!(reply.exists);
        assert!(reply.error.is_none());
        assert_eq!(reply.last_entry_id.unwrap(), 1024);
    }

    /// Same timing as the retry test, but the flush itself fails. The final attempt
    /// reports the flush error instead of converting the region.
    #[tokio::test]
    async fn test_region_flush_timeout_and_retry_error() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_request_delay = Some(Duration::from_millis(300));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
                    error::UnexpectedSnafu {
                        violated: "mock flush failed",
                    }
                    .fail()
                }));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    Ok(SetRegionRoleStateResponse::success(
                        SetRegionRoleStateSuccess::mito(1024),
                    ))
                }))
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);

        let waits = vec![
            Some(Duration::from_millis(100u64)),
            Some(Duration::from_millis(100u64)),
        ];

        for flush_timeout in waits {
            let reply = DowngradeRegionsHandler
                .handle(
                    &handler_context,
                    vec![DowngradeRegion {
                        region_id,
                        flush_timeout,
                    }],
                )
                .await;
            let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
            assert!(reply.exists);
            assert!(reply.error.as_ref().unwrap().contains("timeout"));
            assert!(reply.last_entry_id.is_none());
        }
        let timer = Instant::now();
        let reply = DowngradeRegionsHandler
            .handle(
                &handler_context,
                vec![DowngradeRegion {
                    region_id,
                    flush_timeout: Some(Duration::from_millis(500)),
                }],
            )
            .await;
        assert!(timer.elapsed().as_millis() < 300);
        let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
        assert!(reply.exists);
        assert!(reply.error.as_ref().unwrap().contains("flush failed"));
        assert!(reply.last_entry_id.is_none());
    }

    /// Without a flush timeout the region is converted directly. The role-state mock
    /// reporting `NotFound` surfaces as `exists: false` with no error.
    #[tokio::test]
    async fn test_set_region_readonly_not_found() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_set_readonly_gracefully_mock_fn =
                    Some(Box::new(|_| Ok(SetRegionRoleStateResponse::NotFound)));
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
        let reply = DowngradeRegionsHandler
            .handle(
                &handler_context,
                vec![DowngradeRegion {
                    region_id,
                    flush_timeout: None,
                }],
            )
            .await;
        let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
        assert!(!reply.exists);
        assert!(reply.error.is_none());
        assert!(reply.last_entry_id.is_none());
    }

    /// An error from the role-state mock is propagated into the reply's `error` text
    /// with `exists: true`.
    #[tokio::test]
    async fn test_set_region_readonly_error() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    error::UnexpectedSnafu {
                        violated: "Failed to set region to readonly",
                    }
                    .fail()
                }));
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
        let reply = DowngradeRegionsHandler
            .handle(
                &handler_context,
                vec![DowngradeRegion {
                    region_id,
                    flush_timeout: None,
                }],
            )
            .await;
        let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
        assert!(reply.exists);
        assert!(
            reply
                .error
                .as_ref()
                .unwrap()
                .contains("Failed to set region to readonly")
        );
        assert!(reply.last_entry_id.is_none());
    }

    /// End to end: a `DowngradeRegions` instruction for two freshly created mito regions
    /// flows through `RegionHeartbeatResponseHandler`. Both regions report success and
    /// end up as followers.
    #[tokio::test]
    async fn test_downgrade_regions() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let heartbeat_handler =
            RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend);
        let mut engine_env = TestEnv::with_prefix("downgrade-regions").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine.clone()));
        let region_id = RegionId::new(1024, 1);
        let region_id1 = RegionId::new(1024, 2);
        let builder = CreateRequestBuilder::new();
        let create_req = builder.build();
        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();
        let create_req1 = builder.build();
        region_server
            .handle_request(region_id1, RegionRequest::Create(create_req1))
            .await
            .unwrap();
        let meta = MessageMeta::new_test(1, "test", "dn-1", "meta-0");
        let instruction = Instruction::DowngradeRegions(vec![
            DowngradeRegion {
                region_id,
                flush_timeout: Some(Duration::from_secs(1)),
            },
            DowngradeRegion {
                region_id: region_id1,
                flush_timeout: Some(Duration::from_secs(1)),
            },
        ]);
        let mut heartbeat_env = HeartbeatResponseTestEnv::new();
        let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
        let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
        assert_matches!(control, HandleControl::Continue);

        // The reply arrives asynchronously on the test env's channel; entries follow
        // the order of the instruction's regions.
        let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
        let reply = reply.expect_downgrade_regions_reply();
        assert_eq!(reply[0].region_id, region_id);
        assert!(reply[0].exists);
        assert!(reply[0].error.is_none());
        assert_eq!(reply[0].last_entry_id, Some(0));
        assert_eq!(reply[1].region_id, region_id1);
        assert!(reply[1].exists);
        assert!(reply[1].error.is_none());
        assert_eq!(reply[1].last_entry_id, Some(0));

        assert_eq!(engine.role(region_id).unwrap(), RegionRole::Follower);
        assert_eq!(engine.role(region_id1).unwrap(), RegionRole::Follower);
    }
}