datanode/heartbeat/handler/
apply_staging_manifest.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_meta::instruction::{
16    ApplyStagingManifest, ApplyStagingManifestReply, ApplyStagingManifestsReply, InstructionReply,
17};
18use common_telemetry::{error, warn};
19use futures::future::join_all;
20use store_api::region_request::{ApplyStagingManifestRequest, RegionRequest};
21
22use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
23
24pub struct ApplyStagingManifestsHandler;
25
26#[async_trait::async_trait]
27impl InstructionHandler for ApplyStagingManifestsHandler {
28    type Instruction = Vec<ApplyStagingManifest>;
29    async fn handle(
30        &self,
31        ctx: &HandlerContext,
32        requests: Self::Instruction,
33    ) -> Option<InstructionReply> {
34        let results = join_all(
35            requests
36                .into_iter()
37                .map(|request| Self::handle_apply_staging_manifest(ctx, request)),
38        )
39        .await;
40        Some(InstructionReply::ApplyStagingManifests(
41            ApplyStagingManifestsReply::new(results),
42        ))
43    }
44}
45
46impl ApplyStagingManifestsHandler {
47    async fn handle_apply_staging_manifest(
48        ctx: &HandlerContext,
49        request: ApplyStagingManifest,
50    ) -> ApplyStagingManifestReply {
51        let ApplyStagingManifest {
52            region_id,
53            ref partition_expr,
54            central_region_id,
55            ref manifest_path,
56        } = request;
57        common_telemetry::info!(
58            "Datanode received apply staging manifest request, region_id: {}, central_region_id: {}, partition_expr: {}, manifest_path: {}",
59            region_id,
60            central_region_id,
61            partition_expr,
62            manifest_path
63        );
64        let Some(leader) = ctx.region_server.is_region_leader(region_id) else {
65            warn!("Region: {} is not found", region_id);
66            return ApplyStagingManifestReply {
67                region_id,
68                exists: false,
69                ready: false,
70                error: None,
71            };
72        };
73        if !leader {
74            warn!("Region: {} is not leader", region_id);
75            return ApplyStagingManifestReply {
76                region_id,
77                exists: true,
78                ready: false,
79                error: Some("Region is not leader".into()),
80            };
81        }
82
83        match ctx
84            .region_server
85            .handle_request(
86                region_id,
87                RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
88                    partition_expr: partition_expr.clone(),
89                    central_region_id,
90                    manifest_path: manifest_path.clone(),
91                }),
92            )
93            .await
94        {
95            Ok(_) => ApplyStagingManifestReply {
96                region_id,
97                exists: true,
98                ready: true,
99                error: None,
100            },
101            Err(err) => {
102                error!(err; "Failed to apply staging manifest, region_id: {}", region_id);
103                ApplyStagingManifestReply {
104                    region_id,
105                    exists: true,
106                    ready: false,
107                    error: Some(format!("{err:?}")),
108                }
109            }
110        }
111    }
112}
113
114#[cfg(test)]
115mod tests {
116    use std::collections::HashMap;
117    use std::sync::Arc;
118
119    use common_meta::instruction::RemapManifest;
120    use common_meta::kv_backend::memory::MemoryKvBackend;
121    use datatypes::value::Value;
122    use mito2::config::MitoConfig;
123    use mito2::engine::MITO_ENGINE_NAME;
124    use mito2::test_util::{CreateRequestBuilder, TestEnv};
125    use partition::expr::{PartitionExpr, col};
126    use store_api::path_utils::table_dir;
127    use store_api::region_engine::RegionRole;
128    use store_api::region_request::EnterStagingRequest;
129    use store_api::storage::RegionId;
130
131    use super::*;
132    use crate::heartbeat::handler::remap_manifest::RemapManifestHandler;
133    use crate::region_server::RegionServer;
134    use crate::tests::{MockRegionEngine, mock_region_server};
135
136    #[tokio::test]
137    async fn test_region_not_exist() {
138        let mut mock_region_server = mock_region_server();
139        let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
140        mock_region_server.register_engine(mock_engine);
141        let kv_backend = Arc::new(MemoryKvBackend::new());
142        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
143        let region_id = RegionId::new(1024, 1);
144        let reply = ApplyStagingManifestsHandler
145            .handle(
146                &handler_context,
147                vec![ApplyStagingManifest {
148                    region_id,
149                    partition_expr: "".to_string(),
150                    central_region_id: RegionId::new(1024, 9999), // use a dummy value
151                    manifest_path: "".to_string(),
152                }],
153            )
154            .await
155            .unwrap();
156        let replies = reply.expect_apply_staging_manifests_reply();
157        let reply = &replies[0];
158        assert!(!reply.exists);
159        assert!(!reply.ready);
160        assert!(reply.error.is_none());
161    }
162
163    #[tokio::test]
164    async fn test_region_not_leader() {
165        let mock_region_server = mock_region_server();
166        let region_id = RegionId::new(1024, 1);
167        let (mock_engine, _) =
168            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
169                region_engine.mock_role = Some(Some(RegionRole::Follower));
170                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
171            });
172        mock_region_server.register_test_region(region_id, mock_engine);
173        let kv_backend = Arc::new(MemoryKvBackend::new());
174        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
175        let region_id = RegionId::new(1024, 1);
176        let reply = ApplyStagingManifestsHandler
177            .handle(
178                &handler_context,
179                vec![ApplyStagingManifest {
180                    region_id,
181                    partition_expr: "".to_string(),
182                    central_region_id: RegionId::new(1024, 2),
183                    manifest_path: "".to_string(),
184                }],
185            )
186            .await
187            .unwrap();
188        let replies = reply.expect_apply_staging_manifests_reply();
189        let reply = &replies[0];
190        assert!(reply.exists);
191        assert!(!reply.ready);
192        assert!(reply.error.is_some());
193    }
194
195    fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr {
196        col(col_name)
197            .gt_eq(Value::Int64(start))
198            .and(col(col_name).lt(Value::Int64(end)))
199    }
200
201    async fn prepare_region(region_server: &RegionServer) {
202        let region_specs = [
203            (RegionId::new(1024, 1), range_expr("x", 0, 49)),
204            (RegionId::new(1024, 2), range_expr("x", 49, 100)),
205        ];
206
207        for (region_id, partition_expr) in region_specs {
208            let builder = CreateRequestBuilder::new();
209            let mut create_req = builder.build();
210            create_req.table_dir = table_dir("test", 1024);
211            region_server
212                .handle_request(region_id, RegionRequest::Create(create_req))
213                .await
214                .unwrap();
215            region_server
216                .handle_request(
217                    region_id,
218                    RegionRequest::EnterStaging(EnterStagingRequest {
219                        partition_expr: partition_expr.as_json_str().unwrap(),
220                    }),
221                )
222                .await
223                .unwrap();
224        }
225    }
226
227    #[tokio::test]
228    async fn test_apply_staging_manifest() {
229        common_telemetry::init_default_ut_logging();
230        let mut region_server = mock_region_server();
231        let region_id = RegionId::new(1024, 1);
232        let mut engine_env = TestEnv::new().await;
233        let engine = engine_env.create_engine(MitoConfig::default()).await;
234        region_server.register_engine(Arc::new(engine.clone()));
235        prepare_region(&region_server).await;
236
237        let kv_backend = Arc::new(MemoryKvBackend::new());
238        let handler_context = HandlerContext::new_for_test(region_server, kv_backend);
239        let region_id2 = RegionId::new(1024, 2);
240        let reply = RemapManifestHandler
241            .handle(
242                &handler_context,
243                RemapManifest {
244                    region_id,
245                    input_regions: vec![region_id, region_id2],
246                    region_mapping: HashMap::from([
247                        // [0,49) <- [0, 50)
248                        (region_id, vec![region_id]),
249                        // [49, 100) <- [0, 50), [50,100)
250                        (region_id2, vec![region_id, region_id2]),
251                    ]),
252                    new_partition_exprs: HashMap::from([
253                        (region_id, range_expr("x", 0, 49).as_json_str().unwrap()),
254                        (region_id2, range_expr("x", 49, 100).as_json_str().unwrap()),
255                    ]),
256                },
257            )
258            .await
259            .unwrap();
260        let reply = reply.expect_remap_manifest_reply();
261        assert!(reply.exists);
262        assert!(reply.error.is_none(), "{}", reply.error.unwrap());
263        assert_eq!(reply.manifest_paths.len(), 2);
264        let manifest_path_1 = reply.manifest_paths[&region_id].clone();
265        let manifest_path_2 = reply.manifest_paths[&region_id2].clone();
266
267        let reply = ApplyStagingManifestsHandler
268            .handle(
269                &handler_context,
270                vec![ApplyStagingManifest {
271                    region_id,
272                    partition_expr: range_expr("x", 0, 49).as_json_str().unwrap(),
273                    central_region_id: region_id,
274                    manifest_path: manifest_path_1,
275                }],
276            )
277            .await
278            .unwrap();
279        let replies = reply.expect_apply_staging_manifests_reply();
280        let reply = &replies[0];
281        assert!(reply.exists);
282        assert!(reply.ready);
283        assert!(reply.error.is_none());
284
285        // partition expr mismatch
286        let reply = ApplyStagingManifestsHandler
287            .handle(
288                &handler_context,
289                vec![ApplyStagingManifest {
290                    region_id: region_id2,
291                    partition_expr: range_expr("x", 50, 100).as_json_str().unwrap(),
292                    central_region_id: region_id,
293                    manifest_path: manifest_path_2,
294                }],
295            )
296            .await
297            .unwrap();
298        let replies = reply.expect_apply_staging_manifests_reply();
299        let reply = &replies[0];
300        assert!(reply.exists);
301        assert!(!reply.ready);
302        assert!(reply.error.is_some());
303    }
304}