datanode/heartbeat/handler/
downgrade_region.rs1use common_meta::instruction::{DowngradeRegion, DowngradeRegionReply, InstructionReply};
16use common_telemetry::tracing::info;
17use common_telemetry::{error, warn};
18use futures_util::future::BoxFuture;
19use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
20use store_api::region_request::{RegionFlushRequest, RegionRequest};
21use store_api::storage::RegionId;
22
23use crate::heartbeat::handler::HandlerContext;
24use crate::heartbeat::task_tracker::WaitResult;
25
26impl HandlerContext {
27 async fn downgrade_to_follower_gracefully(
28 &self,
29 region_id: RegionId,
30 ) -> Option<InstructionReply> {
31 match self
32 .region_server
33 .set_region_role_state_gracefully(region_id, SettableRegionRoleState::Follower)
34 .await
35 {
36 Ok(SetRegionRoleStateResponse::Success(success)) => {
37 Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
38 last_entry_id: success.last_entry_id(),
39 metadata_last_entry_id: success.metadata_last_entry_id(),
40 exists: true,
41 error: None,
42 }))
43 }
44 Ok(SetRegionRoleStateResponse::NotFound) => {
45 warn!("Region: {region_id} is not found");
46 Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
47 last_entry_id: None,
48 metadata_last_entry_id: None,
49 exists: false,
50 error: None,
51 }))
52 }
53 Ok(SetRegionRoleStateResponse::InvalidTransition(err)) => {
54 error!(err; "Failed to convert region to follower - invalid transition");
55 Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
56 last_entry_id: None,
57 metadata_last_entry_id: None,
58 exists: true,
59 error: Some(format!("{err:?}")),
60 }))
61 }
62 Err(err) => {
63 error!(err; "Failed to convert region to {}", SettableRegionRoleState::Follower);
64 Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
65 last_entry_id: None,
66 metadata_last_entry_id: None,
67 exists: true,
68 error: Some(format!("{err:?}")),
69 }))
70 }
71 }
72 }
73
74 pub(crate) fn handle_downgrade_region_instruction(
75 self,
76 DowngradeRegion {
77 region_id,
78 flush_timeout,
79 }: DowngradeRegion,
80 ) -> BoxFuture<'static, Option<InstructionReply>> {
81 Box::pin(async move {
82 let Some(writable) = self.region_server.is_region_leader(region_id) else {
83 warn!("Region: {region_id} is not found");
84 return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
85 last_entry_id: None,
86 metadata_last_entry_id: None,
87 exists: false,
88 error: None,
89 }));
90 };
91
92 let region_server_moved = self.region_server.clone();
93
94 if !writable {
96 warn!(
97 "Region: {region_id} is not writable, flush_timeout: {:?}",
98 flush_timeout
99 );
100 return self.downgrade_to_follower_gracefully(region_id).await;
101 }
102
103 let Some(flush_timeout) = flush_timeout else {
105 return self.downgrade_to_follower_gracefully(region_id).await;
106 };
107
108 match self
112 .region_server
113 .set_region_role_state_gracefully(
114 region_id,
115 SettableRegionRoleState::DowngradingLeader,
116 )
117 .await
118 {
119 Ok(SetRegionRoleStateResponse::Success { .. }) => {}
120 Ok(SetRegionRoleStateResponse::NotFound) => {
121 warn!("Region: {region_id} is not found");
122 return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
123 last_entry_id: None,
124 metadata_last_entry_id: None,
125 exists: false,
126 error: None,
127 }));
128 }
129 Ok(SetRegionRoleStateResponse::InvalidTransition(err)) => {
130 error!(err; "Failed to convert region to downgrading leader - invalid transition");
131 return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
132 last_entry_id: None,
133 metadata_last_entry_id: None,
134 exists: true,
135 error: Some(format!("{err:?}")),
136 }));
137 }
138 Err(err) => {
139 error!(err; "Failed to convert region to downgrading leader");
140 return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
141 last_entry_id: None,
142 metadata_last_entry_id: None,
143 exists: true,
144 error: Some(format!("{err:?}")),
145 }));
146 }
147 }
148
149 let register_result = self
150 .downgrade_tasks
151 .try_register(
152 region_id,
153 Box::pin(async move {
154 info!("Flush region: {region_id} before converting region to follower");
155 region_server_moved
156 .handle_request(
157 region_id,
158 RegionRequest::Flush(RegionFlushRequest {
159 row_group_size: None,
160 }),
161 )
162 .await?;
163
164 Ok(())
165 }),
166 )
167 .await;
168
169 if register_result.is_busy() {
170 warn!("Another flush task is running for the region: {region_id}");
171 }
172
173 let mut watcher = register_result.into_watcher();
174 let result = self.downgrade_tasks.wait(&mut watcher, flush_timeout).await;
175
176 match result {
177 WaitResult::Timeout => {
178 Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
179 last_entry_id: None,
180 metadata_last_entry_id: None,
181 exists: true,
182 error: Some(format!(
183 "Flush region timeout, region: {region_id}, timeout: {:?}",
184 flush_timeout
185 )),
186 }))
187 }
188 WaitResult::Finish(Ok(_)) => self.downgrade_to_follower_gracefully(region_id).await,
189 WaitResult::Finish(Err(err)) => {
190 Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
191 last_entry_id: None,
192 metadata_last_entry_id: None,
193 exists: true,
194 error: Some(format!("{err:?}")),
195 }))
196 }
197 }
198 })
199 }
200}
201
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::time::Duration;

    use common_meta::instruction::{DowngradeRegion, DowngradeRegionReply, InstructionReply};
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::region_engine::{
        RegionRole, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
    };
    use store_api::region_request::RegionRequest;
    use store_api::storage::RegionId;
    use tokio::time::Instant;

    use crate::error;
    use crate::heartbeat::handler::HandlerContext;
    use crate::tests::{mock_region_server, MockRegionEngine};

    /// Dispatches a `DowngradeRegion` instruction through a clone of `handler_context`.
    /// Asserts that the reply is a downgrade-region reply and returns its payload.
    async fn downgrade(
        handler_context: &HandlerContext,
        region_id: RegionId,
        flush_timeout: Option<Duration>,
    ) -> DowngradeRegionReply {
        let reply = handler_context
            .clone()
            .handle_downgrade_region_instruction(DowngradeRegion {
                region_id,
                flush_timeout,
            })
            .await;
        assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
        let Some(InstructionReply::DowngradeRegion(inner)) = reply else {
            unreachable!("reply shape was asserted above");
        };
        inner
    }

    #[tokio::test]
    async fn test_region_not_exist() {
        let mut region_server = mock_region_server();
        let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
        region_server.register_engine(engine);
        let handler_context = HandlerContext::new_for_test(region_server);
        let region_id = RegionId::new(1024, 1);

        // The engine is registered but the region is not, with or without a flush timeout.
        for flush_timeout in [None, Some(Duration::from_millis(100))] {
            let reply = downgrade(&handler_context, region_id, flush_timeout).await;
            assert!(!reply.exists);
            assert!(reply.error.is_none());
            assert!(reply.last_entry_id.is_none());
        }
    }

    #[tokio::test]
    async fn test_region_readonly() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                // A non-writable region must never receive a flush request.
                region_engine.handle_request_mock_fn = Some(Box::new(|_, req| {
                    if matches!(req, RegionRequest::Flush(_)) {
                        unreachable!();
                    }
                    Ok(0)
                }));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    Ok(SetRegionRoleStateResponse::success(
                        SetRegionRoleStateSuccess::mito(1024),
                    ))
                }))
            });
        region_server.register_test_region(region_id, engine);
        let handler_context = HandlerContext::new_for_test(region_server);

        for flush_timeout in [None, Some(Duration::from_millis(100))] {
            let reply = downgrade(&handler_context, region_id, flush_timeout).await;
            assert!(reply.exists);
            assert!(reply.error.is_none());
            assert_eq!(reply.last_entry_id.unwrap(), 1024);
        }
    }

    #[tokio::test]
    async fn test_region_flush_timeout() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                // The flush takes far longer than the timeout used below.
                region_engine.handle_request_delay = Some(Duration::from_secs(100));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    Ok(SetRegionRoleStateResponse::success(
                        SetRegionRoleStateSuccess::mito(1024),
                    ))
                }))
            });
        region_server.register_test_region(region_id, engine);
        let handler_context = HandlerContext::new_for_test(region_server);

        let reply = downgrade(
            &handler_context,
            region_id,
            Some(Duration::from_millis(100)),
        )
        .await;
        assert!(reply.exists);
        assert!(reply.error.unwrap().contains("timeout"));
        assert!(reply.last_entry_id.is_none());
    }

    #[tokio::test]
    async fn test_region_flush_timeout_and_retry() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_request_delay = Some(Duration::from_millis(300));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    Ok(SetRegionRoleStateResponse::success(
                        SetRegionRoleStateSuccess::mito(1024),
                    ))
                }))
            });
        region_server.register_test_region(region_id, engine);
        let handler_context = HandlerContext::new_for_test(region_server);

        // Two short waits both time out while the single 300ms flush keeps running.
        for _ in 0..2 {
            let short_timeout = Some(Duration::from_millis(100));
            let reply = downgrade(&handler_context, region_id, short_timeout).await;
            assert!(reply.exists);
            assert!(reply.error.unwrap().contains("timeout"));
            assert!(reply.last_entry_id.is_none());
        }

        // The retry joins the in-flight flush, so it finishes well before a fresh 300ms flush.
        let started = Instant::now();
        let reply = downgrade(
            &handler_context,
            region_id,
            Some(Duration::from_millis(500)),
        )
        .await;
        assert!(started.elapsed().as_millis() < 300);
        assert!(reply.exists);
        assert!(reply.error.is_none());
        assert_eq!(reply.last_entry_id.unwrap(), 1024);
    }

    #[tokio::test]
    async fn test_region_flush_timeout_and_retry_error() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_request_delay = Some(Duration::from_millis(300));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
                    error::UnexpectedSnafu {
                        violated: "mock flush failed",
                    }
                    .fail()
                }));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    Ok(SetRegionRoleStateResponse::success(
                        SetRegionRoleStateSuccess::mito(1024),
                    ))
                }))
            });
        region_server.register_test_region(region_id, engine);
        let handler_context = HandlerContext::new_for_test(region_server);

        for _ in 0..2 {
            let short_timeout = Some(Duration::from_millis(100));
            let reply = downgrade(&handler_context, region_id, short_timeout).await;
            assert!(reply.exists);
            assert!(reply.error.unwrap().contains("timeout"));
            assert!(reply.last_entry_id.is_none());
        }

        // The retry observes the already-running flush, which ends with the mocked failure.
        let started = Instant::now();
        let reply = downgrade(
            &handler_context,
            region_id,
            Some(Duration::from_millis(500)),
        )
        .await;
        assert!(started.elapsed().as_millis() < 300);
        assert!(reply.exists);
        assert!(reply.error.unwrap().contains("flush failed"));
        assert!(reply.last_entry_id.is_none());
    }

    #[tokio::test]
    async fn test_set_region_readonly_not_found() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_set_readonly_gracefully_mock_fn =
                    Some(Box::new(|_| Ok(SetRegionRoleStateResponse::NotFound)));
            });
        region_server.register_test_region(region_id, engine);
        let handler_context = HandlerContext::new_for_test(region_server);

        let reply = downgrade(&handler_context, region_id, None).await;
        assert!(!reply.exists);
        assert!(reply.error.is_none());
        assert!(reply.last_entry_id.is_none());
    }

    #[tokio::test]
    async fn test_set_region_readonly_error() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    error::UnexpectedSnafu {
                        violated: "Failed to set region to readonly",
                    }
                    .fail()
                }));
            });
        region_server.register_test_region(region_id, engine);
        let handler_context = HandlerContext::new_for_test(region_server);

        let reply = downgrade(&handler_context, region_id, None).await;
        assert!(reply.exists);
        assert!(reply
            .error
            .unwrap()
            .contains("Failed to set region to readonly"));
        assert!(reply.last_entry_id.is_none());
    }
}