Skip to main content

datanode/heartbeat/handler/
downgrade_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_meta::instruction::{
16    DowngradeRegion, DowngradeRegionReply, DowngradeRegionsReply, InstructionReply,
17};
18use common_telemetry::tracing::info;
19use common_telemetry::{error, warn};
20use futures::future::join_all;
21use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
22use store_api::region_request::{RegionFlushRequest, RegionRequest};
23use store_api::storage::RegionId;
24
25use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
26use crate::heartbeat::task_tracker::WaitResult;
27
/// Heartbeat instruction handler that downgrades a batch of regions hosted on this
/// datanode to followers, replying with one [`DowngradeRegionReply`] per region.
#[derive(Clone, Copy, Debug, Default)]
pub struct DowngradeRegionsHandler;
30
31impl DowngradeRegionsHandler {
32    async fn handle_downgrade_region(
33        ctx: &HandlerContext,
34        DowngradeRegion {
35            region_id,
36            flush_timeout,
37        }: DowngradeRegion,
38    ) -> DowngradeRegionReply {
39        let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
40            warn!("Region: {region_id} is not found");
41            return DowngradeRegionReply {
42                region_id,
43                last_entry_id: None,
44                metadata_last_entry_id: None,
45                exists: false,
46                error: None,
47            };
48        };
49
50        let region_server_moved = ctx.region_server.clone();
51
52        // Ignores flush request
53        if !writable {
54            warn!(
55                "Region: {region_id} is not writable, flush_timeout: {:?}",
56                flush_timeout
57            );
58            return ctx.downgrade_to_follower_gracefully(region_id).await;
59        }
60
61        // If flush_timeout is not set, directly convert region to follower.
62        let Some(flush_timeout) = flush_timeout else {
63            return ctx.downgrade_to_follower_gracefully(region_id).await;
64        };
65
66        // Sets region to downgrading,
67        // the downgrading region will reject all write requests.
68        // However, the downgrading region will still accept read, flush requests.
69        match ctx
70            .region_server
71            .set_region_role_state_gracefully(region_id, SettableRegionRoleState::DowngradingLeader)
72            .await
73        {
74            Ok(SetRegionRoleStateResponse::Success { .. }) => {}
75            Ok(SetRegionRoleStateResponse::NotFound) => {
76                warn!("Region: {region_id} is not found");
77                return DowngradeRegionReply {
78                    region_id,
79                    last_entry_id: None,
80                    metadata_last_entry_id: None,
81                    exists: false,
82                    error: None,
83                };
84            }
85            Ok(SetRegionRoleStateResponse::InvalidTransition(err)) => {
86                error!(err; "Failed to convert region to downgrading leader - invalid transition");
87                return DowngradeRegionReply {
88                    region_id,
89                    last_entry_id: None,
90                    metadata_last_entry_id: None,
91                    exists: true,
92                    error: Some(format!("{err:?}")),
93                };
94            }
95            Err(err) => {
96                error!(err; "Failed to convert region to downgrading leader");
97                return DowngradeRegionReply {
98                    region_id,
99                    last_entry_id: None,
100                    metadata_last_entry_id: None,
101                    exists: true,
102                    error: Some(format!("{err:?}")),
103                };
104            }
105        }
106
107        let register_result = ctx
108            .downgrade_tasks
109            .try_register(
110                region_id,
111                Box::pin(async move {
112                    info!("Flush region: {region_id} before converting region to follower");
113                    region_server_moved
114                        .handle_request(
115                            region_id,
116                            RegionRequest::Flush(RegionFlushRequest::default()),
117                        )
118                        .await?;
119
120                    Ok(())
121                }),
122            )
123            .await;
124
125        if register_result.is_busy() {
126            warn!("Another flush task is running for the region: {region_id}");
127        }
128
129        let mut watcher = register_result.into_watcher();
130        let result = ctx.downgrade_tasks.wait(&mut watcher, flush_timeout).await;
131
132        match result {
133            WaitResult::Timeout => DowngradeRegionReply {
134                region_id,
135                last_entry_id: None,
136                metadata_last_entry_id: None,
137                exists: true,
138                error: Some(format!(
139                    "Flush region timeout, region: {region_id}, timeout: {:?}",
140                    flush_timeout
141                )),
142            },
143            WaitResult::Finish(Ok(_)) => ctx.downgrade_to_follower_gracefully(region_id).await,
144            WaitResult::Finish(Err(err)) => DowngradeRegionReply {
145                region_id,
146                last_entry_id: None,
147                metadata_last_entry_id: None,
148                exists: true,
149                error: Some(format!("{err:?}")),
150            },
151        }
152    }
153}
154
155#[async_trait::async_trait]
156impl InstructionHandler for DowngradeRegionsHandler {
157    type Instruction = Vec<DowngradeRegion>;
158
159    async fn handle(
160        &self,
161        ctx: &HandlerContext,
162        downgrade_regions: Self::Instruction,
163    ) -> Option<InstructionReply> {
164        let futures = downgrade_regions
165            .into_iter()
166            .map(|downgrade_region| Self::handle_downgrade_region(ctx, downgrade_region));
167        // Join all futures; parallelism is governed by the underlying flush scheduler.
168        let results = join_all(futures).await;
169
170        Some(InstructionReply::DowngradeRegions(
171            DowngradeRegionsReply::new(results),
172        ))
173    }
174}
175
176impl HandlerContext {
177    async fn downgrade_to_follower_gracefully(&self, region_id: RegionId) -> DowngradeRegionReply {
178        match self
179            .region_server
180            .set_region_role_state_gracefully(region_id, SettableRegionRoleState::Follower)
181            .await
182        {
183            Ok(SetRegionRoleStateResponse::Success(success)) => DowngradeRegionReply {
184                region_id,
185                last_entry_id: success.last_entry_id(),
186                metadata_last_entry_id: success.metadata_last_entry_id(),
187                exists: true,
188                error: None,
189            },
190            Ok(SetRegionRoleStateResponse::NotFound) => {
191                warn!("Region: {region_id} is not found");
192                DowngradeRegionReply {
193                    region_id,
194                    last_entry_id: None,
195                    metadata_last_entry_id: None,
196                    exists: false,
197                    error: None,
198                }
199            }
200            Ok(SetRegionRoleStateResponse::InvalidTransition(err)) => {
201                error!(err; "Failed to convert region to follower - invalid transition");
202                DowngradeRegionReply {
203                    region_id,
204                    last_entry_id: None,
205                    metadata_last_entry_id: None,
206                    exists: true,
207                    error: Some(format!("{err:?}")),
208                }
209            }
210            Err(err) => {
211                error!(err; "Failed to convert region to {}", SettableRegionRoleState::Follower);
212                DowngradeRegionReply {
213                    region_id,
214                    last_entry_id: None,
215                    metadata_last_entry_id: None,
216                    exists: true,
217                    error: Some(format!("{err:?}")),
218                }
219            }
220        }
221    }
222}
223
#[cfg(test)]
mod tests {
    //! Tests for `DowngradeRegionsHandler`.
    //!
    //! Most tests drive the handler against a `MockRegionEngine` whose role, request
    //! handling and role-state transitions are stubbed. The last test runs an
    //! end-to-end downgrade through the heartbeat handler with a real mito engine.

    use std::assert_matches;
    use std::sync::Arc;
    use std::time::Duration;

    use common_meta::heartbeat::handler::{HandleControl, HeartbeatResponseHandler};
    use common_meta::heartbeat::mailbox::MessageMeta;
    use common_meta::instruction::{DowngradeRegion, Instruction};
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use mito2::config::MitoConfig;
    use mito2::engine::MITO_ENGINE_NAME;
    use mito2::test_util::{CreateRequestBuilder, TestEnv};
    use store_api::region_engine::{
        RegionEngine, RegionRole, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
    };
    use store_api::region_request::RegionRequest;
    use store_api::storage::RegionId;
    use tokio::time::Instant;

    use crate::error;
    use crate::heartbeat::handler::downgrade_region::DowngradeRegionsHandler;
    use crate::heartbeat::handler::tests::HeartbeatResponseTestEnv;
    use crate::heartbeat::handler::{
        HandlerContext, InstructionHandler, RegionHeartbeatResponseHandler,
    };
    use crate::tests::{MockRegionEngine, mock_region_server};

    /// A region that is not registered on the server is reported as `exists: false`
    /// with no error and no entry id, both with and without a flush timeout.
    #[tokio::test]
    async fn test_region_not_exist() {
        let mut mock_region_server = mock_region_server();
        let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
        // Only the engine is registered; `region_id` below is never registered.
        mock_region_server.register_engine(mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
        let region_id = RegionId::new(1024, 1);
        let waits = vec![None, Some(Duration::from_millis(100u64))];

        for flush_timeout in waits {
            let reply = DowngradeRegionsHandler
                .handle(
                    &handler_context,
                    vec![DowngradeRegion {
                        region_id,
                        flush_timeout,
                    }],
                )
                .await;

            let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
            assert!(!reply.exists);
            assert!(reply.error.is_none());
            assert!(reply.last_entry_id.is_none());
        }
    }

    /// A follower (non-writable) region must skip the flush and be converted to a
    /// follower directly. The mock panics if a flush request ever reaches it.
    #[tokio::test]
    async fn test_region_readonly() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, req| {
                    if let RegionRequest::Flush(_) = req {
                        // Should be unreachable.
                        unreachable!();
                    };

                    Ok(0)
                }));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    Ok(SetRegionRoleStateResponse::success(
                        SetRegionRoleStateSuccess::mito(1024),
                    ))
                }))
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);

        let waits = vec![None, Some(Duration::from_millis(100u64))];
        for flush_timeout in waits {
            let reply = DowngradeRegionsHandler
                .handle(
                    &handler_context,
                    vec![DowngradeRegion {
                        region_id,
                        flush_timeout,
                    }],
                )
                .await;

            let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
            assert!(reply.exists);
            assert!(reply.error.is_none());
            // 1024 is the entry id returned by the mocked role-state transition.
            assert_eq!(reply.last_entry_id.unwrap(), 1024);
        }
    }

    /// A flush that takes far longer (`handle_request_delay` of 100 s) than the
    /// 100 ms `flush_timeout` yields a reply whose error mentions "timeout".
    #[tokio::test]
    async fn test_region_flush_timeout() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_request_delay = Some(Duration::from_secs(100));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    Ok(SetRegionRoleStateResponse::success(
                        SetRegionRoleStateSuccess::mito(1024),
                    ))
                }))
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);

        let flush_timeout = Duration::from_millis(100);
        let reply = DowngradeRegionsHandler
            .handle(
                &handler_context,
                vec![DowngradeRegion {
                    region_id,
                    flush_timeout: Some(flush_timeout),
                }],
            )
            .await;

        let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
        assert!(reply.exists);
        assert!(reply.error.as_ref().unwrap().contains("timeout"));
        assert!(reply.last_entry_id.is_none());
    }

    /// Retry after timeouts reuses the in-flight flush task.
    ///
    /// The flush is delayed by 300 ms via `handle_request_delay`. Two 100 ms attempts
    /// time out (about 200 ms elapsed). A third attempt with a 500 ms budget must then
    /// wait only for the remainder of the already-running flush. Starting a new
    /// 300 ms flush would fail the elapsed-time assertion.
    #[tokio::test]
    async fn test_region_flush_timeout_and_retry() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_request_delay = Some(Duration::from_millis(300));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    Ok(SetRegionRoleStateResponse::success(
                        SetRegionRoleStateSuccess::mito(1024),
                    ))
                }))
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);

        let waits = vec![
            Some(Duration::from_millis(100u64)),
            Some(Duration::from_millis(100u64)),
        ];

        for flush_timeout in waits {
            let reply = DowngradeRegionsHandler
                .handle(
                    &handler_context,
                    vec![DowngradeRegion {
                        region_id,
                        flush_timeout,
                    }],
                )
                .await;

            let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
            assert!(reply.exists);
            assert!(reply.error.as_ref().unwrap().contains("timeout"));
            assert!(reply.last_entry_id.is_none());
        }
        let timer = Instant::now();
        let reply = DowngradeRegionsHandler
            .handle(
                &handler_context,
                vec![DowngradeRegion {
                    region_id,
                    flush_timeout: Some(Duration::from_millis(500)),
                }],
            )
            .await;
        // Must be less than 300 ms: this attempt waits on the flush already started by
        // the first attempt instead of starting a new 300 ms flush.
        assert!(timer.elapsed().as_millis() < 300);

        let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
        assert!(reply.exists);
        assert!(reply.error.is_none());
        assert_eq!(reply.last_entry_id.unwrap(), 1024);
    }

    /// Same timing setup as `test_region_flush_timeout_and_retry`, but the flush
    /// itself fails. The final attempt observes the failure of the reused flush task
    /// ("mock flush failed") rather than a timeout.
    #[tokio::test]
    async fn test_region_flush_timeout_and_retry_error() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_request_delay = Some(Duration::from_millis(300));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
                    error::UnexpectedSnafu {
                        violated: "mock flush failed",
                    }
                    .fail()
                }));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    Ok(SetRegionRoleStateResponse::success(
                        SetRegionRoleStateSuccess::mito(1024),
                    ))
                }))
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);

        let waits = vec![
            Some(Duration::from_millis(100u64)),
            Some(Duration::from_millis(100u64)),
        ];

        for flush_timeout in waits {
            let reply = DowngradeRegionsHandler
                .handle(
                    &handler_context,
                    vec![DowngradeRegion {
                        region_id,
                        flush_timeout,
                    }],
                )
                .await;
            let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
            assert!(reply.exists);
            assert!(reply.error.as_ref().unwrap().contains("timeout"));
            assert!(reply.last_entry_id.is_none());
        }
        let timer = Instant::now();
        let reply = DowngradeRegionsHandler
            .handle(
                &handler_context,
                vec![DowngradeRegion {
                    region_id,
                    flush_timeout: Some(Duration::from_millis(500)),
                }],
            )
            .await;
        // Must be less than 300 ms: this attempt waits on the flush already started by
        // the first attempt instead of starting a new 300 ms flush.
        assert!(timer.elapsed().as_millis() < 300);
        let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
        assert!(reply.exists);
        assert!(reply.error.as_ref().unwrap().contains("flush failed"));
        assert!(reply.last_entry_id.is_none());
    }

    /// With no flush timeout, a `NotFound` from the follower transition is reported
    /// as `exists: false` with no error.
    #[tokio::test]
    async fn test_set_region_readonly_not_found() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_set_readonly_gracefully_mock_fn =
                    Some(Box::new(|_| Ok(SetRegionRoleStateResponse::NotFound)));
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
        let reply = DowngradeRegionsHandler
            .handle(
                &handler_context,
                vec![DowngradeRegion {
                    region_id,
                    flush_timeout: None,
                }],
            )
            .await;
        let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
        assert!(!reply.exists);
        assert!(reply.error.is_none());
        assert!(reply.last_entry_id.is_none());
    }

    /// With no flush timeout, an error from the follower transition is propagated
    /// into the reply's `error` while the region is still reported as existing.
    #[tokio::test]
    async fn test_set_region_readonly_error() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    error::UnexpectedSnafu {
                        violated: "Failed to set region to readonly",
                    }
                    .fail()
                }));
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
        let reply = DowngradeRegionsHandler
            .handle(
                &handler_context,
                vec![DowngradeRegion {
                    region_id,
                    flush_timeout: None,
                }],
            )
            .await;
        let reply = &reply.unwrap().expect_downgrade_regions_reply()[0];
        assert!(reply.exists);
        assert!(
            reply
                .error
                .as_ref()
                .unwrap()
                .contains("Failed to set region to readonly")
        );
        assert!(reply.last_entry_id.is_none());
    }

    /// End-to-end: two freshly created mito regions are downgraded by one
    /// `DowngradeRegions` instruction sent through the heartbeat handler.
    ///
    /// Replies come back in instruction order, and both regions end up as followers.
    #[tokio::test]
    async fn test_downgrade_regions() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let heartbeat_handler =
            RegionHeartbeatResponseHandler::new(region_server.clone(), kv_backend);
        let mut engine_env = TestEnv::with_prefix("downgrade-regions").await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine.clone()));
        let region_id = RegionId::new(1024, 1);
        let region_id1 = RegionId::new(1024, 2);
        let builder = CreateRequestBuilder::new();
        let create_req = builder.build();
        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();
        let create_req1 = builder.build();
        region_server
            .handle_request(region_id1, RegionRequest::Create(create_req1))
            .await
            .unwrap();
        let meta = MessageMeta::new_test(1, "test", "dn-1", "meta-0");
        let instruction = Instruction::DowngradeRegions(vec![
            DowngradeRegion {
                region_id,
                flush_timeout: Some(Duration::from_secs(1)),
            },
            DowngradeRegion {
                region_id: region_id1,
                flush_timeout: Some(Duration::from_secs(1)),
            },
        ]);
        let mut heartbeat_env = HeartbeatResponseTestEnv::new();
        let mut ctx = heartbeat_env.create_handler_ctx((meta, Default::default(), instruction));
        let control = heartbeat_handler.handle(&mut ctx).await.unwrap();
        assert_matches!(control, HandleControl::Continue);

        let (_, reply) = heartbeat_env.receiver.recv().await.unwrap();
        let reply = reply.expect_downgrade_regions_reply();
        // The regions received no writes, so the reported last entry id is 0.
        assert_eq!(reply[0].region_id, region_id);
        assert!(reply[0].exists);
        assert!(reply[0].error.is_none());
        assert_eq!(reply[0].last_entry_id, Some(0));
        assert_eq!(reply[1].region_id, region_id1);
        assert!(reply[1].exists);
        assert!(reply[1].error.is_none());
        assert_eq!(reply[1].last_entry_id, Some(0));

        assert_eq!(engine.role(region_id).unwrap(), RegionRole::Follower);
        assert_eq!(engine.role(region_id1).unwrap(), RegionRole::Follower);
    }
}