Skip to main content

datanode/heartbeat/handler/
upgrade_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_error::ext::{BoxedError, ErrorExt};
16use common_error::status_code::StatusCode;
17use common_meta::instruction::{
18    InstructionReply, UpgradeRegion, UpgradeRegionReply, UpgradeRegionsReply,
19};
20use common_telemetry::{debug, info, warn};
21use store_api::region_request::{RegionCatchupRequest, ReplayCheckpoint};
22use store_api::storage::RegionId;
23
24use crate::error::Result;
25use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
26
/// Heartbeat instruction handler that upgrades regions by catching them up.
#[derive(Clone, Copy, Debug, Default)]
pub struct UpgradeRegionsHandler {
    /// Parallelism forwarded to the region server's batch catchup call.
    pub upgrade_region_parallelism: usize,
}
31
#[cfg(test)]
impl UpgradeRegionsHandler {
    /// Builds a handler for unit tests with `upgrade_region_parallelism` fixed to 8.
    fn new_test() -> Self {
        Self {
            upgrade_region_parallelism: 8,
        }
    }
}
40
41impl UpgradeRegionsHandler {
42    fn convert_responses_to_replies(
43        responses: Result<Vec<(RegionId, std::result::Result<(), BoxedError>)>>,
44        catchup_regions: &[RegionId],
45    ) -> Vec<UpgradeRegionReply> {
46        match responses {
47            Ok(responses) => responses
48                .into_iter()
49                .map(|(region_id, result)| match result {
50                    Ok(()) => UpgradeRegionReply {
51                        region_id,
52                        ready: true,
53                        exists: true,
54                        error: None,
55                    },
56                    Err(err) => {
57                        if err.status_code() == StatusCode::RegionNotFound {
58                            UpgradeRegionReply {
59                                region_id,
60                                ready: false,
61                                exists: false,
62                                error: Some(format!("{err:?}")),
63                            }
64                        } else {
65                            UpgradeRegionReply {
66                                region_id,
67                                ready: false,
68                                exists: true,
69                                error: Some(format!("{err:?}")),
70                            }
71                        }
72                    }
73                })
74                .collect::<Vec<_>>(),
75            Err(err) => catchup_regions
76                .iter()
77                .map(|region_id| UpgradeRegionReply {
78                    region_id: *region_id,
79                    ready: false,
80                    exists: true,
81                    error: Some(format!("{err:?}")),
82                })
83                .collect::<Vec<_>>(),
84        }
85    }
86}
87
88impl UpgradeRegionsHandler {
89    // Handles upgrade regions instruction.
90    //
91    // Returns batch of upgrade region replies, the order of the replies is not guaranteed.
92    async fn handle_upgrade_regions(
93        &self,
94        ctx: &HandlerContext,
95        upgrade_regions: Vec<UpgradeRegion>,
96    ) -> Vec<UpgradeRegionReply> {
97        let num_upgrade_regions = upgrade_regions.len();
98        let mut replies = Vec::with_capacity(num_upgrade_regions);
99        let mut catchup_requests = Vec::with_capacity(num_upgrade_regions);
100        let mut catchup_regions = Vec::with_capacity(num_upgrade_regions);
101        let mut timeout = None;
102
103        for upgrade_region in upgrade_regions {
104            let Some(writable) = ctx.region_server.is_region_leader(upgrade_region.region_id)
105            else {
106                // Region is not found.
107                debug!("Region {} is not found", upgrade_region.region_id);
108                replies.push(UpgradeRegionReply {
109                    region_id: upgrade_region.region_id,
110                    ready: false,
111                    exists: false,
112                    error: None,
113                });
114                continue;
115            };
116
117            // Ignores the catchup requests for writable regions.
118            if writable {
119                warn!(
120                    "Region {} is writable, ignores the catchup request",
121                    upgrade_region.region_id
122                );
123                replies.push(UpgradeRegionReply {
124                    region_id: upgrade_region.region_id,
125                    ready: true,
126                    exists: true,
127                    error: None,
128                });
129            } else {
130                let UpgradeRegion {
131                    last_entry_id,
132                    metadata_last_entry_id,
133                    location_id,
134                    replay_entry_id,
135                    metadata_replay_entry_id,
136                    replay_timeout,
137                    ..
138                } = upgrade_region;
139                match timeout {
140                    Some(timeout) => {
141                        debug_assert_eq!(timeout, replay_timeout);
142                    }
143                    None => {
144                        // TODO(weny): required the replay_timeout.
145                        timeout = Some(replay_timeout);
146                    }
147                }
148
149                let checkpoint = match (replay_entry_id, metadata_replay_entry_id) {
150                    (Some(entry_id), metadata_entry_id) => Some(ReplayCheckpoint {
151                        entry_id,
152                        metadata_entry_id,
153                    }),
154                    _ => None,
155                };
156
157                catchup_regions.push(upgrade_region.region_id);
158                catchup_requests.push((
159                    upgrade_region.region_id,
160                    RegionCatchupRequest {
161                        set_writable: true,
162                        entry_id: last_entry_id,
163                        metadata_entry_id: metadata_last_entry_id,
164                        location_id,
165                        checkpoint,
166                    },
167                ));
168            }
169        }
170
171        let Some(timeout) = timeout else {
172            // No replay timeout, so we don't need to catchup the regions.
173            info!("All regions are writable, no need to catchup");
174            debug_assert_eq!(replies.len(), num_upgrade_regions);
175            return replies;
176        };
177
178        match tokio::time::timeout(
179            timeout,
180            ctx.region_server
181                .handle_batch_catchup_requests(self.upgrade_region_parallelism, catchup_requests),
182        )
183        .await
184        {
185            Ok(responses) => {
186                replies.extend(Self::convert_responses_to_replies(
187                    responses,
188                    &catchup_regions,
189                ));
190            }
191            Err(_) => {
192                replies.extend(catchup_regions.iter().map(|region_id| UpgradeRegionReply {
193                    region_id: *region_id,
194                    ready: false,
195                    exists: true,
196                    error: None,
197                }));
198            }
199        }
200
201        replies
202    }
203}
204
205#[async_trait::async_trait]
206impl InstructionHandler for UpgradeRegionsHandler {
207    type Instruction = Vec<UpgradeRegion>;
208
209    async fn handle(
210        &self,
211        ctx: &HandlerContext,
212        upgrade_regions: Self::Instruction,
213    ) -> Option<InstructionReply> {
214        let replies = self.handle_upgrade_regions(ctx, upgrade_regions).await;
215
216        Some(InstructionReply::UpgradeRegions(UpgradeRegionsReply::new(
217            replies,
218        )))
219    }
220}
221
// Unit tests for `UpgradeRegionsHandler`, driven through `InstructionHandler::handle`
// against a mock region server and mock region engines.
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use common_meta::instruction::UpgradeRegion;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;

    use crate::error;
    use crate::heartbeat::handler::upgrade_region::UpgradeRegionsHandler;
    use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
    use crate::tests::{MockRegionEngine, mock_region_server};

    /// Regions that were never registered on the region server must be reported with
    /// `exists: false` and without an error.
    #[tokio::test]
    async fn test_region_not_exist() {
        // Only the engine is registered; no region is registered on the server.
        let mut mock_region_server = mock_region_server();
        let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
        mock_region_server.register_engine(mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);

        let region_id = RegionId::new(1024, 1);
        let region_id2 = RegionId::new(1024, 2);
        let replay_timeout = Duration::from_millis(100u64);
        let reply = UpgradeRegionsHandler::new_test()
            .handle(
                &handler_context,
                vec![
                    UpgradeRegion {
                        region_id,
                        replay_timeout,
                        ..Default::default()
                    },
                    UpgradeRegion {
                        region_id: region_id2,
                        replay_timeout,
                        ..Default::default()
                    },
                ],
            )
            .await;

        // Both replies come from the "not found" path, in request order.
        let replies = &reply.unwrap().expect_upgrade_regions_reply();
        assert_eq!(replies[0].region_id, region_id);
        assert_eq!(replies[1].region_id, region_id2);
        for reply in replies {
            assert!(!reply.exists);
            assert!(reply.error.is_none());
        }
    }

    /// Regions whose mocked role is `Leader` must be reported as ready without any request
    /// reaching the engine.
    #[tokio::test]
    async fn test_region_writable() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let region_id2 = RegionId::new(1024, 2);

        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Leader));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
                    // Should be unreachable: the handler skips catchup for writable regions.
                    unreachable!();
                }));
            });
        mock_region_server.register_test_region(region_id, mock_engine.clone());
        mock_region_server.register_test_region(region_id2, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
        let replay_timeout = Duration::from_millis(100u64);
        let reply = UpgradeRegionsHandler::new_test()
            .handle(
                &handler_context,
                vec![
                    UpgradeRegion {
                        region_id,
                        replay_timeout,
                        ..Default::default()
                    },
                    UpgradeRegion {
                        region_id: region_id2,
                        replay_timeout,
                        ..Default::default()
                    },
                ],
            )
            .await;

        let replies = &reply.unwrap().expect_upgrade_regions_reply();
        assert_eq!(replies[0].region_id, region_id);
        assert_eq!(replies[1].region_id, region_id2);
        for reply in replies {
            assert!(reply.ready);
            assert!(reply.exists);
            assert!(reply.error.is_none());
        }
    }

    /// A follower region whose catchup outlasts the replay timeout must be reported as
    /// existing, not ready, and without an error.
    #[tokio::test]
    async fn test_region_not_ready() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                // Region is not ready.
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
                // Note: Don't change.
                // NOTE(review): presumably the 100s delay must stay far above the 100ms replay
                // timeout below so that the handler's timeout always fires first - confirm.
                region_engine.handle_request_delay = Some(Duration::from_secs(100));
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
        let replay_timeout = Duration::from_millis(100u64);
        let reply = UpgradeRegionsHandler::new_test()
            .handle(
                &handler_context,
                vec![UpgradeRegion {
                    region_id,
                    replay_timeout,
                    ..Default::default()
                }],
            )
            .await;

        let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
        assert!(!reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_none(), "error: {:?}", reply.error);
    }

    /// Retrying with too short a replay timeout keeps reporting "not ready"; a retry whose
    /// timeout exceeds the engine delay finally reports the region as ready.
    #[tokio::test]
    async fn test_region_not_ready_with_retry() {
        common_telemetry::init_default_ut_logging();
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                // Region is not ready.
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
                // 300ms sits between the short (100ms) and the long (500ms) timeouts used below.
                region_engine.handle_request_delay = Some(Duration::from_millis(300));
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let waits = vec![Duration::from_millis(100u64), Duration::from_millis(100u64)];
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
        // Two attempts with a 100ms timeout: each is expected to time out.
        for replay_timeout in waits {
            let reply = UpgradeRegionsHandler::new_test()
                .handle(
                    &handler_context,
                    vec![UpgradeRegion {
                        region_id,
                        replay_timeout,
                        ..Default::default()
                    }],
                )
                .await;

            let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
            assert!(!reply.ready);
            assert!(reply.exists);
            assert!(reply.error.is_none(), "error: {:?}", reply.error);
        }

        // Final attempt with a 500ms timeout: the catchup is expected to complete.
        let reply = UpgradeRegionsHandler::new_test()
            .handle(
                &handler_context,
                vec![UpgradeRegion {
                    region_id,
                    replay_timeout: Duration::from_millis(500),
                    ..Default::default()
                }],
            )
            .await;
        let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
        assert!(reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_none(), "error: {:?}", reply.error);
    }

    /// An engine error is only surfaced in the reply when the replay timeout is long enough
    /// for the engine to answer; otherwise the reply carries no error.
    #[tokio::test]
    async fn test_region_error() {
        common_telemetry::init_default_ut_logging();
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                // Region is not ready.
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                // Every request handled by the engine fails with "mock_error".
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| {
                    error::UnexpectedSnafu {
                        violated: "mock_error".to_string(),
                    }
                    .fail()
                }));
                // Note: Don't change.
                // NOTE(review): presumably 100ms must stay above the default replay timeout of
                // the first call and below the 200ms timeout of the second call - confirm.
                region_engine.handle_request_delay = Some(Duration::from_millis(100));
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
        // First call: `replay_timeout` is left at its default value.
        let reply = UpgradeRegionsHandler::new_test()
            .handle(
                &handler_context,
                vec![UpgradeRegion {
                    region_id,
                    ..Default::default()
                }],
            )
            .await;

        // It didn't wait for handle returns; it had no idea about the error.
        let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
        assert!(!reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_none());

        // Second call: the 200ms timeout outlasts the 100ms engine delay.
        let reply = UpgradeRegionsHandler::new_test()
            .handle(
                &handler_context,
                vec![UpgradeRegion {
                    region_id,
                    replay_timeout: Duration::from_millis(200),
                    ..Default::default()
                }],
            )
            .await;

        // The engine error is now observed and rendered into the reply.
        let reply = &reply.unwrap().expect_upgrade_regions_reply()[0];
        assert!(!reply.ready);
        assert!(reply.exists);
        assert!(reply.error.is_some());
        assert!(reply.error.as_ref().unwrap().contains("mock_error"));
    }
}