datanode/heartbeat/handler/
downgrade_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_meta::instruction::{DowngradeRegion, DowngradeRegionReply, InstructionReply};
16use common_telemetry::tracing::info;
17use common_telemetry::{error, warn};
18use futures_util::future::BoxFuture;
19use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
20use store_api::region_request::{RegionFlushRequest, RegionRequest};
21use store_api::storage::RegionId;
22
23use crate::heartbeat::handler::HandlerContext;
24use crate::heartbeat::task_tracker::WaitResult;
25
26impl HandlerContext {
27    async fn downgrade_to_follower_gracefully(
28        &self,
29        region_id: RegionId,
30    ) -> Option<InstructionReply> {
31        match self
32            .region_server
33            .set_region_role_state_gracefully(region_id, SettableRegionRoleState::Follower)
34            .await
35        {
36            Ok(SetRegionRoleStateResponse::Success(success)) => {
37                Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
38                    last_entry_id: success.last_entry_id(),
39                    metadata_last_entry_id: success.metadata_last_entry_id(),
40                    exists: true,
41                    error: None,
42                }))
43            }
44            Ok(SetRegionRoleStateResponse::NotFound) => {
45                warn!("Region: {region_id} is not found");
46                Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
47                    last_entry_id: None,
48                    metadata_last_entry_id: None,
49                    exists: false,
50                    error: None,
51                }))
52            }
53            Err(err) => {
54                error!(err; "Failed to convert region to {}", SettableRegionRoleState::Follower);
55                Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
56                    last_entry_id: None,
57                    metadata_last_entry_id: None,
58                    exists: true,
59                    error: Some(format!("{err:?}")),
60                }))
61            }
62        }
63    }
64
65    pub(crate) fn handle_downgrade_region_instruction(
66        self,
67        DowngradeRegion {
68            region_id,
69            flush_timeout,
70        }: DowngradeRegion,
71    ) -> BoxFuture<'static, Option<InstructionReply>> {
72        Box::pin(async move {
73            let Some(writable) = self.region_server.is_region_leader(region_id) else {
74                warn!("Region: {region_id} is not found");
75                return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
76                    last_entry_id: None,
77                    metadata_last_entry_id: None,
78                    exists: false,
79                    error: None,
80                }));
81            };
82
83            let region_server_moved = self.region_server.clone();
84
85            // Ignores flush request
86            if !writable {
87                warn!(
88                    "Region: {region_id} is not writable, flush_timeout: {:?}",
89                    flush_timeout
90                );
91                return self.downgrade_to_follower_gracefully(region_id).await;
92            }
93
94            // If flush_timeout is not set, directly convert region to follower.
95            let Some(flush_timeout) = flush_timeout else {
96                return self.downgrade_to_follower_gracefully(region_id).await;
97            };
98
99            // Sets region to downgrading,
100            // the downgrading region will reject all write requests.
101            // However, the downgrading region will still accept read, flush requests.
102            match self
103                .region_server
104                .set_region_role_state_gracefully(
105                    region_id,
106                    SettableRegionRoleState::DowngradingLeader,
107                )
108                .await
109            {
110                Ok(SetRegionRoleStateResponse::Success { .. }) => {}
111                Ok(SetRegionRoleStateResponse::NotFound) => {
112                    warn!("Region: {region_id} is not found");
113                    return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
114                        last_entry_id: None,
115                        metadata_last_entry_id: None,
116                        exists: false,
117                        error: None,
118                    }));
119                }
120                Err(err) => {
121                    error!(err; "Failed to convert region to downgrading leader");
122                    return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
123                        last_entry_id: None,
124                        metadata_last_entry_id: None,
125                        exists: true,
126                        error: Some(format!("{err:?}")),
127                    }));
128                }
129            }
130
131            let register_result = self
132                .downgrade_tasks
133                .try_register(
134                    region_id,
135                    Box::pin(async move {
136                        info!("Flush region: {region_id} before converting region to follower");
137                        region_server_moved
138                            .handle_request(
139                                region_id,
140                                RegionRequest::Flush(RegionFlushRequest {
141                                    row_group_size: None,
142                                }),
143                            )
144                            .await?;
145
146                        Ok(())
147                    }),
148                )
149                .await;
150
151            if register_result.is_busy() {
152                warn!("Another flush task is running for the region: {region_id}");
153            }
154
155            let mut watcher = register_result.into_watcher();
156            let result = self.downgrade_tasks.wait(&mut watcher, flush_timeout).await;
157
158            match result {
159                WaitResult::Timeout => {
160                    Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
161                        last_entry_id: None,
162                        metadata_last_entry_id: None,
163                        exists: true,
164                        error: Some(format!(
165                            "Flush region timeout, region: {region_id}, timeout: {:?}",
166                            flush_timeout
167                        )),
168                    }))
169                }
170                WaitResult::Finish(Ok(_)) => self.downgrade_to_follower_gracefully(region_id).await,
171                WaitResult::Finish(Err(err)) => {
172                    Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
173                        last_entry_id: None,
174                        metadata_last_entry_id: None,
175                        exists: true,
176                        error: Some(format!("{err:?}")),
177                    }))
178                }
179            }
180        })
181    }
182}
183
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::time::Duration;

    use common_meta::instruction::{DowngradeRegion, InstructionReply};
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::region_engine::{
        RegionRole, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
    };
    use store_api::region_request::RegionRequest;
    use store_api::storage::RegionId;
    use tokio::time::Instant;

    use crate::error;
    use crate::heartbeat::handler::HandlerContext;
    use crate::tests::{mock_region_server, MockRegionEngine};

    /// Sends a downgrade instruction through `handler_context` and returns the reply
    /// after asserting that it is a `DowngradeRegion` reply.
    async fn downgrade(
        handler_context: HandlerContext,
        region_id: RegionId,
        flush_timeout: Option<Duration>,
    ) -> InstructionReply {
        let reply = handler_context
            .handle_downgrade_region_instruction(DowngradeRegion {
                region_id,
                flush_timeout,
            })
            .await;
        assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));
        reply.unwrap()
    }

    /// A region that was never registered is reported as non-existent, with or
    /// without a flush timeout.
    #[tokio::test]
    async fn test_region_not_exist() {
        let mut region_server = mock_region_server();
        let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
        region_server.register_engine(engine);
        let handler_context = HandlerContext::new_for_test(region_server);
        let region_id = RegionId::new(1024, 1);

        for flush_timeout in [None, Some(Duration::from_millis(100))] {
            let InstructionReply::DowngradeRegion(reply) =
                downgrade(handler_context.clone(), region_id, flush_timeout).await
            else {
                unreachable!()
            };
            assert!(!reply.exists);
            assert!(reply.error.is_none());
            assert!(reply.last_entry_id.is_none());
        }
    }

    /// A follower region is downgraded without ever receiving a flush request.
    #[tokio::test]
    async fn test_region_readonly() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, req| match req {
                    // The handler must not flush a region that is not writable.
                    RegionRequest::Flush(_) => unreachable!(),
                    _ => Ok(0),
                }));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    Ok(SetRegionRoleStateResponse::success(
                        SetRegionRoleStateSuccess::mito(1024),
                    ))
                }))
            });
        region_server.register_test_region(region_id, engine);
        let handler_context = HandlerContext::new_for_test(region_server);

        for flush_timeout in [None, Some(Duration::from_millis(100))] {
            let InstructionReply::DowngradeRegion(reply) =
                downgrade(handler_context.clone(), region_id, flush_timeout).await
            else {
                unreachable!()
            };
            assert!(reply.exists);
            assert!(reply.error.is_none());
            assert_eq!(reply.last_entry_id.unwrap(), 1024);
        }
    }

    /// A flush that outlasts `flush_timeout` yields a timeout error in the reply.
    #[tokio::test]
    async fn test_region_flush_timeout() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_request_delay = Some(Duration::from_secs(100));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    Ok(SetRegionRoleStateResponse::success(
                        SetRegionRoleStateSuccess::mito(1024),
                    ))
                }))
            });
        region_server.register_test_region(region_id, engine);
        let handler_context = HandlerContext::new_for_test(region_server);

        let flush_timeout = Duration::from_millis(100);
        let InstructionReply::DowngradeRegion(reply) =
            downgrade(handler_context.clone(), region_id, Some(flush_timeout)).await
        else {
            unreachable!()
        };
        assert!(reply.exists);
        assert!(reply.error.unwrap().contains("timeout"));
        assert!(reply.last_entry_id.is_none());
    }

    /// Two attempts time out against a 300 ms flush; a third attempt with a longer
    /// timeout succeeds in under 300 ms.
    #[tokio::test]
    async fn test_region_flush_timeout_and_retry() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_request_delay = Some(Duration::from_millis(300));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    Ok(SetRegionRoleStateResponse::success(
                        SetRegionRoleStateSuccess::mito(1024),
                    ))
                }))
            });
        region_server.register_test_region(region_id, engine);
        let handler_context = HandlerContext::new_for_test(region_server);

        for _ in 0..2 {
            let flush_timeout = Some(Duration::from_millis(100));
            let InstructionReply::DowngradeRegion(reply) =
                downgrade(handler_context.clone(), region_id, flush_timeout).await
            else {
                unreachable!()
            };
            assert!(reply.exists);
            assert!(reply.error.unwrap().contains("timeout"));
            assert!(reply.last_entry_id.is_none());
        }

        let timer = Instant::now();
        let reply = downgrade(handler_context, region_id, Some(Duration::from_millis(500))).await;
        // Must be less than 300 ms.
        assert!(timer.elapsed() < Duration::from_millis(300));

        let InstructionReply::DowngradeRegion(reply) = reply else {
            unreachable!()
        };
        assert!(reply.exists);
        assert!(reply.error.is_none());
        assert_eq!(reply.last_entry_id.unwrap(), 1024);
    }

    /// Same retry scenario, but the mocked flush fails: the third attempt reports the
    /// flush error in under 300 ms.
    #[tokio::test]
    async fn test_region_flush_timeout_and_retry_error() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_request_delay = Some(Duration::from_millis(300));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
                    error::UnexpectedSnafu {
                        violated: "mock flush failed",
                    }
                    .fail()
                }));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    Ok(SetRegionRoleStateResponse::success(
                        SetRegionRoleStateSuccess::mito(1024),
                    ))
                }))
            });
        region_server.register_test_region(region_id, engine);
        let handler_context = HandlerContext::new_for_test(region_server);

        for _ in 0..2 {
            let flush_timeout = Some(Duration::from_millis(100));
            let InstructionReply::DowngradeRegion(reply) =
                downgrade(handler_context.clone(), region_id, flush_timeout).await
            else {
                unreachable!()
            };
            assert!(reply.exists);
            assert!(reply.error.unwrap().contains("timeout"));
            assert!(reply.last_entry_id.is_none());
        }

        let timer = Instant::now();
        let reply = downgrade(handler_context, region_id, Some(Duration::from_millis(500))).await;
        // Must be less than 300 ms.
        assert!(timer.elapsed() < Duration::from_millis(300));

        let InstructionReply::DowngradeRegion(reply) = reply else {
            unreachable!()
        };
        assert!(reply.exists);
        assert!(reply.error.unwrap().contains("flush failed"));
        assert!(reply.last_entry_id.is_none());
    }

    /// A `NotFound` answer from the role change is reported as a non-existent region.
    #[tokio::test]
    async fn test_set_region_readonly_not_found() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_set_readonly_gracefully_mock_fn =
                    Some(Box::new(|_| Ok(SetRegionRoleStateResponse::NotFound)));
            });
        region_server.register_test_region(region_id, engine);
        let handler_context = HandlerContext::new_for_test(region_server);

        let InstructionReply::DowngradeRegion(reply) =
            downgrade(handler_context.clone(), region_id, None).await
        else {
            unreachable!()
        };
        assert!(!reply.exists);
        assert!(reply.error.is_none());
        assert!(reply.last_entry_id.is_none());
    }

    /// An error from the role change is surfaced in the reply's `error` field.
    #[tokio::test]
    async fn test_set_region_readonly_error() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    error::UnexpectedSnafu {
                        violated: "Failed to set region to readonly",
                    }
                    .fail()
                }));
            });
        region_server.register_test_region(region_id, engine);
        let handler_context = HandlerContext::new_for_test(region_server);

        let InstructionReply::DowngradeRegion(reply) =
            downgrade(handler_context.clone(), region_id, None).await
        else {
            unreachable!()
        };
        assert!(reply.exists);
        let message = reply.error.unwrap();
        assert!(message.contains("Failed to set region to readonly"));
        assert!(reply.last_entry_id.is_none());
    }
}