datanode/heartbeat/handler/
downgrade_region.rs1use common_meta::instruction::{DowngradeRegion, DowngradeRegionReply, InstructionReply};
16use common_telemetry::tracing::info;
17use common_telemetry::{error, warn};
18use futures_util::future::BoxFuture;
19use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
20use store_api::region_request::{RegionFlushRequest, RegionRequest};
21use store_api::storage::RegionId;
22
23use crate::heartbeat::handler::HandlerContext;
24use crate::heartbeat::task_tracker::WaitResult;
25
26impl HandlerContext {
27 async fn downgrade_to_follower_gracefully(
28 &self,
29 region_id: RegionId,
30 ) -> Option<InstructionReply> {
31 match self
32 .region_server
33 .set_region_role_state_gracefully(region_id, SettableRegionRoleState::Follower)
34 .await
35 {
36 Ok(SetRegionRoleStateResponse::Success(success)) => {
37 Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
38 last_entry_id: success.last_entry_id(),
39 metadata_last_entry_id: success.metadata_last_entry_id(),
40 exists: true,
41 error: None,
42 }))
43 }
44 Ok(SetRegionRoleStateResponse::NotFound) => {
45 warn!("Region: {region_id} is not found");
46 Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
47 last_entry_id: None,
48 metadata_last_entry_id: None,
49 exists: false,
50 error: None,
51 }))
52 }
53 Err(err) => {
54 error!(err; "Failed to convert region to {}", SettableRegionRoleState::Follower);
55 Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
56 last_entry_id: None,
57 metadata_last_entry_id: None,
58 exists: true,
59 error: Some(format!("{err:?}")),
60 }))
61 }
62 }
63 }
64
65 pub(crate) fn handle_downgrade_region_instruction(
66 self,
67 DowngradeRegion {
68 region_id,
69 flush_timeout,
70 }: DowngradeRegion,
71 ) -> BoxFuture<'static, Option<InstructionReply>> {
72 Box::pin(async move {
73 let Some(writable) = self.region_server.is_region_leader(region_id) else {
74 warn!("Region: {region_id} is not found");
75 return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
76 last_entry_id: None,
77 metadata_last_entry_id: None,
78 exists: false,
79 error: None,
80 }));
81 };
82
83 let region_server_moved = self.region_server.clone();
84
85 if !writable {
87 warn!(
88 "Region: {region_id} is not writable, flush_timeout: {:?}",
89 flush_timeout
90 );
91 return self.downgrade_to_follower_gracefully(region_id).await;
92 }
93
94 let Some(flush_timeout) = flush_timeout else {
96 return self.downgrade_to_follower_gracefully(region_id).await;
97 };
98
99 match self
103 .region_server
104 .set_region_role_state_gracefully(
105 region_id,
106 SettableRegionRoleState::DowngradingLeader,
107 )
108 .await
109 {
110 Ok(SetRegionRoleStateResponse::Success { .. }) => {}
111 Ok(SetRegionRoleStateResponse::NotFound) => {
112 warn!("Region: {region_id} is not found");
113 return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
114 last_entry_id: None,
115 metadata_last_entry_id: None,
116 exists: false,
117 error: None,
118 }));
119 }
120 Err(err) => {
121 error!(err; "Failed to convert region to downgrading leader");
122 return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
123 last_entry_id: None,
124 metadata_last_entry_id: None,
125 exists: true,
126 error: Some(format!("{err:?}")),
127 }));
128 }
129 }
130
131 let register_result = self
132 .downgrade_tasks
133 .try_register(
134 region_id,
135 Box::pin(async move {
136 info!("Flush region: {region_id} before converting region to follower");
137 region_server_moved
138 .handle_request(
139 region_id,
140 RegionRequest::Flush(RegionFlushRequest {
141 row_group_size: None,
142 }),
143 )
144 .await?;
145
146 Ok(())
147 }),
148 )
149 .await;
150
151 if register_result.is_busy() {
152 warn!("Another flush task is running for the region: {region_id}");
153 }
154
155 let mut watcher = register_result.into_watcher();
156 let result = self.downgrade_tasks.wait(&mut watcher, flush_timeout).await;
157
158 match result {
159 WaitResult::Timeout => {
160 Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
161 last_entry_id: None,
162 metadata_last_entry_id: None,
163 exists: true,
164 error: Some(format!(
165 "Flush region timeout, region: {region_id}, timeout: {:?}",
166 flush_timeout
167 )),
168 }))
169 }
170 WaitResult::Finish(Ok(_)) => self.downgrade_to_follower_gracefully(region_id).await,
171 WaitResult::Finish(Err(err)) => {
172 Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
173 last_entry_id: None,
174 metadata_last_entry_id: None,
175 exists: true,
176 error: Some(format!("{err:?}")),
177 }))
178 }
179 }
180 })
181 }
182}
183
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::time::Duration;

    use common_meta::instruction::{DowngradeRegion, DowngradeRegionReply, InstructionReply};
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::region_engine::{
        RegionRole, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
    };
    use store_api::region_request::RegionRequest;
    use store_api::storage::RegionId;
    use tokio::time::Instant;

    use crate::error;
    use crate::heartbeat::handler::HandlerContext;
    use crate::tests::{mock_region_server, MockRegionEngine};

    /// Sends a `DowngradeRegion` instruction through a clone of `ctx` and returns the
    /// `DowngradeRegion` reply, failing the test on any other reply shape.
    async fn downgrade(
        ctx: &HandlerContext,
        region_id: RegionId,
        flush_timeout: Option<Duration>,
    ) -> DowngradeRegionReply {
        let reply = ctx
            .clone()
            .handle_downgrade_region_instruction(DowngradeRegion {
                region_id,
                flush_timeout,
            })
            .await;
        assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
        match reply {
            Some(InstructionReply::DowngradeRegion(inner)) => inner,
            _ => unreachable!(),
        }
    }

    #[tokio::test]
    async fn test_region_not_exist() {
        let mut mock_region_server = mock_region_server();
        let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
        mock_region_server.register_engine(mock_engine);
        let ctx = HandlerContext::new_for_test(mock_region_server);
        let region_id = RegionId::new(1024, 1);

        for flush_timeout in [None, Some(Duration::from_millis(100u64))] {
            let reply = downgrade(&ctx, region_id, flush_timeout).await;
            assert!(!reply.exists);
            assert!(reply.error.is_none());
            assert!(reply.last_entry_id.is_none());
        }
    }

    #[tokio::test]
    async fn test_region_readonly() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                // A follower must never be asked to flush.
                region_engine.handle_request_mock_fn = Some(Box::new(|_, req| {
                    if let RegionRequest::Flush(_) = req {
                        unreachable!();
                    };

                    Ok(0)
                }));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    Ok(SetRegionRoleStateResponse::success(
                        SetRegionRoleStateSuccess::mito(1024),
                    ))
                }))
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let ctx = HandlerContext::new_for_test(mock_region_server);

        for flush_timeout in [None, Some(Duration::from_millis(100u64))] {
            let reply = downgrade(&ctx, region_id, flush_timeout).await;
            assert!(reply.exists);
            assert!(reply.error.is_none());
            assert_eq!(reply.last_entry_id.unwrap(), 1024);
        }
    }

    #[tokio::test]
    async fn test_region_flush_timeout() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_request_delay = Some(Duration::from_secs(100));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    Ok(SetRegionRoleStateResponse::success(
                        SetRegionRoleStateSuccess::mito(1024),
                    ))
                }))
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let ctx = HandlerContext::new_for_test(mock_region_server);

        let reply = downgrade(&ctx, region_id, Some(Duration::from_millis(100))).await;
        assert!(reply.exists);
        assert!(reply.error.unwrap().contains("timeout"));
        assert!(reply.last_entry_id.is_none());
    }

    #[tokio::test]
    async fn test_region_flush_timeout_and_retry() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_request_delay = Some(Duration::from_millis(300));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    Ok(SetRegionRoleStateResponse::success(
                        SetRegionRoleStateSuccess::mito(1024),
                    ))
                }))
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let ctx = HandlerContext::new_for_test(mock_region_server);

        // Two short waits both time out while the 300ms flush is still running.
        for flush_timeout in [
            Some(Duration::from_millis(100u64)),
            Some(Duration::from_millis(100u64)),
        ] {
            let reply = downgrade(&ctx, region_id, flush_timeout).await;
            assert!(reply.exists);
            assert!(reply.error.unwrap().contains("timeout"));
            assert!(reply.last_entry_id.is_none());
        }

        // The retry joins the in-flight flush, so it finishes well before a fresh 300ms flush.
        let started = Instant::now();
        let reply = downgrade(&ctx, region_id, Some(Duration::from_millis(500))).await;
        assert!(started.elapsed().as_millis() < 300);

        assert!(reply.exists);
        assert!(reply.error.is_none());
        assert_eq!(reply.last_entry_id.unwrap(), 1024);
    }

    #[tokio::test]
    async fn test_region_flush_timeout_and_retry_error() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_request_delay = Some(Duration::from_millis(300));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
                    error::UnexpectedSnafu {
                        violated: "mock flush failed",
                    }
                    .fail()
                }));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    Ok(SetRegionRoleStateResponse::success(
                        SetRegionRoleStateSuccess::mito(1024),
                    ))
                }))
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let ctx = HandlerContext::new_for_test(mock_region_server);

        for flush_timeout in [
            Some(Duration::from_millis(100u64)),
            Some(Duration::from_millis(100u64)),
        ] {
            let reply = downgrade(&ctx, region_id, flush_timeout).await;
            assert!(reply.exists);
            assert!(reply.error.unwrap().contains("timeout"));
            assert!(reply.last_entry_id.is_none());
        }

        // The retry observes the failure of the already-running flush.
        let started = Instant::now();
        let reply = downgrade(&ctx, region_id, Some(Duration::from_millis(500))).await;
        assert!(started.elapsed().as_millis() < 300);

        assert!(reply.exists);
        assert!(reply.error.unwrap().contains("flush failed"));
        assert!(reply.last_entry_id.is_none());
    }

    #[tokio::test]
    async fn test_set_region_readonly_not_found() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_set_readonly_gracefully_mock_fn =
                    Some(Box::new(|_| Ok(SetRegionRoleStateResponse::NotFound)));
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let ctx = HandlerContext::new_for_test(mock_region_server);

        let reply = downgrade(&ctx, region_id, None).await;
        assert!(!reply.exists);
        assert!(reply.error.is_none());
        assert!(reply.last_entry_id.is_none());
    }

    #[tokio::test]
    async fn test_set_region_readonly_error() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    error::UnexpectedSnafu {
                        violated: "Failed to set region to readonly",
                    }
                    .fail()
                }));
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let ctx = HandlerContext::new_for_test(mock_region_server);

        let reply = downgrade(&ctx, region_id, None).await;
        assert!(reply.exists);
        assert!(reply
            .error
            .unwrap()
            .contains("Failed to set region to readonly"));
        assert!(reply.last_entry_id.is_none());
    }
}