datanode/heartbeat/handler/
upgrade_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_error::ext::{BoxedError, ErrorExt};
16use common_error::status_code::StatusCode;
17use common_meta::instruction::{
18    InstructionReply, UpgradeRegion, UpgradeRegionReply, UpgradeRegionsReply,
19};
20use common_telemetry::{debug, info, warn};
21use store_api::region_request::{RegionCatchupRequest, ReplayCheckpoint};
22use store_api::storage::RegionId;
23
24use crate::error::Result;
25use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
26
/// Heartbeat instruction handler that upgrades regions on this datanode by
/// catching up every region that is not yet writable (see `handle_upgrade_regions`).
#[derive(Debug, Clone, Copy, Default)]
pub struct UpgradeRegionsHandler {
    /// Passed as the parallelism argument to the region server's
    /// `handle_batch_catchup_requests` when the catchup requests of one
    /// instruction are submitted together.
    pub upgrade_region_parallelism: usize,
}

#[cfg(test)]
impl UpgradeRegionsHandler {
    /// Builds a handler for unit tests with a fixed upgrade parallelism of 8.
    fn new_test() -> Self {
        Self {
            upgrade_region_parallelism: 8,
        }
    }
}
40
41impl UpgradeRegionsHandler {
42    fn convert_responses_to_replies(
43        responses: Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>>,
44        catchup_regions: &[RegionId],
45    ) -> Vec<UpgradeRegionReply> {
46        match responses {
47            Ok(responses) => responses
48                .into_iter()
49                .map(|(region_id, result)| match result {
50                    Ok(()) => UpgradeRegionReply {
51                        region_id,
52                        ready: true,
53                        exists: true,
54                        error: None,
55                    },
56                    Err(err) => {
57                        if err.status_code() == StatusCode::RegionNotFound {
58                            UpgradeRegionReply {
59                                region_id,
60                                ready: false,
61                                exists: false,
62                                error: Some(format!("{err:?}")),
63                            }
64                        } else {
65                            UpgradeRegionReply {
66                                region_id,
67                                ready: false,
68                                exists: true,
69                                error: Some(format!("{err:?}")),
70                            }
71                        }
72                    }
73                })
74                .collect::<Vec<_>>(),
75            Err(err) => catchup_regions
76                .iter()
77                .map(|region_id| UpgradeRegionReply {
78                    region_id: *region_id,
79                    ready: false,
80                    exists: true,
81                    error: Some(format!("{err:?}")),
82                })
83                .collect::<Vec<_>>(),
84        }
85    }
86}
87
88impl UpgradeRegionsHandler {
89    // Handles upgrade regions instruction.
90    //
91    // Returns batch of upgrade region replies, the order of the replies is not guaranteed.
92    async fn handle_upgrade_regions(
93        &self,
94        ctx: &HandlerContext,
95        upgrade_regions: Vec<UpgradeRegion>,
96    ) -> Vec<UpgradeRegionReply> {
97        let num_upgrade_regions = upgrade_regions.len();
98        let mut replies = Vec::with_capacity(num_upgrade_regions);
99        let mut catchup_requests = Vec::with_capacity(num_upgrade_regions);
100        let mut catchup_regions = Vec::with_capacity(num_upgrade_regions);
101        let mut timeout = None;
102
103        for upgrade_region in upgrade_regions {
104            let Some(writable) = ctx.region_server.is_region_leader(upgrade_region.region_id)
105            else {
106                // Region is not found.
107                debug!("Region {} is not found", upgrade_region.region_id);
108                replies.push(UpgradeRegionReply {
109                    region_id: upgrade_region.region_id,
110                    ready: false,
111                    exists: false,
112                    error: None,
113                });
114                continue;
115            };
116
117            // Ignores the catchup requests for writable regions.
118            if writable {
119                warn!(
120                    "Region {} is writable, ignores the catchup request",
121                    upgrade_region.region_id
122                );
123                replies.push(UpgradeRegionReply {
124                    region_id: upgrade_region.region_id,
125                    ready: true,
126                    exists: true,
127                    error: None,
128                });
129            } else {
130                let UpgradeRegion {
131                    last_entry_id,
132                    metadata_last_entry_id,
133                    location_id,
134                    replay_entry_id,
135                    metadata_replay_entry_id,
136                    replay_timeout,
137                    ..
138                } = upgrade_region;
139                match timeout {
140                    Some(timeout) => {
141                        debug_assert_eq!(timeout, replay_timeout);
142                    }
143                    None => {
144                        // TODO(weny): required the replay_timeout.
145                        timeout = Some(replay_timeout);
146                    }
147                }
148
149                let checkpoint = match (replay_entry_id, metadata_replay_entry_id) {
150                    (Some(entry_id), metadata_entry_id) => Some(ReplayCheckpoint {
151                        entry_id,
152                        metadata_entry_id,
153                    }),
154                    _ => None,
155                };
156
157                catchup_regions.push(upgrade_region.region_id);
158                catchup_requests.push((
159                    upgrade_region.region_id,
160                    RegionCatchupRequest {
161                        set_writable: true,
162                        entry_id: last_entry_id,
163                        metadata_entry_id: metadata_last_entry_id,
164                        location_id,
165                        checkpoint,
166                    },
167                ));
168            }
169        }
170
171        let Some(timeout) = timeout else {
172            // No replay timeout, so we don't need to catchup the regions.
173            info!("All regions are writable, no need to catchup");
174            debug_assert_eq!(replies.len(), num_upgrade_regions);
175            return replies;
176        };
177
178        match tokio::time::timeout(
179            timeout,
180            ctx.region_server
181                .handle_batch_catchup_requests(self.upgrade_region_parallelism, catchup_requests),
182        )
183        .await
184        {
185            Ok(responses) => {
186                replies.extend(
187                    Self::convert_responses_to_replies(responses, &catchup_regions).into_iter(),
188                );
189            }
190            Err(_) => {
191                replies.extend(catchup_regions.iter().map(|region_id| UpgradeRegionReply {
192                    region_id: *region_id,
193                    ready: false,
194                    exists: true,
195                    error: None,
196                }));
197            }
198        }
199
200        replies
201    }
202}
203
204#[async_trait::async_trait]
205impl InstructionHandler for UpgradeRegionsHandler {
206    type Instruction = Vec<UpgradeRegion>;
207
208    async fn handle(
209        &self,
210        ctx: &HandlerContext,
211        upgrade_regions: Self::Instruction,
212    ) -> Option<InstructionReply> {
213        let replies = self.handle_upgrade_regions(ctx, upgrade_regions).await;
214
215        Some(InstructionReply::UpgradeRegions(UpgradeRegionsReply::new(
216            replies,
217        )))
218    }
219}
220
#[cfg(test)]
mod tests {
    // Tests for `UpgradeRegionsHandler`, driven by `MockRegionEngine` regions
    // registered on a mock region server. Several tests depend on the exact ratio
    // between the mock request delay and the replay timeout; keep them in sync.
    use std::time::Duration;

    use common_meta::instruction::UpgradeRegion;
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;

    use crate::error;
    use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
    use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
    use crate::tests::{MockRegionEngine, mock_region_server};

    /// Regions that are not registered on the region server are reported with
    /// `exists: false` and no error, in request order.
    #[tokio::test]
    async fn test_region_not_exist() {
        let mut mock_region_server = mock_region_server();
        // Only the engine is registered; neither region below is.
        let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
        mock_region_server.register_engine(mock_engine);

        let handler_context = HandlerContext::new_for_test(mock_region_server);

        let region_id = RegionId::new(1024, 1);
        let region_id2 = RegionId::new(1024, 2);
        let replay_timeout = Duration::from_millis(100u64);
        let reply = UpgradeRegionsHandler::new_test()
            .handle(
                &handler_context,
                vec![
                    UpgradeRegion {
                        region_id,
                        replay_timeout,
                        ..Default::default()
                    },
                    UpgradeRegion {
                        region_id: region_id2,
                        replay_timeout,
                        ..Default::default()
                    },
                ],
            )
            .await;

        // Not-found replies are pushed in input order, so indexing is safe here.
        let replies = &reply.unwrap().expect_upgrade_regions_reply();
        assert_eq!(replies[0].region_id, region_id);
        assert_eq!(replies[1].region_id, region_id2);
        for reply in replies {
            assert!(!reply.exists);
            assert!(reply.error.is_none());
        }
    }

    /// Regions that are already leaders are reported ready without sending any
    /// catchup request (the mock request handler panics if it is reached).
    #[tokio::test]
    async fn test_region_writable() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let region_id2 = RegionId::new(1024, 2);

        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
                    // Should be unreachable.
                    unreachable!();
                }));
            });
        mock_region_server.register_test_region(region_id, mock_engine.clone());
        mock_region_server.register_test_region(region_id2, mock_engine);
        let handler_context = HandlerContext::new_for_test(mock_region_server);
        let replay_timeout = Duration::from_millis(100u64);
        let reply = UpgradeRegionsHandler::new_test()
            .handle(
                &handler_context,
                vec![
                    UpgradeRegion {
                        region_id,
                        replay_timeout,
                        ..Default::default()
                    },
                    UpgradeRegion {
                        region_id: region_id2,
                        replay_timeout,
                        ..Default::default()
                    },
                ],
            )
            .await;

        let replies = &reply.unwrap().expect_upgrade_regions_reply();
        assert_eq!(replies[0].region_id, region_id);
        assert_eq!(replies[1].region_id, region_id2);
        for reply in replies {
            assert!(reply.ready);
            assert!(reply.exists);
            assert!(reply.error.is_none());
        }
    }

    /// A catchup that outlasts the replay timeout is reported as not ready,
    /// existing, and without an error.
    #[tokio::test]
    async fn test_region_not_ready() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                // Region is not ready.
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
                // Note: Don't change.
                // The delay must stay far above the 100ms replay timeout used below.
                region_engine.handle_request_delay = Some(Duration::from_secs(100));
            });
        mock_region_server.register_test_region(region_id, mock_engine);

        let handler_context = HandlerContext::new_for_test(mock_region_server);
        let replay_timeout = Duration::from_millis(100u64);
        let reply = UpgradeRegionsHandler::new_test()
            .handle(
                &handler_context,
                vec![UpgradeRegion {
                    region_id,
                    replay_timeout,
                    ..Default::default()
                }],
            )
            .await;

        let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
        assert!(!reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_none(), "error: {:?}", reply.error);
    }

    /// Repeated upgrade attempts: with a 300ms catchup delay, two attempts with a
    /// 100ms timeout report not ready, and a final attempt with a 500ms timeout
    /// is expected to report ready.
    #[tokio::test]
    async fn test_region_not_ready_with_retry() {
        common_telemetry::init_default_ut_logging();
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                // Region is not ready.
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
                region_engine.handle_request_delay = Some(Duration::from_millis(300));
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let waits = vec![Duration::from_millis(100u64), Duration::from_millis(100u64)];
        let handler_context = HandlerContext::new_for_test(mock_region_server);
        // Each of these attempts times out before the 300ms catchup finishes.
        for replay_timeout in waits {
            let reply = UpgradeRegionsHandler::new_test()
                .handle(
                    &handler_context,
                    vec![UpgradeRegion {
                        region_id,
                        replay_timeout,
                        ..Default::default()
                    }],
                )
                .await;

            let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
            assert!(!reply.ready);
            assert!(reply.exists);
            assert!(reply.error.is_none(), "error: {:?}", reply.error);
        }

        // A timeout longer than the catchup delay lets the upgrade complete.
        let reply = UpgradeRegionsHandler::new_test()
            .handle(
                &handler_context,
                vec![UpgradeRegion {
                    region_id,
                    replay_timeout: Duration::from_millis(500),
                    ..Default::default()
                }],
            )
            .await;
        let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
        assert!(reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_none(), "error: {:?}", reply.error);
    }

    /// A failing catchup is only surfaced when the handler waits long enough
    /// for the request to return; a timeout hides the error.
    #[tokio::test]
    async fn test_region_error() {
        common_telemetry::init_default_ut_logging();
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                // Region is not ready, and every catchup request fails with "mock_error".
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
                    error::UnexpectedSnafu {
                        violated: "mock_error".to_string(),
                    }
                    .fail()
                }));
                // Note: Don't change.
                // Must stay between the default replay timeout and the 200ms one below.
                region_engine.handle_request_delay = Some(Duration::from_millis(100));
            });
        mock_region_server.register_test_region(region_id, mock_engine);

        let handler_context = HandlerContext::new_for_test(mock_region_server);
        // Uses the default replay timeout (presumably zero — verify against
        // `UpgradeRegion::default`), which elapses before the 100ms request returns.
        let reply = UpgradeRegionsHandler::new_test()
            .handle(
                &handler_context,
                vec![UpgradeRegion {
                    region_id,
                    ..Default::default()
                }],
            )
            .await;

        // The handler timed out before the catchup request returned, so it could
        // not observe the error: the reply is not ready and carries no error.
        let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
        assert!(!reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_none());

        // With a 200ms timeout the request returns in time and its error is reported.
        let reply = UpgradeRegionsHandler::new_test()
            .handle(
                &handler_context,
                vec![UpgradeRegion {
                    region_id,
                    replay_timeout: Duration::from_millis(200),
                    ..Default::default()
                }],
            )
            .await;

        let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
        assert!(!reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_some());
        assert!(reply.error.as_ref().unwrap().contains("mock_error"));
    }
}