datanode/heartbeat/handler/
downgrade_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_meta::instruction::{DowngradeRegion, DowngradeRegionReply, InstructionReply};
16use common_telemetry::tracing::info;
17use common_telemetry::{error, warn};
18use futures_util::future::BoxFuture;
19use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
20use store_api::region_request::{RegionFlushRequest, RegionRequest};
21use store_api::storage::RegionId;
22
23use crate::heartbeat::handler::HandlerContext;
24use crate::heartbeat::task_tracker::WaitResult;
25
26impl HandlerContext {
27    async fn downgrade_to_follower_gracefully(
28        &self,
29        region_id: RegionId,
30    ) -> Option<InstructionReply> {
31        match self
32            .region_server
33            .set_region_role_state_gracefully(region_id, SettableRegionRoleState::Follower)
34            .await
35        {
36            Ok(SetRegionRoleStateResponse::Success(success)) => {
37                Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
38                    last_entry_id: success.last_entry_id(),
39                    metadata_last_entry_id: success.metadata_last_entry_id(),
40                    exists: true,
41                    error: None,
42                }))
43            }
44            Ok(SetRegionRoleStateResponse::NotFound) => {
45                warn!("Region: {region_id} is not found");
46                Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
47                    last_entry_id: None,
48                    metadata_last_entry_id: None,
49                    exists: false,
50                    error: None,
51                }))
52            }
53            Ok(SetRegionRoleStateResponse::InvalidTransition(err)) => {
54                error!(err; "Failed to convert region to follower - invalid transition");
55                Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
56                    last_entry_id: None,
57                    metadata_last_entry_id: None,
58                    exists: true,
59                    error: Some(format!("{err:?}")),
60                }))
61            }
62            Err(err) => {
63                error!(err; "Failed to convert region to {}", SettableRegionRoleState::Follower);
64                Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
65                    last_entry_id: None,
66                    metadata_last_entry_id: None,
67                    exists: true,
68                    error: Some(format!("{err:?}")),
69                }))
70            }
71        }
72    }
73
74    pub(crate) fn handle_downgrade_region_instruction(
75        self,
76        DowngradeRegion {
77            region_id,
78            flush_timeout,
79        }: DowngradeRegion,
80    ) -> BoxFuture<'static, Option<InstructionReply>> {
81        Box::pin(async move {
82            let Some(writable) = self.region_server.is_region_leader(region_id) else {
83                warn!("Region: {region_id} is not found");
84                return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
85                    last_entry_id: None,
86                    metadata_last_entry_id: None,
87                    exists: false,
88                    error: None,
89                }));
90            };
91
92            let region_server_moved = self.region_server.clone();
93
94            // Ignores flush request
95            if !writable {
96                warn!(
97                    "Region: {region_id} is not writable, flush_timeout: {:?}",
98                    flush_timeout
99                );
100                return self.downgrade_to_follower_gracefully(region_id).await;
101            }
102
103            // If flush_timeout is not set, directly convert region to follower.
104            let Some(flush_timeout) = flush_timeout else {
105                return self.downgrade_to_follower_gracefully(region_id).await;
106            };
107
108            // Sets region to downgrading,
109            // the downgrading region will reject all write requests.
110            // However, the downgrading region will still accept read, flush requests.
111            match self
112                .region_server
113                .set_region_role_state_gracefully(
114                    region_id,
115                    SettableRegionRoleState::DowngradingLeader,
116                )
117                .await
118            {
119                Ok(SetRegionRoleStateResponse::Success { .. }) => {}
120                Ok(SetRegionRoleStateResponse::NotFound) => {
121                    warn!("Region: {region_id} is not found");
122                    return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
123                        last_entry_id: None,
124                        metadata_last_entry_id: None,
125                        exists: false,
126                        error: None,
127                    }));
128                }
129                Ok(SetRegionRoleStateResponse::InvalidTransition(err)) => {
130                    error!(err; "Failed to convert region to downgrading leader - invalid transition");
131                    return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
132                        last_entry_id: None,
133                        metadata_last_entry_id: None,
134                        exists: true,
135                        error: Some(format!("{err:?}")),
136                    }));
137                }
138                Err(err) => {
139                    error!(err; "Failed to convert region to downgrading leader");
140                    return Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
141                        last_entry_id: None,
142                        metadata_last_entry_id: None,
143                        exists: true,
144                        error: Some(format!("{err:?}")),
145                    }));
146                }
147            }
148
149            let register_result = self
150                .downgrade_tasks
151                .try_register(
152                    region_id,
153                    Box::pin(async move {
154                        info!("Flush region: {region_id} before converting region to follower");
155                        region_server_moved
156                            .handle_request(
157                                region_id,
158                                RegionRequest::Flush(RegionFlushRequest {
159                                    row_group_size: None,
160                                }),
161                            )
162                            .await?;
163
164                        Ok(())
165                    }),
166                )
167                .await;
168
169            if register_result.is_busy() {
170                warn!("Another flush task is running for the region: {region_id}");
171            }
172
173            let mut watcher = register_result.into_watcher();
174            let result = self.downgrade_tasks.wait(&mut watcher, flush_timeout).await;
175
176            match result {
177                WaitResult::Timeout => {
178                    Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
179                        last_entry_id: None,
180                        metadata_last_entry_id: None,
181                        exists: true,
182                        error: Some(format!(
183                            "Flush region timeout, region: {region_id}, timeout: {:?}",
184                            flush_timeout
185                        )),
186                    }))
187                }
188                WaitResult::Finish(Ok(_)) => self.downgrade_to_follower_gracefully(region_id).await,
189                WaitResult::Finish(Err(err)) => {
190                    Some(InstructionReply::DowngradeRegion(DowngradeRegionReply {
191                        last_entry_id: None,
192                        metadata_last_entry_id: None,
193                        exists: true,
194                        error: Some(format!("{err:?}")),
195                    }))
196                }
197            }
198        })
199    }
200}
201
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::time::Duration;

    use common_meta::instruction::{DowngradeRegion, InstructionReply};
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::region_engine::{
        RegionRole, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
    };
    use store_api::region_request::RegionRequest;
    use store_api::storage::RegionId;
    use tokio::time::Instant;

    use crate::error;
    use crate::heartbeat::handler::HandlerContext;
    use crate::tests::{mock_region_server, MockRegionEngine};

    /// The successful role-state response used by the mocks: a mito success
    /// whose last entry id is 1024.
    fn mito_success_response() -> SetRegionRoleStateResponse {
        SetRegionRoleStateResponse::success(SetRegionRoleStateSuccess::mito(1024))
    }

    /// Sends a downgrade instruction through `handler_context` and returns the reply.
    async fn downgrade(
        handler_context: HandlerContext,
        region_id: RegionId,
        flush_timeout: Option<Duration>,
    ) -> Option<InstructionReply> {
        handler_context
            .handle_downgrade_region_instruction(DowngradeRegion {
                region_id,
                flush_timeout,
            })
            .await
    }

    /// Asserts the reply says the region does not exist and carries no error.
    fn assert_region_missing(reply: Option<InstructionReply>) {
        assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));

        if let Some(InstructionReply::DowngradeRegion(reply)) = reply {
            assert!(!reply.exists);
            assert!(reply.error.is_none());
            assert!(reply.last_entry_id.is_none());
        }
    }

    /// Asserts the reply reports a successful downgrade with last entry id 1024.
    fn assert_downgraded(reply: Option<InstructionReply>) {
        assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));

        if let Some(InstructionReply::DowngradeRegion(reply)) = reply {
            assert!(reply.exists);
            assert!(reply.error.is_none());
            assert_eq!(reply.last_entry_id.unwrap(), 1024);
        }
    }

    /// Asserts the reply is for an existing region and that its error text
    /// contains `fragment`.
    fn assert_failed_with(reply: Option<InstructionReply>, fragment: &str) {
        assert_matches!(reply, Some(InstructionReply::DowngradeRegion(_)));

        if let Some(InstructionReply::DowngradeRegion(reply)) = reply {
            assert!(reply.exists);
            assert!(reply.error.unwrap().contains(fragment));
            assert!(reply.last_entry_id.is_none());
        }
    }

    #[tokio::test]
    async fn test_region_not_exist() {
        // Only the engine is registered; the region itself never is.
        let mut region_server = mock_region_server();
        let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
        region_server.register_engine(mock_engine);
        let handler_context = HandlerContext::new_for_test(region_server);
        let region_id = RegionId::new(1024, 1);

        for flush_timeout in [None, Some(Duration::from_millis(100u64))] {
            let reply = downgrade(handler_context.clone(), region_id, flush_timeout).await;
            assert_region_missing(reply);
        }
    }

    #[tokio::test]
    async fn test_region_readonly() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, req| {
                    // A follower region must never receive a flush request.
                    if matches!(req, RegionRequest::Flush(_)) {
                        unreachable!();
                    }

                    Ok(0)
                }));
                region_engine.handle_set_readonly_gracefully_mock_fn =
                    Some(Box::new(|_| Ok(mito_success_response())));
            });
        region_server.register_test_region(region_id, mock_engine);
        let handler_context = HandlerContext::new_for_test(region_server);

        for flush_timeout in [None, Some(Duration::from_millis(100u64))] {
            let reply = downgrade(handler_context.clone(), region_id, flush_timeout).await;
            assert_downgraded(reply);
        }
    }

    #[tokio::test]
    async fn test_region_flush_timeout() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                // The request delay is far longer than the timeout used below.
                region_engine.handle_request_delay = Some(Duration::from_secs(100));
                region_engine.handle_set_readonly_gracefully_mock_fn =
                    Some(Box::new(|_| Ok(mito_success_response())));
            });
        region_server.register_test_region(region_id, mock_engine);
        let handler_context = HandlerContext::new_for_test(region_server);

        let flush_timeout = Duration::from_millis(100);
        let reply = downgrade(handler_context.clone(), region_id, Some(flush_timeout)).await;
        assert_failed_with(reply, "timeout");
    }

    #[tokio::test]
    async fn test_region_flush_timeout_and_retry() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_request_delay = Some(Duration::from_millis(300));
                region_engine.handle_set_readonly_gracefully_mock_fn =
                    Some(Box::new(|_| Ok(mito_success_response())));
            });
        region_server.register_test_region(region_id, mock_engine);
        let handler_context = HandlerContext::new_for_test(region_server);

        // Two attempts with a 100 ms timeout against a 300 ms request delay:
        // both are expected to time out.
        for _ in 0..2 {
            let flush_timeout = Some(Duration::from_millis(100u64));
            let reply = downgrade(handler_context.clone(), region_id, flush_timeout).await;
            assert_failed_with(reply, "timeout");
        }

        // The third attempt has a 500 ms timeout but must finish in under 300 ms.
        let timer = Instant::now();
        let reply = downgrade(
            handler_context,
            region_id,
            Some(Duration::from_millis(500)),
        )
        .await;
        // Must be less than 300 ms.
        assert!(timer.elapsed().as_millis() < 300);
        assert_downgraded(reply);
    }

    #[tokio::test]
    async fn test_region_flush_timeout_and_retry_error() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_request_delay = Some(Duration::from_millis(300));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
                    error::UnexpectedSnafu {
                        violated: "mock flush failed",
                    }
                    .fail()
                }));
                region_engine.handle_set_readonly_gracefully_mock_fn =
                    Some(Box::new(|_| Ok(mito_success_response())));
            });
        region_server.register_test_region(region_id, mock_engine);
        let handler_context = HandlerContext::new_for_test(region_server);

        // Two attempts with a 100 ms timeout against a 300 ms request delay:
        // both are expected to time out.
        for _ in 0..2 {
            let flush_timeout = Some(Duration::from_millis(100u64));
            let reply = downgrade(handler_context.clone(), region_id, flush_timeout).await;
            assert_failed_with(reply, "timeout");
        }

        // The third attempt must finish in under 300 ms and report the mocked
        // flush failure.
        let timer = Instant::now();
        let reply = downgrade(
            handler_context,
            region_id,
            Some(Duration::from_millis(500)),
        )
        .await;
        // Must be less than 300 ms.
        assert!(timer.elapsed().as_millis() < 300);
        assert_failed_with(reply, "flush failed");
    }

    #[tokio::test]
    async fn test_set_region_readonly_not_found() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_set_readonly_gracefully_mock_fn =
                    Some(Box::new(|_| Ok(SetRegionRoleStateResponse::NotFound)));
            });
        region_server.register_test_region(region_id, mock_engine);
        let handler_context = HandlerContext::new_for_test(region_server);

        let reply = downgrade(handler_context.clone(), region_id, None).await;
        assert_region_missing(reply);
    }

    #[tokio::test]
    async fn test_set_region_readonly_error() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_set_readonly_gracefully_mock_fn = Some(Box::new(|_| {
                    error::UnexpectedSnafu {
                        violated: "Failed to set region to readonly",
                    }
                    .fail()
                }));
            });
        region_server.register_test_region(region_id, mock_engine);
        let handler_context = HandlerContext::new_for_test(region_server);

        let reply = downgrade(handler_context.clone(), region_id, None).await;
        assert_failed_with(reply, "Failed to set region to readonly");
    }
}