datanode/heartbeat/handler/
upgrade_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_meta::instruction::{Instruction, InstructionReply, UpgradeRegion, UpgradeRegionReply};
16use common_telemetry::{info, warn};
17use store_api::region_request::{RegionCatchupRequest, RegionRequest, ReplayCheckpoint};
18
19use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
20use crate::heartbeat::task_tracker::WaitResult;
21
/// Stateless heartbeat instruction handler for `UpgradeRegion` instructions.
///
/// It carries no data; all state needed to serve an instruction is taken from
/// the `HandlerContext` passed to `InstructionHandler::handle`.
#[derive(Clone, Copy, Debug, Default)]
pub struct UpgradeRegionsHandler;
24
25#[async_trait::async_trait]
26impl InstructionHandler for UpgradeRegionsHandler {
27    async fn handle(
28        &self,
29        ctx: &HandlerContext,
30        instruction: Instruction,
31    ) -> Option<InstructionReply> {
32        let UpgradeRegion {
33            region_id,
34            last_entry_id,
35            metadata_last_entry_id,
36            replay_timeout,
37            location_id,
38            replay_entry_id,
39            metadata_replay_entry_id,
40        } = instruction.into_upgrade_regions().unwrap();
41
42        let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
43            return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
44                ready: false,
45                exists: false,
46                error: None,
47            }));
48        };
49
50        if writable {
51            return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
52                ready: true,
53                exists: true,
54                error: None,
55            }));
56        }
57
58        let region_server_moved = ctx.region_server.clone();
59
60        let checkpoint = match (replay_entry_id, metadata_replay_entry_id) {
61            (Some(entry_id), metadata_entry_id) => Some(ReplayCheckpoint {
62                entry_id,
63                metadata_entry_id,
64            }),
65            _ => None,
66        };
67
68        // The catchup task is almost zero cost if the inside region is writable.
69        // Therefore, it always registers a new catchup task.
70        let register_result = ctx
71            .catchup_tasks
72            .try_register(
73                region_id,
74                Box::pin(async move {
75                    info!(
76                        "Executing region: {region_id} catchup to: last entry id {last_entry_id:?}"
77                    );
78                    region_server_moved
79                        .handle_request(
80                            region_id,
81                            RegionRequest::Catchup(RegionCatchupRequest {
82                                set_writable: true,
83                                entry_id: last_entry_id,
84                                metadata_entry_id: metadata_last_entry_id,
85                                location_id,
86                                checkpoint,
87                            }),
88                        )
89                        .await?;
90
91                    Ok(())
92                }),
93            )
94            .await;
95
96        if register_result.is_busy() {
97            warn!("Another catchup task is running for the region: {region_id}");
98        }
99
100        // Returns immediately
101        let Some(replay_timeout) = replay_timeout else {
102            return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
103                ready: false,
104                exists: true,
105                error: None,
106            }));
107        };
108
109        // We don't care that it returns a newly registered or running task.
110        let mut watcher = register_result.into_watcher();
111        let result = ctx.catchup_tasks.wait(&mut watcher, replay_timeout).await;
112
113        match result {
114            WaitResult::Timeout => Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
115                ready: false,
116                exists: true,
117                error: None,
118            })),
119            WaitResult::Finish(Ok(_)) => {
120                Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
121                    ready: true,
122                    exists: true,
123                    error: None,
124                }))
125            }
126            WaitResult::Finish(Err(err)) => {
127                Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
128                    ready: false,
129                    exists: true,
130                    error: Some(format!("{err:?}")),
131                }))
132            }
133        }
134    }
135}
136
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use common_meta::instruction::{Instruction, UpgradeRegion};
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;
    use tokio::time::Instant;

    use crate::error;
    use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
    use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
    use crate::tests::{MockRegionEngine, mock_region_server};

    /// Only the engine is registered, no region: the reply must say the region
    /// does not exist and carry no error, with or without a replay timeout.
    #[tokio::test]
    async fn test_region_not_exist() {
        let mut region_server = mock_region_server();
        let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
        region_server.register_engine(engine);

        let ctx = HandlerContext::new_for_test(region_server);
        let region_id = RegionId::new(1024, 1);

        for replay_timeout in [None, Some(Duration::from_millis(100))] {
            let instruction = Instruction::UpgradeRegion(UpgradeRegion {
                region_id,
                replay_timeout,
                ..Default::default()
            });
            let outcome = UpgradeRegionsHandler.handle(&ctx, instruction).await;

            let reply = outcome.unwrap().expect_upgrade_region_reply();
            assert!(!reply.exists);
            assert!(reply.error.is_none());
        }
    }

    /// The mocked region already has the leader role: the handler must reply
    /// ready without ever sending a request to the engine.
    #[tokio::test]
    async fn test_region_writable() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (engine, _) = MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |engine| {
            engine.mock_role = Some(Some(RegionRole::Leader));
            engine.handle_request_mock_fn = Some(Box::new(|_, _| {
                // The handler must not issue any request for a leader region.
                unreachable!();
            }));
        });
        region_server.register_test_region(region_id, engine);

        let ctx = HandlerContext::new_for_test(region_server);

        for replay_timeout in [None, Some(Duration::from_millis(100))] {
            let instruction = Instruction::UpgradeRegion(UpgradeRegion {
                region_id,
                replay_timeout,
                ..Default::default()
            });
            let outcome = UpgradeRegionsHandler.handle(&ctx, instruction).await;

            let reply = outcome.unwrap().expect_upgrade_region_reply();
            assert!(reply.ready);
            assert!(reply.exists);
            assert!(reply.error.is_none());
        }
    }

    /// The mocked follower region takes 100 s to serve a request, far longer
    /// than any timeout used here: every reply must be "exists, not ready,
    /// no error".
    #[tokio::test]
    async fn test_region_not_ready() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (engine, _) = MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |engine| {
            // Region is not ready.
            engine.mock_role = Some(Some(RegionRole::Follower));
            engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
            // Note: Don't change.
            engine.handle_request_delay = Some(Duration::from_secs(100));
        });
        region_server.register_test_region(region_id, engine);

        let ctx = HandlerContext::new_for_test(region_server);

        for replay_timeout in [None, Some(Duration::from_millis(100))] {
            let instruction = Instruction::UpgradeRegion(UpgradeRegion {
                region_id,
                replay_timeout,
                ..Default::default()
            });
            let outcome = UpgradeRegionsHandler.handle(&ctx, instruction).await;

            let reply = outcome.unwrap().expect_upgrade_region_reply();
            assert!(!reply.ready);
            assert!(reply.exists);
            assert!(reply.error.is_none());
        }
    }

    /// The mocked follower region needs 300 ms to serve a request. Two calls
    /// with a 100 ms timeout must both report "not ready"; a third call with a
    /// 500 ms timeout must then report "ready" in under 300 ms.
    #[tokio::test]
    async fn test_region_not_ready_with_retry() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (engine, _) = MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |engine| {
            // Region is not ready.
            engine.mock_role = Some(Some(RegionRole::Follower));
            engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
            // Note: Don't change.
            engine.handle_request_delay = Some(Duration::from_millis(300));
        });
        region_server.register_test_region(region_id, engine);

        let ctx = HandlerContext::new_for_test(region_server);

        // Two attempts that each give up after 100 ms.
        for replay_timeout in [Some(Duration::from_millis(100)); 2] {
            let instruction = Instruction::UpgradeRegion(UpgradeRegion {
                region_id,
                replay_timeout,
                ..Default::default()
            });
            let outcome = UpgradeRegionsHandler.handle(&ctx, instruction).await;

            let reply = outcome.unwrap().expect_upgrade_region_reply();
            assert!(!reply.ready);
            assert!(reply.exists);
            assert!(reply.error.is_none());
        }

        let started = Instant::now();
        let outcome = UpgradeRegionsHandler
            .handle(
                &ctx,
                Instruction::UpgradeRegion(UpgradeRegion {
                    region_id,
                    replay_timeout: Some(Duration::from_millis(500)),
                    ..Default::default()
                }),
            )
            .await;
        // Must take less than 300 ms.
        assert!(started.elapsed().as_millis() < 300);

        let reply = outcome.unwrap().expect_upgrade_region_reply();
        assert!(reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_none());
    }

    /// The mocked follower region fails every request after a 100 ms delay.
    /// A call without a timeout returns before the failure is known; a call
    /// with a 200 ms timeout must surface the error in the reply.
    #[tokio::test]
    async fn test_region_error() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (engine, _) = MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |engine| {
            // Region is not ready.
            engine.mock_role = Some(Some(RegionRole::Follower));
            engine.handle_request_mock_fn = Some(Box::new(|_, _| {
                error::UnexpectedSnafu {
                    violated: "mock_error".to_string(),
                }
                .fail()
            }));
            // Note: Don't change.
            engine.handle_request_delay = Some(Duration::from_millis(100));
        });
        region_server.register_test_region(region_id, engine);

        let ctx = HandlerContext::new_for_test(region_server);

        let instruction = Instruction::UpgradeRegion(UpgradeRegion {
            region_id,
            ..Default::default()
        });
        let outcome = UpgradeRegionsHandler.handle(&ctx, instruction).await;

        // The handler did not wait for the request to finish, so it cannot
        // know about the error yet.
        let reply = outcome.unwrap().expect_upgrade_region_reply();
        assert!(!reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_none());

        let instruction = Instruction::UpgradeRegion(UpgradeRegion {
            region_id,
            replay_timeout: Some(Duration::from_millis(200)),
            ..Default::default()
        });
        let outcome = UpgradeRegionsHandler.handle(&ctx, instruction).await;

        let reply = outcome.unwrap().expect_upgrade_region_reply();
        assert!(!reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_some());
        assert!(reply.error.unwrap().contains("mock_error"));
    }
}