datanode/heartbeat/handler/
downgrade_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_meta::instruction::{
16    DowngradeRegion, DowngradeRegionReply, DowngradeRegionsReply, Instruction, InstructionReply,
17};
18use common_telemetry::tracing::info;
19use common_telemetry::{error, warn};
20use futures::future::join_all;
21use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
22use store_api::region_request::{RegionFlushRequest, RegionRequest};
23use store_api::storage::RegionId;
24
25use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
26use crate::heartbeat::task_tracker::WaitResult;
27
/// Heartbeat instruction handler that downgrades regions hosted on this datanode.
///
/// The type itself is stateless; everything it operates on is reached through the
/// `HandlerContext` passed to each call.
#[derive(Clone, Copy, Debug, Default)]
pub struct DowngradeRegionsHandler;
30
31impl DowngradeRegionsHandler {
32    async fn handle_downgrade_region(
33        ctx: &HandlerContext,
34        DowngradeRegion {
35            region_id,
36            flush_timeout,
37        }: DowngradeRegion,
38    ) -> DowngradeRegionReply {
39        let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
40            warn!("Region: {region_id} is not found");
41            return DowngradeRegionReply {
42                region_id,
43                last_entry_id: None,
44                metadata_last_entry_id: None,
45                exists: false,
46                error: None,
47            };
48        };
49
50        let region_server_moved = ctx.region_server.clone();
51
52        // Ignores flush request
53        if !writable {
54            warn!(
55                "Region: {region_id} is not writable, flush_timeout: {:?}",
56                flush_timeout
57            );
58            return ctx.downgrade_to_follower_gracefully(region_id).await;
59        }
60
61        // If flush_timeout is not set, directly convert region to follower.
62        let Some(flush_timeout) = flush_timeout else {
63            return ctx.downgrade_to_follower_gracefully(region_id).await;
64        };
65
66        // Sets region to downgrading,
67        // the downgrading region will reject all write requests.
68        // However, the downgrading region will still accept read, flush requests.
69        match ctx
70            .region_server
71            .set_region_role_state_gracefully(region_id, SettableRegionRoleState::DowngradingLeader)
72            .await
73        {
74            Ok(SetRegionRoleStateResponse::Success { .. }) => {}
75            Ok(SetRegionRoleStateResponse::NotFound) => {
76                warn!("Region: {region_id} is not found");
77                return DowngradeRegionReply {
78                    region_id,
79                    last_entry_id: None,
80                    metadata_last_entry_id: None,
81                    exists: false,
82                    error: None,
83                };
84            }
85            Ok(SetRegionRoleStateResponse::InvalidTransition(err)) => {
86                error!(err; "Failed to convert region to downgrading leader - invalid transition");
87                return DowngradeRegionReply {
88                    region_id,
89                    last_entry_id: None,
90                    metadata_last_entry_id: None,
91                    exists: true,
92                    error: Some(format!("{err:?}")),
93                };
94            }
95            Err(err) => {
96                error!(err; "Failed to convert region to downgrading leader");
97                return DowngradeRegionReply {
98                    region_id,
99                    last_entry_id: None,
100                    metadata_last_entry_id: None,
101                    exists: true,
102                    error: Some(format!("{err:?}")),
103                };
104            }
105        }
106
107        let register_result = ctx
108            .downgrade_tasks
109            .try_register(
110                region_id,
111                Box::pin(async move {
112                    info!("Flush region: {region_id} before converting region to follower");
113                    region_server_moved
114                        .handle_request(
115                            region_id,
116                            RegionRequest::Flush(RegionFlushRequest {
117                                row_group_size: None,
118                            }),
119                        )
120                        .await?;
121
122                    Ok(())
123                }),
124            )
125            .await;
126
127        if register_result.is_busy() {
128            warn!("Another flush task is running for the region: {region_id}");
129        }
130
131        let mut watcher = register_result.into_watcher();
132        let result = ctx.downgrade_tasks.wait(&mut watcher, flush_timeout).await;
133
134        match result {
135            WaitResult::Timeout => DowngradeRegionReply {
136                region_id,
137                last_entry_id: None,
138                metadata_last_entry_id: None,
139                exists: true,
140                error: Some(format!(
141                    "Flush region timeout, region: {region_id}, timeout: {:?}",
142                    flush_timeout
143                )),
144            },
145            WaitResult::Finish(Ok(_)) => ctx.downgrade_to_follower_gracefully(region_id).await,
146            WaitResult::Finish(Err(err)) => DowngradeRegionReply {
147                region_id,
148                last_entry_id: None,
149                metadata_last_entry_id: None,
150                exists: true,
151                error: Some(format!("{err:?}")),
152            },
153        }
154    }
155}
156
157#[async_trait::async_trait]
158impl InstructionHandler for DowngradeRegionsHandler {
159    async fn handle(
160        &self,
161        ctx: &HandlerContext,
162        instruction: Instruction,
163    ) -> Option<InstructionReply> {
164        // Safety: must be `Instruction::DowngradeRegion` instruction.
165        let downgrade_regions = instruction.into_downgrade_regions().unwrap();
166        let futures = downgrade_regions
167            .into_iter()
168            .map(|downgrade_region| Self::handle_downgrade_region(ctx, downgrade_region));
169        // Join all futures; parallelism is governed by the underlying flush scheduler.
170        let results = join_all(futures).await;
171
172        Some(InstructionReply::DowngradeRegions(
173            DowngradeRegionsReply::new(results),
174        ))
175    }
176}
177
178impl HandlerContext {
179    async fn downgrade_to_follower_gracefully(&self, region_id: RegionId) -> DowngradeRegionReply {
180        match self
181            .region_server
182            .set_region_role_state_gracefully(region_id, SettableRegionRoleState::Follower)
183            .await
184        {
185            Ok(SetRegionRoleStateResponse::Success(success)) => DowngradeRegionReply {
186                region_id,
187                last_entry_id: success.last_entry_id(),
188                metadata_last_entry_id: success.metadata_last_entry_id(),
189                exists: true,
190                error: None,
191            },
192            Ok(SetRegionRoleStateResponse::NotFound) => {
193                warn!("Region: {region_id} is not found");
194                DowngradeRegionReply {
195                    region_id,
196                    last_entry_id: None,
197                    metadata_last_entry_id: None,
198                    exists: false,
199                    error: None,
200                }
201            }
202            Ok(SetRegionRoleStateResponse::InvalidTransition(err)) => {
203                error!(err; "Failed to convert region to follower - invalid transition");
204                DowngradeRegionReply {
205                    region_id,
206                    last_entry_id: None,
207                    metadata_last_entry_id: None,
208                    exists: true,
209                    error: Some(format!("{err:?}")),
210                }
211            }
212            Err(err) => {
213                error!(err; "Failed to convert region to {}", SettableRegionRoleState::Follower);
214                DowngradeRegionReply {
215                    region_id,
216                    last_entry_id: None,
217                    metadata_last_entry_id: None,
218                    exists: true,
219                    error: Some(format!("{err:?}")),
220                }
221            }
222        }
223    }
224}
225
226#[cfg(test)]
227mod tests {
228    use std::assert_matches::assert_matches;
229    use std::sync::Arc;
230    use std::time::Duration;
231
232    use common_meta::heartbeat::handler::{HandleControl, HeartbeatResponseHandler};
233    use common_meta::heartbeat::mailbox::MessageMeta;
234    use common_meta::instruction::{DowngradeRegion, Instruction};
235    use mito2::config::MitoConfig;
236    use mito2::engine::MITO_ENGINE_NAME;
237    use mito2::test_util::{CreateRequestBuilder, TestEnv};
238    use store_api::region_engine::{
239        RegionEngine, RegionRole, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
240    };
241    use store_api::region_request::RegionRequest;
242    use store_api::storage::RegionId;
243    use tokio::time::Instant;
244
245    use crate::error;
246    use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler;
247    use crate::heartbeat::handler::tests::HeartbeatResponseTestEnv;
248    use crate::heartbeat::handler::{
249        HandlerContext, InstructionHandler, RegionHeartbeatResponseHandler,
250    };
251    use crate::tests::{MockRegionEngine, mock_region_server};
252
253    #[tokio::test]
254    async fn test_region_not_exist() {
255        let mut mock_region_server = mock_region_server();
256        let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
257        mock_region_server.register_engine(mock_engine);
258        let handler_context = HandlerContext::new_for_test(mock_region_server);
259        let region_id = RegionId::new(1024, 1);
260        let waits = vec![None, Some(Duration::from_millis(100u64))];
261
262        for flush_timeout in waits {
263            let reply = DowngradeRegionsHandler
264                .handle(
265                    &handler_context,
266                    Instruction::DowngradeRegions(vec![DowngradeRegion {
267                        region_id,
268                        flush_timeout,
269                    }]),
270                )
271                .await;
272
273            let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
274            assert!(!reply.exists);
275            assert!(reply.error.is_none());
276            assert!(reply.last_entry_id.is_none());
277        }
278    }
279
280    #[tokio::test]
281    async fn test_region_readonly() {
282        let mock_region_server = mock_region_server();
283        let region_id = RegionId::new(1024, 1);
284        let (mock_engine, _) =
285            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
286                region_engine.mock_role = Some(Some(RegionRole::Follower));
287                region_engine.handle_request_mock_fn = Some(Box::new(|_, req| {
288                    if let RegionRequest::Flush(_) = req {
289                        // Should be unreachable.
290                        unreachable!();
291                    };
292
293                    Ok(0)
294                }));
295                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
296                    Ok(SetRegionRoleStateResponse::success(
297                        SetRegionRoleStateSuccess::mito(1024),
298                    ))
299                }))
300            });
301        mock_region_server.register_test_region(region_id, mock_engine);
302        let handler_context = HandlerContext::new_for_test(mock_region_server);
303
304        let waits = vec![None, Some(Duration::from_millis(100u64))];
305        for flush_timeout in waits {
306            let reply = DowngradeRegionsHandler
307                .handle(
308                    &handler_context,
309                    Instruction::DowngradeRegions(vec![DowngradeRegion {
310                        region_id,
311                        flush_timeout,
312                    }]),
313                )
314                .await;
315
316            let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
317            assert!(reply.exists);
318            assert!(reply.error.is_none());
319            assert_eq!(reply.last_entry_id.unwrap(), 1024);
320        }
321    }
322
323    #[tokio::test]
324    async fn test_region_flush_timeout() {
325        let mock_region_server = mock_region_server();
326        let region_id = RegionId::new(1024, 1);
327        let (mock_engine, _) =
328            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
329                region_engine.mock_role = Some(Some(RegionRole::Leader));
330                region_engine.handle_request_delay = Some(Duration::from_secs(100));
331                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
332                    Ok(SetRegionRoleStateResponse::success(
333                        SetRegionRoleStateSuccess::mito(1024),
334                    ))
335                }))
336            });
337        mock_region_server.register_test_region(region_id, mock_engine);
338        let handler_context = HandlerContext::new_for_test(mock_region_server);
339
340        let flush_timeout = Duration::from_millis(100);
341        let reply = DowngradeRegionsHandler
342            .handle(
343                &handler_context,
344                Instruction::DowngradeRegions(vec![DowngradeRegion {
345                    region_id,
346                    flush_timeout: Some(flush_timeout),
347                }]),
348            )
349            .await;
350
351        let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
352        assert!(reply.exists);
353        assert!(reply.error.as_ref().unwrap().contains("timeout"));
354        assert!(reply.last_entry_id.is_none());
355    }
356
357    #[tokio::test]
358    async fn test_region_flush_timeout_and_retry() {
359        let mock_region_server = mock_region_server();
360        let region_id = RegionId::new(1024, 1);
361        let (mock_engine, _) =
362            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
363                region_engine.mock_role = Some(Some(RegionRole::Leader));
364                region_engine.handle_request_delay = Some(Duration::from_millis(300));
365                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
366                    Ok(SetRegionRoleStateResponse::success(
367                        SetRegionRoleStateSuccess::mito(1024),
368                    ))
369                }))
370            });
371        mock_region_server.register_test_region(region_id, mock_engine);
372        let handler_context = HandlerContext::new_for_test(mock_region_server);
373
374        let waits = vec![
375            Some(Duration::from_millis(100u64)),
376            Some(Duration::from_millis(100u64)),
377        ];
378
379        for flush_timeout in waits {
380            let reply = DowngradeRegionsHandler
381                .handle(
382                    &handler_context,
383                    Instruction::DowngradeRegions(vec![DowngradeRegion {
384                        region_id,
385                        flush_timeout,
386                    }]),
387                )
388                .await;
389
390            let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
391            assert!(reply.exists);
392            assert!(reply.error.as_ref().unwrap().contains("timeout"));
393            assert!(reply.last_entry_id.is_none());
394        }
395        let timer = Instant::now();
396        let reply = DowngradeRegionsHandler
397            .handle(
398                &handler_context,
399                Instruction::DowngradeRegions(vec![DowngradeRegion {
400                    region_id,
401                    flush_timeout: Some(Duration::from_millis(500)),
402                }]),
403            )
404            .await;
405        // Must less than 300 ms.
406        assert!(timer.elapsed().as_millis() < 300);
407
408        let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
409        assert!(reply.exists);
410        assert!(reply.error.is_none());
411        assert_eq!(reply.last_entry_id.unwrap(), 1024);
412    }
413
414    #[tokio::test]
415    async fn test_region_flush_timeout_and_retry_error() {
416        let mock_region_server = mock_region_server();
417        let region_id = RegionId::new(1024, 1);
418        let (mock_engine, _) =
419            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
420                region_engine.mock_role = Some(Some(RegionRole::Leader));
421                region_engine.handle_request_delay = Some(Duration::from_millis(300));
422                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
423                    error::UnexpectedSnafu {
424                        violated: "mock flush failed",
425                    }
426                    .fail()
427                }));
428                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
429                    Ok(SetRegionRoleStateResponse::success(
430                        SetRegionRoleStateSuccess::mito(1024),
431                    ))
432                }))
433            });
434        mock_region_server.register_test_region(region_id, mock_engine);
435        let handler_context = HandlerContext::new_for_test(mock_region_server);
436
437        let waits = vec![
438            Some(Duration::from_millis(100u64)),
439            Some(Duration::from_millis(100u64)),
440        ];
441
442        for flush_timeout in waits {
443            let reply = DowngradeRegionsHandler
444                .handle(
445                    &handler_context,
446                    Instruction::DowngradeRegions(vec![DowngradeRegion {
447                        region_id,
448                        flush_timeout,
449                    }]),
450                )
451                .await;
452            let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
453            assert!(reply.exists);
454            assert!(reply.error.as_ref().unwrap().contains("timeout"));
455            assert!(reply.last_entry_id.is_none());
456        }
457        let timer = Instant::now();
458        let reply = DowngradeRegionsHandler
459            .handle(
460                &handler_context,
461                Instruction::DowngradeRegions(vec![DowngradeRegion {
462                    region_id,
463                    flush_timeout: Some(Duration::from_millis(500)),
464                }]),
465            )
466            .await;
467        // Must less than 300 ms.
468        assert!(timer.elapsed().as_millis() < 300);
469        let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
470        assert!(reply.exists);
471        assert!(reply.error.as_ref().unwrap().contains("flush failed"));
472        assert!(reply.last_entry_id.is_none());
473    }
474
475    #[tokio::test]
476    async fn test_set_region_readonly_not_found() {
477        let mock_region_server = mock_region_server();
478        let region_id = RegionId::new(1024, 1);
479        let (mock_engine, _) =
480            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
481                region_engine.mock_role = Some(Some(RegionRole::Leader));
482                region_engine.handle_set_readonly_gracefully_mock_fn =
483                    Some(Box::new(|_| Ok(SetRegionRoleStateResponse::NotFound)));
484            });
485        mock_region_server.register_test_region(region_id, mock_engine);
486        let handler_context = HandlerContext::new_for_test(mock_region_server);
487        let reply = DowngradeRegionsHandler
488            .handle(
489                &handler_context,
490                Instruction::DowngradeRegions(vec![DowngradeRegion {
491                    region_id,
492                    flush_timeout: None,
493                }]),
494            )
495            .await;
496        let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
497        assert!(!reply.exists);
498        assert!(reply.error.is_none());
499        assert!(reply.last_entry_id.is_none());
500    }
501
502    #[tokio::test]
503    async fn test_set_region_readonly_error() {
504        let mock_region_server = mock_region_server();
505        let region_id = RegionId::new(1024, 1);
506        let (mock_engine, _) =
507            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
508                region_engine.mock_role = Some(Some(RegionRole::Leader));
509                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
510                    error::UnexpectedSnafu {
511                        violated: "Failed to set region to readonly",
512                    }
513                    .fail()
514                }));
515            });
516        mock_region_server.register_test_region(region_id, mock_engine);
517        let handler_context = HandlerContext::new_for_test(mock_region_server);
518        let reply = DowngradeRegionsHandler
519            .handle(
520                &handler_context,
521                Instruction::DowngradeRegions(vec![DowngradeRegion {
522                    region_id,
523                    flush_timeout: None,
524                }]),
525            )
526            .await;
527        let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
528        assert!(reply.exists);
529        assert!(
530            reply
531                .error
532                .as_ref()
533                .unwrap()
534                .contains("Failed to set region to readonly")
535        );
536        assert!(reply.last_entry_id.is_none());
537    }
538
539    #[tokio::test]
540    async fn test_downgrade_regions() {
541        common_telemetry::init_default_ut_logging();
542
543        let mut region_server = mock_region_server();
544        let heartbeat_handler = RegionHeartbeatResponseHandler::new(region_server.clone());
545        let mut engine_env = TestEnv::with_prefix("downgrade-regions").await;
546        let engine = engine_env.create_engine(MitoConfig::default()).await;
547        region_server.register_engine(Arc::new(engine.clone()));
548        let region_id = RegionId::new(1024, 1);
549        let region_id1 = RegionId::new(1024, 2);
550        let builder = CreateRequestBuilder::new();
551        let create_req = builder.build();
552        region_server
553            .handle_request(region_id, RegionRequest::Create(create_req))
554            .await
555            .unwrap();
556        let create_req1 = builder.build();
557        region_server
558            .handle_request(region_id1, RegionRequest::Create(create_req1))
559            .await
560            .unwrap();
561        let meta = MessageMeta::new_test(1, "test", "dn-1", "meta-0");
562        let instruction = Instruction::DowngradeRegions(vec![
563            DowngradeRegion {
564                region_id,
565                flush_timeout: Some(Duration::from_secs(1)),
566            },
567            DowngradeRegion {
568                region_id: region_id1,
569                flush_timeout: Some(Duration::from_secs(1)),
570            },
571        ]);
572        let mut heartbeat_env = HeartbeatResponseTestEnv::new();
573        let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
574        let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
575        assert_matches!(control, HandleControl::Continue);
576
577        let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
578        let reply = reply.expect_downgrade_regions_reply();
579        assert_eq!(reply[0].region_id, region_id);
580        assert!(reply[0].exists);
581        assert!(reply[0].error.is_none());
582        assert_eq!(reply[0].last_entry_id, Some(0));
583        assert_eq!(reply[1].region_id, region_id1);
584        assert!(reply[1].exists);
585        assert!(reply[1].error.is_none());
586        assert_eq!(reply[1].last_entry_id, Some(0));
587
588        assert_eq!(engine.role(region_id).unwrap(), RegionRole::Follower);
589        assert_eq!(engine.role(region_id1).unwrap(), RegionRole::Follower);
590    }
591}