datanode/heartbeat/handler/
remap_manifest.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_meta::instruction::{InstructionReply, RemapManifest, RemapManifestReply};
16use common_telemetry::{error, info, warn};
17use store_api::region_engine::RemapManifestsRequest;
18
19use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
20
21pub struct RemapManifestHandler;
22
23#[async_trait::async_trait]
24impl InstructionHandler for RemapManifestHandler {
25    type Instruction = RemapManifest;
26    async fn handle(
27        &self,
28        ctx: &HandlerContext,
29        request: Self::Instruction,
30    ) -> Option<InstructionReply> {
31        let RemapManifest {
32            region_id,
33            input_regions,
34            region_mapping,
35            new_partition_exprs,
36        } = request;
37        info!(
38            "Datanode received remap manifest request, region_id: {}, input_regions: {}, target_regions: {}",
39            region_id,
40            input_regions.len(),
41            new_partition_exprs.len()
42        );
43        let Some(leader) = ctx.region_server.is_region_leader(region_id) else {
44            warn!("Region: {} is not found", region_id);
45            return Some(InstructionReply::RemapManifest(RemapManifestReply {
46                exists: false,
47                manifest_paths: Default::default(),
48                error: None,
49            }));
50        };
51
52        if !leader {
53            warn!("Region: {} is not leader", region_id);
54            return Some(InstructionReply::RemapManifest(RemapManifestReply {
55                exists: true,
56                manifest_paths: Default::default(),
57                error: Some("Region is not leader".into()),
58            }));
59        }
60
61        let reply = match ctx
62            .region_server
63            .remap_manifests(RemapManifestsRequest {
64                region_id,
65                input_regions,
66                region_mapping,
67                new_partition_exprs,
68            })
69            .await
70        {
71            Ok(result) => InstructionReply::RemapManifest(RemapManifestReply {
72                exists: true,
73                manifest_paths: result.manifest_paths,
74                error: None,
75            }),
76            Err(e) => {
77                error!(
78                    e;
79                    "Remap manifests failed on datanode, region_id: {}",
80                    region_id
81                );
82                InstructionReply::RemapManifest(RemapManifestReply {
83                    exists: true,
84                    manifest_paths: Default::default(),
85                    error: Some(format!("{e:?}")),
86                })
87            }
88        };
89
90        Some(reply)
91    }
92}
93
94#[cfg(test)]
95mod tests {
96    use std::collections::HashMap;
97    use std::sync::Arc;
98
99    use common_meta::instruction::RemapManifest;
100    use common_meta::kv_backend::memory::MemoryKvBackend;
101    use datatypes::value::Value;
102    use mito2::config::MitoConfig;
103    use mito2::engine::MITO_ENGINE_NAME;
104    use mito2::test_util::{CreateRequestBuilder, TestEnv};
105    use partition::expr::{PartitionExpr, col};
106    use store_api::path_utils::table_dir;
107    use store_api::region_engine::RegionRole;
108    use store_api::region_request::{EnterStagingRequest, RegionRequest};
109    use store_api::storage::RegionId;
110
111    use crate::heartbeat::handler::remap_manifest::RemapManifestHandler;
112    use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
113    use crate::region_server::RegionServer;
114    use crate::tests::{MockRegionEngine, mock_region_server};
115
116    #[tokio::test]
117    async fn test_region_not_exist() {
118        let mut mock_region_server = mock_region_server();
119        let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
120        mock_region_server.register_engine(mock_engine);
121        let kv_backend = Arc::new(MemoryKvBackend::new());
122        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
123        let region_id = RegionId::new(1024, 1);
124        let reply = RemapManifestHandler
125            .handle(
126                &handler_context,
127                RemapManifest {
128                    region_id,
129                    input_regions: vec![],
130                    region_mapping: HashMap::new(),
131                    new_partition_exprs: HashMap::new(),
132                },
133            )
134            .await
135            .unwrap();
136        let reply = &reply.expect_remap_manifest_reply();
137        assert!(!reply.exists);
138        assert!(reply.error.is_none());
139        assert!(reply.manifest_paths.is_empty());
140    }
141
142    #[tokio::test]
143    async fn test_region_not_leader() {
144        let mock_region_server = mock_region_server();
145        let region_id = RegionId::new(1024, 1);
146        let (mock_engine, _) =
147            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
148                region_engine.mock_role = Some(Some(RegionRole::Follower));
149                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
150            });
151        mock_region_server.register_test_region(region_id, mock_engine);
152        let kv_backend = Arc::new(MemoryKvBackend::new());
153        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
154        let reply = RemapManifestHandler
155            .handle(
156                &handler_context,
157                RemapManifest {
158                    region_id,
159                    input_regions: vec![],
160                    region_mapping: HashMap::new(),
161                    new_partition_exprs: HashMap::new(),
162                },
163            )
164            .await
165            .unwrap();
166        let reply = reply.expect_remap_manifest_reply();
167        assert!(reply.exists);
168        assert!(reply.error.is_some());
169    }
170
171    fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr {
172        col(col_name)
173            .gt_eq(Value::Int64(start))
174            .and(col(col_name).lt(Value::Int64(end)))
175    }
176
177    async fn prepare_region(region_server: &RegionServer) {
178        let region_specs = [
179            (RegionId::new(1024, 1), range_expr("x", 0, 50)),
180            (RegionId::new(1024, 2), range_expr("x", 50, 100)),
181        ];
182
183        for (region_id, partition_expr) in region_specs {
184            let builder = CreateRequestBuilder::new();
185            let mut create_req = builder.build();
186            create_req.table_dir = table_dir("test", 1024);
187            region_server
188                .handle_request(region_id, RegionRequest::Create(create_req))
189                .await
190                .unwrap();
191            region_server
192                .handle_request(
193                    region_id,
194                    RegionRequest::EnterStaging(EnterStagingRequest {
195                        partition_expr: partition_expr.as_json_str().unwrap(),
196                    }),
197                )
198                .await
199                .unwrap();
200        }
201    }
202
203    #[tokio::test]
204    async fn test_remap_manifest() {
205        common_telemetry::init_default_ut_logging();
206        let mut region_server = mock_region_server();
207        let region_id = RegionId::new(1024, 1);
208        let mut engine_env = TestEnv::new().await;
209        let engine = engine_env.create_engine(MitoConfig::default()).await;
210        region_server.register_engine(Arc::new(engine.clone()));
211        prepare_region(&region_server).await;
212
213        let kv_backend = Arc::new(MemoryKvBackend::new());
214        let handler_context = HandlerContext::new_for_test(region_server, kv_backend);
215        let region_id2 = RegionId::new(1024, 2);
216        let reply = RemapManifestHandler
217            .handle(
218                &handler_context,
219                RemapManifest {
220                    region_id,
221                    input_regions: vec![region_id, region_id2],
222                    region_mapping: HashMap::from([
223                        (region_id, vec![region_id]),
224                        (region_id2, vec![region_id]),
225                    ]),
226                    new_partition_exprs: HashMap::from([(
227                        region_id,
228                        range_expr("x", 0, 100).as_json_str().unwrap(),
229                    )]),
230                },
231            )
232            .await
233            .unwrap();
234        let reply = reply.expect_remap_manifest_reply();
235        assert!(reply.exists);
236        assert!(reply.error.is_none(), "{}", reply.error.unwrap());
237        assert_eq!(reply.manifest_paths.len(), 1);
238
239        // Remap failed
240        let reply = RemapManifestHandler
241            .handle(
242                &handler_context,
243                RemapManifest {
244                    region_id,
245                    input_regions: vec![region_id],
246                    region_mapping: HashMap::from([
247                        (region_id, vec![region_id]),
248                        (region_id2, vec![region_id]),
249                    ]),
250                    new_partition_exprs: HashMap::from([(
251                        region_id,
252                        range_expr("x", 0, 100).as_json_str().unwrap(),
253                    )]),
254                },
255            )
256            .await
257            .unwrap();
258        let reply = reply.expect_remap_manifest_reply();
259        assert!(reply.exists);
260        assert!(reply.error.is_some());
261        assert!(reply.manifest_paths.is_empty());
262    }
263}