datanode/heartbeat/handler/
remap_manifest.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use common_meta::instruction::{InstructionReply, RemapManifest, RemapManifestReply};
16use common_telemetry::{error, info, warn};
17use store_api::region_engine::RemapManifestsRequest;
18
19use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
20
21pub struct RemapManifestHandler;
22
23#[async_trait::async_trait]
24impl InstructionHandler for RemapManifestHandler {
25    type Instruction = RemapManifest;
26    async fn handle(
27        &self,
28        ctx: &HandlerContext,
29        request: Self::Instruction,
30    ) -> Option<InstructionReply> {
31        let RemapManifest {
32            region_id,
33            input_regions,
34            region_mapping,
35            new_partition_exprs,
36        } = request;
37        info!(
38            "Datanode received remap manifest request, region_id: {}, input_regions: {}, target_regions: {}",
39            region_id,
40            input_regions.len(),
41            new_partition_exprs.len()
42        );
43        let Some(leader) = ctx.region_server.is_region_leader(region_id) else {
44            warn!("Region: {} is not found", region_id);
45            return Some(InstructionReply::RemapManifest(RemapManifestReply {
46                exists: false,
47                manifest_paths: Default::default(),
48                error: None,
49            }));
50        };
51
52        if !leader {
53            warn!("Region: {} is not leader", region_id);
54            return Some(InstructionReply::RemapManifest(RemapManifestReply {
55                exists: true,
56                manifest_paths: Default::default(),
57                error: Some("Region is not leader".into()),
58            }));
59        }
60
61        let reply = match ctx
62            .region_server
63            .remap_manifests(RemapManifestsRequest {
64                region_id,
65                input_regions,
66                region_mapping,
67                new_partition_exprs,
68            })
69            .await
70        {
71            Ok(result) => InstructionReply::RemapManifest(RemapManifestReply {
72                exists: true,
73                manifest_paths: result.manifest_paths,
74                error: None,
75            }),
76            Err(e) => {
77                error!(
78                    e;
79                    "Remap manifests failed on datanode, region_id: {}",
80                    region_id
81                );
82                InstructionReply::RemapManifest(RemapManifestReply {
83                    exists: true,
84                    manifest_paths: Default::default(),
85                    error: Some(format!("{e:?}")),
86                })
87            }
88        };
89
90        Some(reply)
91    }
92}
93
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use common_meta::instruction::RemapManifest;
    use datatypes::value::Value;
    use mito2::config::MitoConfig;
    use mito2::engine::MITO_ENGINE_NAME;
    use mito2::test_util::{CreateRequestBuilder, TestEnv};
    use partition::expr::{PartitionExpr, col};
    use store_api::path_utils::table_dir;
    use store_api::region_engine::RegionRole;
    use store_api::region_request::{EnterStagingRequest, RegionRequest};
    use store_api::storage::RegionId;

    use crate::heartbeat::handler::remap_manifest::RemapManifestHandler;
    use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
    use crate::region_server::RegionServer;
    use crate::tests::{MockRegionEngine, mock_region_server};

    /// Builds an instruction for `region_id` with no input regions, no mapping
    /// and no partition expressions.
    fn empty_instruction(region_id: RegionId) -> RemapManifest {
        RemapManifest {
            region_id,
            input_regions: vec![],
            region_mapping: HashMap::new(),
            new_partition_exprs: HashMap::new(),
        }
    }

    /// Builds an instruction that maps both `target` and `other` onto `target`
    /// and gives `target` the partition expression `0 <= x < 100`.
    fn merge_instruction(
        target: RegionId,
        other: RegionId,
        input_regions: Vec<RegionId>,
    ) -> RemapManifest {
        RemapManifest {
            region_id: target,
            input_regions,
            region_mapping: HashMap::from([(target, vec![target]), (other, vec![target])]),
            new_partition_exprs: HashMap::from([(
                target,
                range_expr("x", 0, 100).as_json_str().unwrap(),
            )]),
        }
    }

    /// A region the server does not know must be reported as non-existent,
    /// without an error and without manifest paths.
    #[tokio::test]
    async fn test_region_not_exist() {
        let mut region_server = mock_region_server();
        let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
        region_server.register_engine(mock_engine);
        let handler_context = HandlerContext::new_for_test(region_server);

        let reply = RemapManifestHandler
            .handle(&handler_context, empty_instruction(RegionId::new(1024, 1)))
            .await
            .unwrap()
            .expect_remap_manifest_reply();
        assert!(!reply.exists);
        assert!(reply.error.is_none());
        assert!(reply.manifest_paths.is_empty());
    }

    /// A region whose mocked role is `Follower` must be reported as existing
    /// but with an error.
    #[tokio::test]
    async fn test_region_not_leader() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) = MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |engine| {
            engine.mock_role = Some(Some(RegionRole::Follower));
            engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
        });
        region_server.register_test_region(region_id, mock_engine);
        let handler_context = HandlerContext::new_for_test(region_server);

        let reply = RemapManifestHandler
            .handle(&handler_context, empty_instruction(region_id))
            .await
            .unwrap()
            .expect_remap_manifest_reply();
        assert!(reply.exists);
        assert!(reply.error.is_some());
    }

    /// Returns the partition expression `start <= col_name < end`.
    fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr {
        let lower_bound = col(col_name).gt_eq(Value::Int64(start));
        let upper_bound = col(col_name).lt(Value::Int64(end));
        lower_bound.and(upper_bound)
    }

    /// Creates regions `(1024, 1)` and `(1024, 2)` under the same table dir and
    /// sends each an `EnterStaging` request with its range over `x`
    /// (`[0, 50)` and `[50, 100)` respectively).
    async fn prepare_region(region_server: &RegionServer) {
        for (region_number, start, end) in [(1, 0, 50), (2, 50, 100)] {
            let region_id = RegionId::new(1024, region_number);
            let partition_expr = range_expr("x", start, end);

            let mut create_req = CreateRequestBuilder::new().build();
            create_req.table_dir = table_dir("test", 1024);
            region_server
                .handle_request(region_id, RegionRequest::Create(create_req))
                .await
                .unwrap();

            let enter_staging = RegionRequest::EnterStaging(EnterStagingRequest {
                partition_expr: partition_expr.as_json_str().unwrap(),
            });
            region_server
                .handle_request(region_id, enter_staging)
                .await
                .unwrap();
        }
    }

    /// Runs the handler against a real mito engine: first a remap that is
    /// expected to succeed, then one that is expected to fail.
    #[tokio::test]
    async fn test_remap_manifest() {
        common_telemetry::init_default_ut_logging();
        let mut region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let mut engine_env = TestEnv::new().await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine.clone()));
        prepare_region(&region_server).await;

        let handler_context = HandlerContext::new_for_test(region_server);
        let region_id2 = RegionId::new(1024, 2);

        // Both regions are listed as inputs and mapped onto `region_id`.
        let instruction = merge_instruction(region_id, region_id2, vec![region_id, region_id2]);
        let reply = RemapManifestHandler
            .handle(&handler_context, instruction)
            .await
            .unwrap();
        let reply = reply.expect_remap_manifest_reply();
        assert!(reply.exists);
        assert!(reply.error.is_none(), "{}", reply.error.unwrap());
        assert_eq!(reply.manifest_paths.len(), 1);

        // Remap failed: only `region_id` is listed as input although the
        // mapping still references `region_id2`.
        let instruction = merge_instruction(region_id, region_id2, vec![region_id]);
        let reply = RemapManifestHandler
            .handle(&handler_context, instruction)
            .await
            .unwrap();
        let reply = reply.expect_remap_manifest_reply();
        assert!(reply.exists);
        assert!(reply.error.is_some());
        assert!(reply.manifest_paths.is_empty());
    }
}