datanode/heartbeat/handler/
upgrade_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_meta::instruction::{InstructionReply, UpgradeRegion, UpgradeRegionReply};
16use common_telemetry::{info, warn};
17use futures_util::future::BoxFuture;
18use store_api::region_request::{RegionCatchupRequest, RegionRequest, ReplayCheckpoint};
19
20use crate::heartbeat::handler::HandlerContext;
21use crate::heartbeat::task_tracker::WaitResult;
22
23impl HandlerContext {
24    pub(crate) fn handle_upgrade_region_instruction(
25        self,
26        UpgradeRegion {
27            region_id,
28            last_entry_id,
29            metadata_last_entry_id,
30            replay_timeout,
31            location_id,
32            replay_entry_id,
33            metadata_replay_entry_id,
34        }: UpgradeRegion,
35    ) -> BoxFuture<'static, Option<InstructionReply>> {
36        Box::pin(async move {
37            let Some(writable) = self.region_server.is_region_leader(region_id) else {
38                return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
39                    ready: false,
40                    exists: false,
41                    error: None,
42                }));
43            };
44
45            if writable {
46                return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
47                    ready: true,
48                    exists: true,
49                    error: None,
50                }));
51            }
52
53            let region_server_moved = self.region_server.clone();
54
55            let checkpoint = match (replay_entry_id, metadata_replay_entry_id) {
56                (Some(entry_id), metadata_entry_id) => Some(ReplayCheckpoint {
57                    entry_id,
58                    metadata_entry_id,
59                }),
60                _ => None,
61            };
62
63            // The catchup task is almost zero cost if the inside region is writable.
64            // Therefore, it always registers a new catchup task.
65            let register_result = self
66                .catchup_tasks
67                .try_register(
68                    region_id,
69                    Box::pin(async move {
70                        info!("Executing region: {region_id} catchup to: last entry id {last_entry_id:?}");
71                        region_server_moved
72                            .handle_request(
73                                region_id,
74                                RegionRequest::Catchup(RegionCatchupRequest {
75                                    set_writable: true,
76                                    entry_id: last_entry_id,
77                                    metadata_entry_id: metadata_last_entry_id,
78                                    location_id,
79                                    checkpoint,
80                                }),
81                            )
82                            .await?;
83
84                        Ok(())
85                    }),
86                )
87                .await;
88
89            if register_result.is_busy() {
90                warn!("Another catchup task is running for the region: {region_id}");
91            }
92
93            // Returns immediately
94            let Some(replay_timeout) = replay_timeout else {
95                return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
96                    ready: false,
97                    exists: true,
98                    error: None,
99                }));
100            };
101
102            // We don't care that it returns a newly registered or running task.
103            let mut watcher = register_result.into_watcher();
104            let result = self.catchup_tasks.wait(&mut watcher, replay_timeout).await;
105
106            match result {
107                WaitResult::Timeout => Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
108                    ready: false,
109                    exists: true,
110                    error: None,
111                })),
112                WaitResult::Finish(Ok(_)) => {
113                    Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
114                        ready: true,
115                        exists: true,
116                        error: None,
117                    }))
118                }
119                WaitResult::Finish(Err(err)) => {
120                    Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
121                        ready: false,
122                        exists: true,
123                        error: Some(format!("{err:?}")),
124                    }))
125                }
126            }
127        })
128    }
129}
130
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::time::Duration;

    use common_meta::instruction::{InstructionReply, UpgradeRegion};
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;
    use tokio::time::Instant;

    use crate::error;
    use crate::heartbeat::handler::HandlerContext;
    use crate::tests::{mock_region_server, MockRegionEngine};

    /// Runs `instruction` through `ctx`, asserts that the reply is an
    /// `InstructionReply::UpgradeRegion`, and returns the inner reply.
    async fn upgrade(
        ctx: HandlerContext,
        instruction: UpgradeRegion,
    ) -> common_meta::instruction::UpgradeRegionReply {
        let reply = ctx.handle_upgrade_region_instruction(instruction).await;
        assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));

        match reply {
            Some(InstructionReply::UpgradeRegion(inner)) => inner,
            // Ruled out by the assertion above.
            _ => unreachable!(),
        }
    }

    /// A region that was never registered is reported as not existing,
    /// with or without a replay timeout.
    #[tokio::test]
    async fn test_region_not_exist() {
        let mut region_server = mock_region_server();
        let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
        region_server.register_engine(engine);

        let ctx = HandlerContext::new_for_test(region_server);
        let region_id = RegionId::new(1024, 1);

        for replay_timeout in [None, Some(Duration::from_millis(100))] {
            let reply = upgrade(
                ctx.clone(),
                UpgradeRegion {
                    region_id,
                    replay_timeout,
                    ..Default::default()
                },
            )
            .await;

            assert!(!reply.exists);
            assert!(reply.error.is_none());
        }
    }

    /// A region whose mocked role is leader is ready immediately and the
    /// engine never receives a request.
    #[tokio::test]
    async fn test_region_writable() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (engine, _) = MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |engine| {
            engine.mock_role = Some(Some(RegionRole::Leader));
            engine.handle_request_mock_fn = Some(Box::new(|_, _| {
                // The handler must never reach the engine for a leader region.
                unreachable!();
            }));
        });
        region_server.register_test_region(region_id, engine);

        let ctx = HandlerContext::new_for_test(region_server);

        for replay_timeout in [None, Some(Duration::from_millis(100))] {
            let reply = upgrade(
                ctx.clone(),
                UpgradeRegion {
                    region_id,
                    replay_timeout,
                    ..Default::default()
                },
            )
            .await;

            assert!(reply.ready);
            assert!(reply.exists);
            assert!(reply.error.is_none());
        }
    }

    /// A follower region whose catchup takes far longer than the replay
    /// timeout is reported as existing but not ready.
    #[tokio::test]
    async fn test_region_not_ready() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (engine, _) = MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |engine| {
            // The region is not ready (follower role).
            engine.mock_role = Some(Some(RegionRole::Follower));
            engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
            // Do not change: the delay must exceed every timeout used below.
            engine.handle_request_delay = Some(Duration::from_secs(100));
        });
        region_server.register_test_region(region_id, engine);

        let ctx = HandlerContext::new_for_test(region_server);

        for replay_timeout in [None, Some(Duration::from_millis(100))] {
            let reply = upgrade(
                ctx.clone(),
                UpgradeRegion {
                    region_id,
                    replay_timeout,
                    ..Default::default()
                },
            )
            .await;

            assert!(!reply.ready);
            assert!(reply.exists);
            assert!(reply.error.is_none());
        }
    }

    /// Repeated instructions wait on the same catchup: two 100 ms waits time
    /// out against a 300 ms request delay, and the third one completes in
    /// less than 300 ms.
    #[tokio::test]
    async fn test_region_not_ready_with_retry() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (engine, _) = MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |engine| {
            // The region is not ready (follower role).
            engine.mock_role = Some(Some(RegionRole::Follower));
            engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
            // Do not change: the timing assertions below depend on this delay.
            engine.handle_request_delay = Some(Duration::from_millis(300));
        });
        region_server.register_test_region(region_id, engine);

        let ctx = HandlerContext::new_for_test(region_server);

        let short_waits = [
            Some(Duration::from_millis(100)),
            Some(Duration::from_millis(100)),
        ];
        for replay_timeout in short_waits {
            let reply = upgrade(
                ctx.clone(),
                UpgradeRegion {
                    region_id,
                    replay_timeout,
                    ..Default::default()
                },
            )
            .await;

            assert!(!reply.ready);
            assert!(reply.exists);
            assert!(reply.error.is_none());
        }

        let started = Instant::now();
        let reply = upgrade(
            ctx,
            UpgradeRegion {
                region_id,
                replay_timeout: Some(Duration::from_millis(500)),
                ..Default::default()
            },
        )
        .await;
        // Must be less than 300 ms.
        assert!(started.elapsed().as_millis() < 300);

        assert!(reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_none());
    }

    /// An engine error is only visible to an instruction that waits long
    /// enough for the catchup request to return.
    #[tokio::test]
    async fn test_region_error() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (engine, _) = MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |engine| {
            // The region is not ready (follower role).
            engine.mock_role = Some(Some(RegionRole::Follower));
            engine.handle_request_mock_fn = Some(Box::new(|_, _| {
                error::UnexpectedSnafu {
                    violated: "mock_error".to_string(),
                }
                .fail()
            }));
            // Do not change: must stay below the 200 ms timeout used below.
            engine.handle_request_delay = Some(Duration::from_millis(100));
        });
        region_server.register_test_region(region_id, engine);

        let ctx = HandlerContext::new_for_test(region_server);

        // This call does not wait for the request to return, so it cannot
        // know about the error.
        let reply = upgrade(
            ctx.clone(),
            UpgradeRegion {
                region_id,
                ..Default::default()
            },
        )
        .await;
        assert!(!reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_none());

        let reply = upgrade(
            ctx.clone(),
            UpgradeRegion {
                region_id,
                replay_timeout: Some(Duration::from_millis(200)),
                ..Default::default()
            },
        )
        .await;
        assert!(!reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_some());
        assert!(reply.error.unwrap().contains("mock_error"));
    }
}