datanode/heartbeat/handler/
remap_manifest.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_meta::instruction::{InstructionReply, RemapManifest, RemapManifestReply};
16use common_telemetry::{error, info, warn};
17use store_api::region_engine::RemapManifestsRequest;
18
19use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
20
21pub struct RemapManifestHandler;
22
23#[async_trait::async_trait]
24impl InstructionHandler for RemapManifestHandler {
25    type Instruction = RemapManifest;
26    async fn handle(
27        &self,
28        ctx: &HandlerContext,
29        request: Self::Instruction,
30    ) -> Option<InstructionReply> {
31        let RemapManifest {
32            region_id,
33            input_regions,
34            region_mapping,
35            new_partition_exprs,
36        } = request;
37        info!(
38            "Datanode received remap manifest request, region_id: {}, input_regions: {}, target_regions: {}",
39            region_id,
40            input_regions.len(),
41            new_partition_exprs.len()
42        );
43        let Some(leader) = ctx.region_server.is_region_leader(region_id) else {
44            warn!("Region: {} is not found", region_id);
45            return Some(InstructionReply::RemapManifest(RemapManifestReply {
46                exists: false,
47                manifest_paths: Default::default(),
48                error: None,
49            }));
50        };
51
52        if !leader {
53            warn!("Region: {} is not leader", region_id);
54            return Some(InstructionReply::RemapManifest(RemapManifestReply {
55                exists: true,
56                manifest_paths: Default::default(),
57                error: Some("Region is not leader".into()),
58            }));
59        }
60
61        let reply = match ctx
62            .region_server
63            .remap_manifests(RemapManifestsRequest {
64                region_id,
65                input_regions,
66                region_mapping,
67                new_partition_exprs,
68            })
69            .await
70        {
71            Ok(result) => InstructionReply::RemapManifest(RemapManifestReply {
72                exists: true,
73                manifest_paths: result.manifest_paths,
74                error: None,
75            }),
76            Err(e) => {
77                error!(
78                    e;
79                    "Remap manifests failed on datanode, region_id: {}",
80                    region_id
81                );
82                InstructionReply::RemapManifest(RemapManifestReply {
83                    exists: true,
84                    manifest_paths: Default::default(),
85                    error: Some(format!("{e:?}")),
86                })
87            }
88        };
89
90        Some(reply)
91    }
92}
93
94#[cfg(test)]
95mod tests {
96    use std::collections::HashMap;
97    use std::sync::Arc;
98
99    use common_meta::instruction::RemapManifest;
100    use common_meta::kv_backend::memory::MemoryKvBackend;
101    use datatypes::value::Value;
102    use mito2::config::MitoConfig;
103    use mito2::engine::MITO_ENGINE_NAME;
104    use mito2::test_util::{CreateRequestBuilder, TestEnv};
105    use partition::expr::{PartitionExpr, col};
106    use store_api::path_utils::table_dir;
107    use store_api::region_engine::RegionRole;
108    use store_api::region_request::{
109        EnterStagingRequest, RegionRequest, StagingPartitionDirective,
110    };
111    use store_api::storage::RegionId;
112
113    use crate::heartbeat::handler::remap_manifest::RemapManifestHandler;
114    use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
115    use crate::region_server::RegionServer;
116    use crate::tests::{MockRegionEngine, mock_region_server};
117
118    #[tokio::test]
119    async fn test_region_not_exist() {
120        let mut mock_region_server = mock_region_server();
121        let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
122        mock_region_server.register_engine(mock_engine);
123        let kv_backend = Arc::new(MemoryKvBackend::new());
124        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
125        let region_id = RegionId::new(1024, 1);
126        let reply = RemapManifestHandler
127            .handle(
128                &handler_context,
129                RemapManifest {
130                    region_id,
131                    input_regions: vec![],
132                    region_mapping: HashMap::new(),
133                    new_partition_exprs: HashMap::new(),
134                },
135            )
136            .await
137            .unwrap();
138        let reply = &reply.expect_remap_manifest_reply();
139        assert!(!reply.exists);
140        assert!(reply.error.is_none());
141        assert!(reply.manifest_paths.is_empty());
142    }
143
144    #[tokio::test]
145    async fn test_region_not_leader() {
146        let mock_region_server = mock_region_server();
147        let region_id = RegionId::new(1024, 1);
148        let (mock_engine, _) =
149            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
150                region_engine.mock_role = Some(Some(RegionRole::Follower));
151                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
152            });
153        mock_region_server.register_test_region(region_id, mock_engine);
154        let kv_backend = Arc::new(MemoryKvBackend::new());
155        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
156        let reply = RemapManifestHandler
157            .handle(
158                &handler_context,
159                RemapManifest {
160                    region_id,
161                    input_regions: vec![],
162                    region_mapping: HashMap::new(),
163                    new_partition_exprs: HashMap::new(),
164                },
165            )
166            .await
167            .unwrap();
168        let reply = reply.expect_remap_manifest_reply();
169        assert!(reply.exists);
170        assert!(reply.error.is_some());
171    }
172
173    fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr {
174        col(col_name)
175            .gt_eq(Value::Int64(start))
176            .and(col(col_name).lt(Value::Int64(end)))
177    }
178
179    async fn prepare_region(region_server: &RegionServer) {
180        let region_specs = [
181            (RegionId::new(1024, 1), range_expr("x", 0, 50)),
182            (RegionId::new(1024, 2), range_expr("x", 50, 100)),
183        ];
184
185        for (region_id, partition_expr) in region_specs {
186            let builder = CreateRequestBuilder::new();
187            let mut create_req = builder.build();
188            create_req.table_dir = table_dir("test", 1024);
189            region_server
190                .handle_request(region_id, RegionRequest::Create(create_req))
191                .await
192                .unwrap();
193            region_server
194                .handle_request(
195                    region_id,
196                    RegionRequest::EnterStaging(EnterStagingRequest {
197                        partition_directive: StagingPartitionDirective::UpdatePartitionExpr(
198                            partition_expr.as_json_str().unwrap(),
199                        ),
200                    }),
201                )
202                .await
203                .unwrap();
204        }
205    }
206
207    #[tokio::test]
208    async fn test_remap_manifest() {
209        common_telemetry::init_default_ut_logging();
210        let mut region_server = mock_region_server();
211        let region_id = RegionId::new(1024, 1);
212        let mut engine_env = TestEnv::new().await;
213        let engine = engine_env.create_engine(MitoConfig::default()).await;
214        region_server.register_engine(Arc::new(engine.clone()));
215        prepare_region(&region_server).await;
216
217        let kv_backend = Arc::new(MemoryKvBackend::new());
218        let handler_context = HandlerContext::new_for_test(region_server, kv_backend);
219        let region_id2 = RegionId::new(1024, 2);
220        let reply = RemapManifestHandler
221            .handle(
222                &handler_context,
223                RemapManifest {
224                    region_id,
225                    input_regions: vec![region_id, region_id2],
226                    region_mapping: HashMap::from([
227                        (region_id, vec![region_id]),
228                        (region_id2, vec![region_id]),
229                    ]),
230                    new_partition_exprs: HashMap::from([(
231                        region_id,
232                        range_expr("x", 0, 100).as_json_str().unwrap(),
233                    )]),
234                },
235            )
236            .await
237            .unwrap();
238        let reply = reply.expect_remap_manifest_reply();
239        assert!(reply.exists);
240        assert!(reply.error.is_none(), "{}", reply.error.unwrap());
241        assert_eq!(reply.manifest_paths.len(), 1);
242
243        // Remap failed
244        let reply = RemapManifestHandler
245            .handle(
246                &handler_context,
247                RemapManifest {
248                    region_id,
249                    input_regions: vec![region_id],
250                    region_mapping: HashMap::from([
251                        (region_id, vec![region_id]),
252                        (region_id2, vec![region_id]),
253                    ]),
254                    new_partition_exprs: HashMap::from([(
255                        region_id,
256                        range_expr("x", 0, 100).as_json_str().unwrap(),
257                    )]),
258                },
259            )
260            .await
261            .unwrap();
262        let reply = reply.expect_remap_manifest_reply();
263        assert!(reply.exists);
264        assert!(reply.error.is_some());
265        assert!(reply.manifest_paths.is_empty());
266    }
267}