datanode/heartbeat/handler/upgrade_region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use common_meta::instruction::{InstructionReply, UpgradeRegion, UpgradeRegionReply};
16use common_telemetry::{info, warn};
17use futures_util::future::BoxFuture;
18use store_api::region_request::{RegionCatchupRequest, RegionRequest};
19
20use crate::heartbeat::handler::HandlerContext;
21use crate::heartbeat::task_tracker::WaitResult;
22
23impl HandlerContext {
24    pub(crate) fn handle_upgrade_region_instruction(
25        self,
26        UpgradeRegion {
27            region_id,
28            last_entry_id,
29            metadata_last_entry_id,
30            replay_timeout,
31            location_id,
32        }: UpgradeRegion,
33    ) -> BoxFuture<'static, Option<InstructionReply>> {
34        Box::pin(async move {
35            let Some(writable) = self.region_server.is_region_leader(region_id) else {
36                return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
37                    ready: false,
38                    exists: false,
39                    error: None,
40                }));
41            };
42
43            if writable {
44                return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
45                    ready: true,
46                    exists: true,
47                    error: None,
48                }));
49            }
50
51            let region_server_moved = self.region_server.clone();
52
53            // The catchup task is almost zero cost if the inside region is writable.
54            // Therefore, it always registers a new catchup task.
55            let register_result = self
56                .catchup_tasks
57                .try_register(
58                    region_id,
59                    Box::pin(async move {
60                        info!("Executing region: {region_id} catchup to: last entry id {last_entry_id:?}");
61                        region_server_moved
62                            .handle_request(
63                                region_id,
64                                RegionRequest::Catchup(RegionCatchupRequest {
65                                    set_writable: true,
66                                    entry_id: last_entry_id,
67                                    metadata_entry_id: metadata_last_entry_id,
68                                    location_id,
69                                }),
70                            )
71                            .await?;
72
73                        Ok(())
74                    }),
75                )
76                .await;
77
78            if register_result.is_busy() {
79                warn!("Another catchup task is running for the region: {region_id}");
80            }
81
82            // Returns immediately
83            let Some(replay_timeout) = replay_timeout else {
84                return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
85                    ready: false,
86                    exists: true,
87                    error: None,
88                }));
89            };
90
91            // We don't care that it returns a newly registered or running task.
92            let mut watcher = register_result.into_watcher();
93            let result = self.catchup_tasks.wait(&mut watcher, replay_timeout).await;
94
95            match result {
96                WaitResult::Timeout => Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
97                    ready: false,
98                    exists: true,
99                    error: None,
100                })),
101                WaitResult::Finish(Ok(_)) => {
102                    Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
103                        ready: true,
104                        exists: true,
105                        error: None,
106                    }))
107                }
108                WaitResult::Finish(Err(err)) => {
109                    Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
110                        ready: false,
111                        exists: true,
112                        error: Some(format!("{err:?}")),
113                    }))
114                }
115            }
116        })
117    }
118}
119
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use common_meta::instruction::{InstructionReply, UpgradeRegion, UpgradeRegionReply};
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;
    use tokio::time::Instant;

    use crate::error;
    use crate::heartbeat::handler::HandlerContext;
    use crate::tests::{mock_region_server, MockRegionEngine};

    /// Sends an `UpgradeRegion` instruction (without entry-id or location hints) through
    /// `handler_context` and returns the reply.
    ///
    /// Fails the test if the handler answers with anything other than an `UpgradeRegion`
    /// reply, including `None`.
    async fn upgrade(
        handler_context: HandlerContext,
        region_id: RegionId,
        replay_timeout: Option<Duration>,
    ) -> UpgradeRegionReply {
        let reply = handler_context
            .handle_upgrade_region_instruction(UpgradeRegion {
                region_id,
                last_entry_id: None,
                metadata_last_entry_id: None,
                replay_timeout,
                location_id: None,
            })
            .await;

        match reply {
            Some(InstructionReply::UpgradeRegion(reply)) => reply,
            other => panic!("expected an UpgradeRegion reply, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn test_region_not_exist() {
        let mut mock_region_server = mock_region_server();
        let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
        mock_region_server.register_engine(mock_engine);

        let handler_context = HandlerContext::new_for_test(mock_region_server);
        let region_id = RegionId::new(1024, 1);

        // The engine is registered, but the region itself is not.
        for replay_timeout in [None, Some(Duration::from_millis(100))] {
            let reply = upgrade(handler_context.clone(), region_id, replay_timeout).await;
            assert!(!reply.ready);
            assert!(!reply.exists);
            assert!(reply.error.is_none());
        }
    }

    #[tokio::test]
    async fn test_region_writable() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
                    // A writable region must not trigger a catchup request.
                    unreachable!();
                }));
            });
        mock_region_server.register_test_region(region_id, mock_engine);

        let handler_context = HandlerContext::new_for_test(mock_region_server);

        for replay_timeout in [None, Some(Duration::from_millis(100))] {
            let reply = upgrade(handler_context.clone(), region_id, replay_timeout).await;
            assert!(reply.ready);
            assert!(reply.exists);
            assert!(reply.error.is_none());
        }
    }

    #[tokio::test]
    async fn test_region_not_ready() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                // Region is not ready.
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
                // Note: Don't change.
                region_engine.handle_request_delay = Some(Duration::from_secs(100));
            });
        mock_region_server.register_test_region(region_id, mock_engine);

        let handler_context = HandlerContext::new_for_test(mock_region_server);

        // The catchup request is delayed far beyond the 100 ms wait, so it never finishes
        // in time.
        for replay_timeout in [None, Some(Duration::from_millis(100))] {
            let reply = upgrade(handler_context.clone(), region_id, replay_timeout).await;
            assert!(!reply.ready);
            assert!(reply.exists);
            assert!(reply.error.is_none());
        }
    }

    #[tokio::test]
    async fn test_region_not_ready_with_retry() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                // Region is not ready.
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
                // Note: Don't change.
                region_engine.handle_request_delay = Some(Duration::from_millis(300));
            });
        mock_region_server.register_test_region(region_id, mock_engine);

        let handler_context = HandlerContext::new_for_test(mock_region_server);

        // The catchup request is delayed by 300 ms. Two consecutive 100 ms waits therefore
        // both time out while the first catchup task is still running.
        let short_wait = Some(Duration::from_millis(100));
        for _ in 0..2 {
            let reply = upgrade(handler_context.clone(), region_id, short_wait).await;
            assert!(!reply.ready);
            assert!(reply.exists);
            assert!(reply.error.is_none());
        }

        // The third request waits on the already-running task instead of starting a fresh
        // 300 ms catchup, so it must take less than 300 ms.
        let timer = Instant::now();
        let reply = upgrade(handler_context, region_id, Some(Duration::from_millis(500))).await;
        assert!(timer.elapsed().as_millis() < 300);

        assert!(reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_none());
    }

    #[tokio::test]
    async fn test_region_error() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                // Region is not ready.
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
                    error::UnexpectedSnafu {
                        violated: "mock_error".to_string(),
                    }
                    .fail()
                }));
                // Note: Don't change.
                region_engine.handle_request_delay = Some(Duration::from_millis(100));
            });
        mock_region_server.register_test_region(region_id, mock_engine);

        let handler_context = HandlerContext::new_for_test(mock_region_server);

        // Without a replay timeout the handler returns before the catchup finishes, so it
        // cannot know about the error yet.
        let reply = upgrade(handler_context.clone(), region_id, None).await;
        assert!(!reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_none());

        // Waiting long enough surfaces the catchup failure in the reply.
        let reply = upgrade(
            handler_context.clone(),
            region_id,
            Some(Duration::from_millis(200)),
        )
        .await;
        assert!(!reply.ready);
        assert!(reply.exists);
        let error = reply.error.expect("a failed catchup must report its error");
        assert!(error.contains("mock_error"));
    }
}