datanode/heartbeat/handler/upgrade_region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use common_error::ext::{BoxedError, ErrorExt};
16use common_error::status_code::StatusCode;
17use common_meta::instruction::{
18    InstructionReply, UpgradeRegion, UpgradeRegionReply, UpgradeRegionsReply,
19};
20use common_telemetry::{debug, info, warn};
21use store_api::region_request::{RegionCatchupRequest, ReplayCheckpoint};
22use store_api::storage::RegionId;
23
24use crate::error::Result;
25use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
26
/// Heartbeat instruction handler for [`UpgradeRegion`] instructions.
///
/// Regions that are not yet writable are caught up with `set_writable: true`; see
/// `handle_upgrade_regions` for the per-region outcomes.
#[derive(Clone, Copy, Debug, Default)]
pub struct UpgradeRegionsHandler {
    /// Parallelism passed to the region server's `handle_batch_catchup_requests`.
    pub upgrade_region_parallelism: usize,
}
31
#[cfg(test)]
impl UpgradeRegionsHandler {
    /// Builds a handler for unit tests with a fixed catchup parallelism of 8.
    fn new_test() -> UpgradeRegionsHandler {
        Self {
            upgrade_region_parallelism: 8,
        }
    }
}
40
41impl UpgradeRegionsHandler {
42    fn convert_responses_to_replies(
43        responses: Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>>,
44        catchup_regions: &[RegionId],
45    ) -> Vec<UpgradeRegionReply> {
46        match responses {
47            Ok(responses) => responses
48                .into_iter()
49                .map(|(region_id, result)| match result {
50                    Ok(()) => UpgradeRegionReply {
51                        region_id,
52                        ready: true,
53                        exists: true,
54                        error: None,
55                    },
56                    Err(err) => {
57                        if err.status_code() == StatusCode::RegionNotFound {
58                            UpgradeRegionReply {
59                                region_id,
60                                ready: false,
61                                exists: false,
62                                error: Some(format!("{err:?}")),
63                            }
64                        } else {
65                            UpgradeRegionReply {
66                                region_id,
67                                ready: false,
68                                exists: true,
69                                error: Some(format!("{err:?}")),
70                            }
71                        }
72                    }
73                })
74                .collect::<Vec<_>>(),
75            Err(err) => catchup_regions
76                .iter()
77                .map(|region_id| UpgradeRegionReply {
78                    region_id: *region_id,
79                    ready: false,
80                    exists: true,
81                    error: Some(format!("{err:?}")),
82                })
83                .collect::<Vec<_>>(),
84        }
85    }
86}
87
88impl UpgradeRegionsHandler {
89    // Handles upgrade regions instruction.
90    //
91    // Returns batch of upgrade region replies, the order of the replies is not guaranteed.
92    async fn handle_upgrade_regions(
93        &self,
94        ctx: &HandlerContext,
95        upgrade_regions: Vec<UpgradeRegion>,
96    ) -> Vec<UpgradeRegionReply> {
97        let num_upgrade_regions = upgrade_regions.len();
98        let mut replies = Vec::with_capacity(num_upgrade_regions);
99        let mut catchup_requests = Vec::with_capacity(num_upgrade_regions);
100        let mut catchup_regions = Vec::with_capacity(num_upgrade_regions);
101        let mut timeout = None;
102
103        for upgrade_region in upgrade_regions {
104            let Some(writable) = ctx.region_server.is_region_leader(upgrade_region.region_id)
105            else {
106                // Region is not found.
107                debug!("Region {} is not found", upgrade_region.region_id);
108                replies.push(UpgradeRegionReply {
109                    region_id: upgrade_region.region_id,
110                    ready: false,
111                    exists: false,
112                    error: None,
113                });
114                continue;
115            };
116
117            // Ignores the catchup requests for writable regions.
118            if writable {
119                warn!(
120                    "Region {} is writable, ignores the catchup request",
121                    upgrade_region.region_id
122                );
123                replies.push(UpgradeRegionReply {
124                    region_id: upgrade_region.region_id,
125                    ready: true,
126                    exists: true,
127                    error: None,
128                });
129            } else {
130                let UpgradeRegion {
131                    last_entry_id,
132                    metadata_last_entry_id,
133                    location_id,
134                    replay_entry_id,
135                    metadata_replay_entry_id,
136                    replay_timeout,
137                    ..
138                } = upgrade_region;
139                match timeout {
140                    Some(timeout) => {
141                        debug_assert_eq!(timeout, replay_timeout);
142                    }
143                    None => {
144                        // TODO(weny): required the replay_timeout.
145                        timeout = Some(replay_timeout);
146                    }
147                }
148
149                let checkpoint = match (replay_entry_id, metadata_replay_entry_id) {
150                    (Some(entry_id), metadata_entry_id) => Some(ReplayCheckpoint {
151                        entry_id,
152                        metadata_entry_id,
153                    }),
154                    _ => None,
155                };
156
157                catchup_regions.push(upgrade_region.region_id);
158                catchup_requests.push((
159                    upgrade_region.region_id,
160                    RegionCatchupRequest {
161                        set_writable: true,
162                        entry_id: last_entry_id,
163                        metadata_entry_id: metadata_last_entry_id,
164                        location_id,
165                        checkpoint,
166                    },
167                ));
168            }
169        }
170
171        let Some(timeout) = timeout else {
172            // No replay timeout, so we don't need to catchup the regions.
173            info!("All regions are writable, no need to catchup");
174            debug_assert_eq!(replies.len(), num_upgrade_regions);
175            return replies;
176        };
177
178        match tokio::time::timeout(
179            timeout,
180            ctx.region_server
181                .handle_batch_catchup_requests(self.upgrade_region_parallelism, catchup_requests),
182        )
183        .await
184        {
185            Ok(responses) => {
186                replies.extend(
187                    Self::convert_responses_to_replies(responses, &catchup_regions).into_iter(),
188                );
189            }
190            Err(_) => {
191                replies.extend(catchup_regions.iter().map(|region_id| UpgradeRegionReply {
192                    region_id: *region_id,
193                    ready: false,
194                    exists: true,
195                    error: None,
196                }));
197            }
198        }
199
200        replies
201    }
202}
203
204#[async_trait::async_trait]
205impl InstructionHandler for UpgradeRegionsHandler {
206    type Instruction = Vec<UpgradeRegion>;
207
208    async fn handle(
209        &self,
210        ctx: &HandlerContext,
211        upgrade_regions: Self::Instruction,
212    ) -> Option<InstructionReply> {
213        let replies = self.handle_upgrade_regions(ctx, upgrade_regions).await;
214
215        Some(InstructionReply::UpgradeRegions(UpgradeRegionsReply::new(
216            replies,
217        )))
218    }
219}
220
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use common_meta::instruction::{InstructionReply, UpgradeRegion};
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;

    use crate::error;
    use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
    use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
    use crate::tests::{MockRegionEngine, mock_region_server};

    /// Builds an upgrade instruction for `region_id` with the given replay timeout.
    /// All other fields keep their defaults.
    fn new_upgrade_region(region_id: RegionId, replay_timeout: Duration) -> UpgradeRegion {
        UpgradeRegion {
            region_id,
            replay_timeout,
            ..Default::default()
        }
    }

    /// Sends `upgrade_regions` through a fresh test handler and returns its reply.
    async fn send_upgrade(
        handler_context: &HandlerContext,
        upgrade_regions: Vec<UpgradeRegion>,
    ) -> Option<InstructionReply> {
        UpgradeRegionsHandler::new_test()
            .handle(handler_context, upgrade_regions)
            .await
    }

    #[tokio::test]
    async fn test_region_not_exist() {
        let mut mock_region_server = mock_region_server();
        let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
        mock_region_server.register_engine(mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);

        let region_id = RegionId::new(1024, 1);
        let region_id2 = RegionId::new(1024, 2);
        let replay_timeout = Duration::from_millis(100u64);
        let reply = send_upgrade(
            &handler_context,
            vec![
                new_upgrade_region(region_id, replay_timeout),
                new_upgrade_region(region_id2, replay_timeout),
            ],
        )
        .await;

        let replies = &reply.unwrap().expect_upgrade_regions_reply();
        assert_eq!(replies[0].region_id, region_id);
        assert_eq!(replies[1].region_id, region_id2);
        for reply in replies {
            // A missing region must never be reported as ready.
            assert!(!reply.ready);
            assert!(!reply.exists);
            assert!(reply.error.is_none());
        }
    }

    #[tokio::test]
    async fn test_region_writable() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let region_id2 = RegionId::new(1024, 2);

        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
                    // Should be unreachable.
                    unreachable!();
                }));
            });
        mock_region_server.register_test_region(region_id, mock_engine.clone());
        mock_region_server.register_test_region(region_id2, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
        let replay_timeout = Duration::from_millis(100u64);
        let reply = send_upgrade(
            &handler_context,
            vec![
                new_upgrade_region(region_id, replay_timeout),
                new_upgrade_region(region_id2, replay_timeout),
            ],
        )
        .await;

        let replies = &reply.unwrap().expect_upgrade_regions_reply();
        assert_eq!(replies[0].region_id, region_id);
        assert_eq!(replies[1].region_id, region_id2);
        for reply in replies {
            assert!(reply.ready);
            assert!(reply.exists);
            assert!(reply.error.is_none());
        }
    }

    #[tokio::test]
    async fn test_region_not_ready() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                // Region is not ready.
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
                // Note: Don't change.
                region_engine.handle_request_delay = Some(Duration::from_secs(100));
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
        let replay_timeout = Duration::from_millis(100u64);
        let reply = send_upgrade(
            &handler_context,
            vec![new_upgrade_region(region_id, replay_timeout)],
        )
        .await;

        let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
        assert!(!reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_none(), "error: {:?}", reply.error);
    }

    #[tokio::test]
    async fn test_region_not_ready_with_retry() {
        common_telemetry::init_default_ut_logging();
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                // Region is not ready.
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
                region_engine.handle_request_delay = Some(Duration::from_millis(300));
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let waits = vec![Duration::from_millis(100u64), Duration::from_millis(100u64)];
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
        for replay_timeout in waits {
            let reply = send_upgrade(
                &handler_context,
                vec![new_upgrade_region(region_id, replay_timeout)],
            )
            .await;

            let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
            assert!(!reply.ready);
            assert!(reply.exists);
            assert!(reply.error.is_none(), "error: {:?}", reply.error);
        }

        let reply = send_upgrade(
            &handler_context,
            vec![new_upgrade_region(region_id, Duration::from_millis(500))],
        )
        .await;
        let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
        assert!(reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_none(), "error: {:?}", reply.error);
    }

    #[tokio::test]
    async fn test_region_error() {
        common_telemetry::init_default_ut_logging();
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                // Region is not ready.
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
                    error::UnexpectedSnafu {
                        violated: "mock_error".to_string(),
                    }
                    .fail()
                }));
                // Note: Don't change.
                region_engine.handle_request_delay = Some(Duration::from_millis(100));
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
        // Intentionally relies on the default `replay_timeout` of `UpgradeRegion`.
        let reply = send_upgrade(
            &handler_context,
            vec![UpgradeRegion {
                region_id,
                ..Default::default()
            }],
        )
        .await;

        // It didn't wait for handle returns; it had no idea about the error.
        let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
        assert!(!reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_none());

        let reply = send_upgrade(
            &handler_context,
            vec![new_upgrade_region(region_id, Duration::from_millis(200))],
        )
        .await;

        let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
        assert!(!reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_some());
        assert!(reply.error.as_ref().unwrap().contains("mock_error"));
    }
}