datanode/heartbeat/handler/
downgrade_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_meta::instruction::{
16    DowngradeRegion, DowngradeRegionReply, DowngradeRegionsReply, InstructionReply,
17};
18use common_telemetry::tracing::info;
19use common_telemetry::{error, warn};
20use futures::future::join_all;
21use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
22use store_api::region_request::{RegionFlushRequest, RegionRequest};
23use store_api::storage::RegionId;
24
25use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
26use crate::heartbeat::task_tracker::WaitResult;
27
/// Stateless handler for downgrade-region instructions.
///
/// It carries no data of its own; everything it needs is taken from the
/// `HandlerContext` passed to each call.
#[derive(Clone, Copy, Debug, Default)]
pub struct DowngradeRegionsHandler;
30
31impl DowngradeRegionsHandler {
32    async fn handle_downgrade_region(
33        ctx: &HandlerContext,
34        DowngradeRegion {
35            region_id,
36            flush_timeout,
37        }: DowngradeRegion,
38    ) -> DowngradeRegionReply {
39        let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
40            warn!("Region: {region_id} is not found");
41            return DowngradeRegionReply {
42                region_id,
43                last_entry_id: None,
44                metadata_last_entry_id: None,
45                exists: false,
46                error: None,
47            };
48        };
49
50        let region_server_moved = ctx.region_server.clone();
51
52        // Ignores flush request
53        if !writable {
54            warn!(
55                "Region: {region_id} is not writable, flush_timeout: {:?}",
56                flush_timeout
57            );
58            return ctx.downgrade_to_follower_gracefully(region_id).await;
59        }
60
61        // If flush_timeout is not set, directly convert region to follower.
62        let Some(flush_timeout) = flush_timeout else {
63            return ctx.downgrade_to_follower_gracefully(region_id).await;
64        };
65
66        // Sets region to downgrading,
67        // the downgrading region will reject all write requests.
68        // However, the downgrading region will still accept read, flush requests.
69        match ctx
70            .region_server
71            .set_region_role_state_gracefully(region_id, SettableRegionRoleState::DowngradingLeader)
72            .await
73        {
74            Ok(SetRegionRoleStateResponse::Success { .. }) => {}
75            Ok(SetRegionRoleStateResponse::NotFound) => {
76                warn!("Region: {region_id} is not found");
77                return DowngradeRegionReply {
78                    region_id,
79                    last_entry_id: None,
80                    metadata_last_entry_id: None,
81                    exists: false,
82                    error: None,
83                };
84            }
85            Ok(SetRegionRoleStateResponse::InvalidTransition(err)) => {
86                error!(err; "Failed to convert region to downgrading leader - invalid transition");
87                return DowngradeRegionReply {
88                    region_id,
89                    last_entry_id: None,
90                    metadata_last_entry_id: None,
91                    exists: true,
92                    error: Some(format!("{err:?}")),
93                };
94            }
95            Err(err) => {
96                error!(err; "Failed to convert region to downgrading leader");
97                return DowngradeRegionReply {
98                    region_id,
99                    last_entry_id: None,
100                    metadata_last_entry_id: None,
101                    exists: true,
102                    error: Some(format!("{err:?}")),
103                };
104            }
105        }
106
107        let register_result = ctx
108            .downgrade_tasks
109            .try_register(
110                region_id,
111                Box::pin(async move {
112                    info!("Flush region: {region_id} before converting region to follower");
113                    region_server_moved
114                        .handle_request(
115                            region_id,
116                            RegionRequest::Flush(RegionFlushRequest {
117                                row_group_size: None,
118                            }),
119                        )
120                        .await?;
121
122                    Ok(())
123                }),
124            )
125            .await;
126
127        if register_result.is_busy() {
128            warn!("Another flush task is running for the region: {region_id}");
129        }
130
131        let mut watcher = register_result.into_watcher();
132        let result = ctx.downgrade_tasks.wait(&mut watcher, flush_timeout).await;
133
134        match result {
135            WaitResult::Timeout => DowngradeRegionReply {
136                region_id,
137                last_entry_id: None,
138                metadata_last_entry_id: None,
139                exists: true,
140                error: Some(format!(
141                    "Flush region timeout, region: {region_id}, timeout: {:?}",
142                    flush_timeout
143                )),
144            },
145            WaitResult::Finish(Ok(_)) => ctx.downgrade_to_follower_gracefully(region_id).await,
146            WaitResult::Finish(Err(err)) => DowngradeRegionReply {
147                region_id,
148                last_entry_id: None,
149                metadata_last_entry_id: None,
150                exists: true,
151                error: Some(format!("{err:?}")),
152            },
153        }
154    }
155}
156
157#[async_trait::async_trait]
158impl InstructionHandler for DowngradeRegionsHandler {
159    type Instruction = Vec<DowngradeRegion>;
160
161    async fn handle(
162        &self,
163        ctx: &HandlerContext,
164        downgrade_regions: Self::Instruction,
165    ) -> Option<InstructionReply> {
166        let futures = downgrade_regions
167            .into_iter()
168            .map(|downgrade_region| Self::handle_downgrade_region(ctx, downgrade_region));
169        // Join all futures; parallelism is governed by the underlying flush scheduler.
170        let results = join_all(futures).await;
171
172        Some(InstructionReply::DowngradeRegions(
173            DowngradeRegionsReply::new(results),
174        ))
175    }
176}
177
178impl HandlerContext {
179    async fn downgrade_to_follower_gracefully(&self, region_id: RegionId) -> DowngradeRegionReply {
180        match self
181            .region_server
182            .set_region_role_state_gracefully(region_id, SettableRegionRoleState::Follower)
183            .await
184        {
185            Ok(SetRegionRoleStateResponse::Success(success)) => DowngradeRegionReply {
186                region_id,
187                last_entry_id: success.last_entry_id(),
188                metadata_last_entry_id: success.metadata_last_entry_id(),
189                exists: true,
190                error: None,
191            },
192            Ok(SetRegionRoleStateResponse::NotFound) => {
193                warn!("Region: {region_id} is not found");
194                DowngradeRegionReply {
195                    region_id,
196                    last_entry_id: None,
197                    metadata_last_entry_id: None,
198                    exists: false,
199                    error: None,
200                }
201            }
202            Ok(SetRegionRoleStateResponse::InvalidTransition(err)) => {
203                error!(err; "Failed to convert region to follower - invalid transition");
204                DowngradeRegionReply {
205                    region_id,
206                    last_entry_id: None,
207                    metadata_last_entry_id: None,
208                    exists: true,
209                    error: Some(format!("{err:?}")),
210                }
211            }
212            Err(err) => {
213                error!(err; "Failed to convert region to {}", SettableRegionRoleState::Follower);
214                DowngradeRegionReply {
215                    region_id,
216                    last_entry_id: None,
217                    metadata_last_entry_id: None,
218                    exists: true,
219                    error: Some(format!("{err:?}")),
220                }
221            }
222        }
223    }
224}
225
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::sync::Arc;
    use std::time::Duration;

    use common_meta::heartbeat::handler::{HandleControl, HeartbeatResponseHandler};
    use common_meta::heartbeat::mailbox::MessageMeta;
    use common_meta::instruction::{DowngradeRegion, Instruction};
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use mito2::config::MitoConfig;
    use mito2::engine::MITO_ENGINE_NAME;
    use mito2::test_util::{CreateRequestBuilder, TestEnv};
    use store_api::region_engine::{
        RegionEngine, RegionRole, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
    };
    use store_api::region_request::RegionRequest;
    use store_api::storage::RegionId;
    use tokio::time::Instant;

    use crate::error;
    use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler;
    use crate::heartbeat::handler::tests::HeartbeatResponseTestEnv;
    use crate::heartbeat::handler::{
        HandlerContext, InstructionHandler, RegionHeartbeatResponseHandler,
    };
    use crate::tests::{MockRegionEngine, mock_region_server};

    /// Wraps one region into the instruction payload accepted by the handler.
    fn single_downgrade(
        region_id: RegionId,
        flush_timeout: Option<Duration>,
    ) -> Vec<DowngradeRegion> {
        vec![DowngradeRegion {
            region_id,
            flush_timeout,
        }]
    }

    /// A region the server has never seen is reported as non-existent, with or without a
    /// flush timeout.
    #[tokio::test]
    async fn test_region_not_exist() {
        let mut region_server = mock_region_server();
        let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
        region_server.register_engine(engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let ctx = HandlerContext::new_for_test(region_server, kv_backend);
        let region_id = RegionId::new(1024, 1);

        for flush_timeout in [None, Some(Duration::from_millis(100))] {
            let replies = DowngradeRegionsHandler
                .handle(&ctx, single_downgrade(region_id, flush_timeout))
                .await
                .unwrap()
                .expect_downgrade_regions_reply();

            let reply = &replies[0];
            assert!(!reply.exists);
            assert!(reply.error.is_none());
            assert!(reply.last_entry_id.is_none());
        }
    }

    /// A follower region is downgraded without a flush request ever reaching the engine.
    #[tokio::test]
    async fn test_region_readonly() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (engine, _) = MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |mock| {
            mock.mock_role = Some(Some(RegionRole::Follower));
            mock.handle_request_mock_fn = Some(Box::new(|_, req| match req {
                // Should be unreachable.
                RegionRequest::Flush(_) => unreachable!(),
                _ => Ok(0),
            }));
            mock.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                Ok(SetRegionRoleStateResponse::success(
                    SetRegionRoleStateSuccess::mito(1024),
                ))
            }))
        });
        region_server.register_test_region(region_id, engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let ctx = HandlerContext::new_for_test(region_server, kv_backend);

        for flush_timeout in [None, Some(Duration::from_millis(100))] {
            let replies = DowngradeRegionsHandler
                .handle(&ctx, single_downgrade(region_id, flush_timeout))
                .await
                .unwrap()
                .expect_downgrade_regions_reply();

            let reply = &replies[0];
            assert!(reply.exists);
            assert!(reply.error.is_none());
            assert_eq!(reply.last_entry_id, Some(1024));
        }
    }

    /// With `handle_request_delay` far above the flush timeout, the reply carries a timeout
    /// error.
    #[tokio::test]
    async fn test_region_flush_timeout() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (engine, _) = MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |mock| {
            mock.mock_role = Some(Some(RegionRole::Leader));
            mock.handle_request_delay = Some(Duration::from_secs(100));
            mock.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                Ok(SetRegionRoleStateResponse::success(
                    SetRegionRoleStateSuccess::mito(1024),
                ))
            }))
        });
        region_server.register_test_region(region_id, engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let ctx = HandlerContext::new_for_test(region_server, kv_backend);

        let replies = DowngradeRegionsHandler
            .handle(
                &ctx,
                single_downgrade(region_id, Some(Duration::from_millis(100))),
            )
            .await
            .unwrap()
            .expect_downgrade_regions_reply();

        let reply = &replies[0];
        assert!(reply.exists);
        assert!(reply.error.as_deref().unwrap().contains("timeout"));
        assert!(reply.last_entry_id.is_none());
    }

    /// Two attempts with a 100 ms timeout both time out against a 300 ms
    /// `handle_request_delay`; a third attempt with a longer timeout then succeeds in less
    /// than 300 ms.
    #[tokio::test]
    async fn test_region_flush_timeout_and_retry() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (engine, _) = MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |mock| {
            mock.mock_role = Some(Some(RegionRole::Leader));
            mock.handle_request_delay = Some(Duration::from_millis(300));
            mock.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                Ok(SetRegionRoleStateResponse::success(
                    SetRegionRoleStateSuccess::mito(1024),
                ))
            }))
        });
        region_server.register_test_region(region_id, engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let ctx = HandlerContext::new_for_test(region_server, kv_backend);

        for _ in 0..2 {
            let replies = DowngradeRegionsHandler
                .handle(
                    &ctx,
                    single_downgrade(region_id, Some(Duration::from_millis(100))),
                )
                .await
                .unwrap()
                .expect_downgrade_regions_reply();

            let reply = &replies[0];
            assert!(reply.exists);
            assert!(reply.error.as_deref().unwrap().contains("timeout"));
            assert!(reply.last_entry_id.is_none());
        }

        let started = Instant::now();
        let outcome = DowngradeRegionsHandler
            .handle(
                &ctx,
                single_downgrade(region_id, Some(Duration::from_millis(500))),
            )
            .await;
        // Must less than 300 ms.
        assert!(started.elapsed() < Duration::from_millis(300));

        let replies = outcome.unwrap().expect_downgrade_regions_reply();
        let reply = &replies[0];
        assert!(reply.exists);
        assert!(reply.error.is_none());
        assert_eq!(reply.last_entry_id, Some(1024));
    }

    /// Same retry scenario as above, but the mocked request fails, so the final attempt
    /// reports the flush error instead of a timeout.
    #[tokio::test]
    async fn test_region_flush_timeout_and_retry_error() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (engine, _) = MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |mock| {
            mock.mock_role = Some(Some(RegionRole::Leader));
            mock.handle_request_delay = Some(Duration::from_millis(300));
            mock.handle_request_mock_fn = Some(Box::new(|_, _| {
                error::UnexpectedSnafu {
                    violated: "mock flush failed",
                }
                .fail()
            }));
            mock.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                Ok(SetRegionRoleStateResponse::success(
                    SetRegionRoleStateSuccess::mito(1024),
                ))
            }))
        });
        region_server.register_test_region(region_id, engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let ctx = HandlerContext::new_for_test(region_server, kv_backend);

        for _ in 0..2 {
            let replies = DowngradeRegionsHandler
                .handle(
                    &ctx,
                    single_downgrade(region_id, Some(Duration::from_millis(100))),
                )
                .await
                .unwrap()
                .expect_downgrade_regions_reply();

            let reply = &replies[0];
            assert!(reply.exists);
            assert!(reply.error.as_deref().unwrap().contains("timeout"));
            assert!(reply.last_entry_id.is_none());
        }

        let started = Instant::now();
        let outcome = DowngradeRegionsHandler
            .handle(
                &ctx,
                single_downgrade(region_id, Some(Duration::from_millis(500))),
            )
            .await;
        // Must less than 300 ms.
        assert!(started.elapsed() < Duration::from_millis(300));

        let replies = outcome.unwrap().expect_downgrade_regions_reply();
        let reply = &replies[0];
        assert!(reply.exists);
        assert!(reply.error.as_deref().unwrap().contains("flush failed"));
        assert!(reply.last_entry_id.is_none());
    }

    /// A `NotFound` response from the role-state transition is reported as a non-existent
    /// region without an error.
    #[tokio::test]
    async fn test_set_region_readonly_not_found() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (engine, _) = MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |mock| {
            mock.mock_role = Some(Some(RegionRole::Leader));
            mock.handle_set_readonly_gracefully_mock_fn =
                Some(Box::new(|_| Ok(SetRegionRoleStateResponse::NotFound)));
        });
        region_server.register_test_region(region_id, engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let ctx = HandlerContext::new_for_test(region_server, kv_backend);

        let replies = DowngradeRegionsHandler
            .handle(&ctx, single_downgrade(region_id, None))
            .await
            .unwrap()
            .expect_downgrade_regions_reply();

        let reply = &replies[0];
        assert!(!reply.exists);
        assert!(reply.error.is_none());
        assert!(reply.last_entry_id.is_none());
    }

    /// An error from the role-state transition ends up in the reply's `error` field.
    #[tokio::test]
    async fn test_set_region_readonly_error() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (engine, _) = MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |mock| {
            mock.mock_role = Some(Some(RegionRole::Leader));
            mock.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                error::UnexpectedSnafu {
                    violated: "Failed to set region to readonly",
                }
                .fail()
            }));
        });
        region_server.register_test_region(region_id, engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let ctx = HandlerContext::new_for_test(region_server, kv_backend);

        let replies = DowngradeRegionsHandler
            .handle(&ctx, single_downgrade(region_id, None))
            .await
            .unwrap()
            .expect_downgrade_regions_reply();

        let reply = &replies[0];
        assert!(reply.exists);
        let message = reply.error.as_deref().unwrap();
        assert!(message.contains("Failed to set region to readonly"));
        assert!(reply.last_entry_id.is_none());
    }

    /// End-to-end check through the heartbeat response handler with a real mito engine:
    /// both regions are downgraded and end up as followers.
    #[tokio::test]
    async fn test_downgrade_regions() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let heartbeat_handler =
            RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend);
        let mut engine_env = TestEnv::with_prefix("downgrade-regions").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine.clone()));

        let region_id = RegionId::new(1024, 1);
        let region_id1 = RegionId::new(1024, 2);
        let builder = CreateRequestBuilder::new();
        for id in [region_id, region_id1] {
            let create_req = builder.build();
            region_server
                .handle_request(id, RegionRequest::Create(create_req))
                .await
                .unwrap();
        }

        let meta = MessageMeta::new_test(1, "test", "dn-1", "meta-0");
        let instruction = Instruction::DowngradeRegions(vec![
            DowngradeRegion {
                region_id,
                flush_timeout: Some(Duration::from_secs(1)),
            },
            DowngradeRegion {
                region_id: region_id1,
                flush_timeout: Some(Duration::from_secs(1)),
            },
        ]);
        let mut heartbeat_env = HeartbeatResponseTestEnv::new();
        let mut ctx = heartbeat_env.create_handler_ctx((meta, instruction));
        let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
        assert_matches!(control, HandleControl::Continue);

        let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
        let replies = reply.expect_downgrade_regions_reply();
        // Replies are expected in the same order as the regions in the instruction.
        for (index, expected_id) in [region_id, region_id1].into_iter().enumerate() {
            let reply = &replies[index];
            assert_eq!(reply.region_id, expected_id);
            assert!(reply.exists);
            assert!(reply.error.is_none());
            assert_eq!(reply.last_entry_id, Some(0));
        }

        assert_eq!(engine.role(region_id).unwrap(), RegionRole::Follower);
        assert_eq!(engine.role(region_id1).unwrap(), RegionRole::Follower);
    }
}