datanode/heartbeat/handler/
enter_staging.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_meta::instruction::{
16    EnterStagingRegion, EnterStagingRegionReply, EnterStagingRegionsReply, InstructionReply,
17};
18use common_telemetry::{error, warn};
19use futures::future::join_all;
20use store_api::region_request::{EnterStagingRequest, RegionRequest};
21
22use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
23
/// Stateless heartbeat instruction handler that asks regions on this datanode
/// to enter staging mode.
///
/// The type carries no data; all work is done through its
/// [`InstructionHandler`] implementation.
#[derive(Clone, Copy, Debug, Default)]
pub struct EnterStagingRegionsHandler;
26
27#[async_trait::async_trait]
28impl InstructionHandler for EnterStagingRegionsHandler {
29    type Instruction = Vec<EnterStagingRegion>;
30
31    async fn handle(
32        &self,
33        ctx: &HandlerContext,
34        enter_staging: Self::Instruction,
35    ) -> Option<InstructionReply> {
36        let futures = enter_staging.into_iter().map(|enter_staging_region| {
37            Self::handle_enter_staging_region(ctx, enter_staging_region)
38        });
39        let results = join_all(futures).await;
40        Some(InstructionReply::EnterStagingRegions(
41            EnterStagingRegionsReply::new(results),
42        ))
43    }
44}
45
46impl EnterStagingRegionsHandler {
47    async fn handle_enter_staging_region(
48        ctx: &HandlerContext,
49        EnterStagingRegion {
50            region_id,
51            partition_expr,
52        }: EnterStagingRegion,
53    ) -> EnterStagingRegionReply {
54        common_telemetry::info!(
55            "Datanode received enter staging region: {}, partition_expr: {}",
56            region_id,
57            partition_expr
58        );
59        let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
60            warn!("Region: {} is not found", region_id);
61            return EnterStagingRegionReply {
62                region_id,
63                ready: false,
64                exists: false,
65                error: None,
66            };
67        };
68        if !writable {
69            warn!("Region: {} is not writable", region_id);
70            return EnterStagingRegionReply {
71                region_id,
72                ready: false,
73                exists: true,
74                error: Some("Region is not writable".into()),
75            };
76        }
77
78        match ctx
79            .region_server
80            .handle_request(
81                region_id,
82                RegionRequest::EnterStaging(EnterStagingRequest { partition_expr }),
83            )
84            .await
85        {
86            Ok(_) => EnterStagingRegionReply {
87                region_id,
88                ready: true,
89                exists: true,
90                error: None,
91            },
92            Err(err) => {
93                error!(err; "Failed to enter staging region, region_id: {}", region_id);
94                EnterStagingRegionReply {
95                    region_id,
96                    ready: false,
97                    exists: true,
98                    error: Some(format!("{err:?}")),
99                }
100            }
101        }
102    }
103}
104
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_meta::instruction::EnterStagingRegion;
    use mito2::config::MitoConfig;
    use mito2::engine::MITO_ENGINE_NAME;
    use mito2::test_util::{CreateRequestBuilder, TestEnv};
    use store_api::path_utils::table_dir;
    use store_api::region_engine::RegionRole;
    use store_api::region_request::RegionRequest;
    use store_api::storage::RegionId;

    use crate::heartbeat::handler::enter_staging::EnterStagingRegionsHandler;
    use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
    use crate::region_server::RegionServer;
    use crate::tests::{MockRegionEngine, mock_region_server};

    const PARTITION_EXPR: &str = "partition_expr";

    /// Builds a batch containing exactly one enter-staging instruction.
    fn single_instruction(region_id: RegionId, partition_expr: &str) -> Vec<EnterStagingRegion> {
        vec![EnterStagingRegion {
            region_id,
            partition_expr: partition_expr.to_string(),
        }]
    }

    /// A region that was never registered is reported as missing, without an error.
    #[tokio::test]
    async fn test_region_not_exist() {
        let mut server = mock_region_server();
        let (engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
        server.register_engine(engine);
        let ctx = HandlerContext::new_for_test(server);
        let region_id = RegionId::new(1024, 1);

        let instruction_reply = EnterStagingRegionsHandler
            .handle(&ctx, single_instruction(region_id, ""))
            .await
            .unwrap();
        let replies = instruction_reply.expect_enter_staging_regions_reply();

        let reply = &replies[0];
        assert!(!reply.exists);
        assert!(reply.error.is_none());
        assert!(!reply.ready);
    }

    /// A region whose mocked role is follower exists but is rejected with an error.
    #[tokio::test]
    async fn test_region_not_writable() {
        let server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (engine, _) = MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |mock| {
            mock.mock_role = Some(Some(RegionRole::Follower));
            mock.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
        });
        server.register_test_region(region_id, engine);
        let ctx = HandlerContext::new_for_test(server);

        let instruction_reply = EnterStagingRegionsHandler
            .handle(&ctx, single_instruction(region_id, ""))
            .await
            .unwrap();
        let replies = instruction_reply.expect_enter_staging_regions_reply();

        let reply = &replies[0];
        assert!(reply.exists);
        assert!(reply.error.is_some());
        assert!(!reply.ready);
    }

    /// Creates region `(1024, 1)` on the given region server.
    async fn prepare_region(region_server: &RegionServer) {
        let mut create_req = CreateRequestBuilder::new().build();
        create_req.table_dir = table_dir("test", 1024);

        let region_id = RegionId::new(1024, 1);
        let request = RegionRequest::Create(create_req);
        region_server
            .handle_request(region_id, request)
            .await
            .unwrap();
    }

    /// Entering staging against a real mito engine: the first attempt and a
    /// repeat with the same partition expression succeed, while a later
    /// attempt with a different expression is rejected.
    #[tokio::test]
    async fn test_enter_staging() {
        let mut region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let mut engine_env = TestEnv::new().await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine.clone()));
        prepare_region(&region_server).await;

        let ctx = HandlerContext::new_for_test(region_server);

        // (partition expression, whether the request is expected to succeed)
        let attempts = [
            // First request moves the region into staging mode.
            (PARTITION_EXPR, true),
            // Entering staging again with the same partition expr is fine.
            (PARTITION_EXPR, true),
            // Entering staging again with a different partition expr must fail.
            ("", false),
        ];

        for (partition_expr, should_succeed) in attempts {
            let instruction_reply = EnterStagingRegionsHandler
                .handle(&ctx, single_instruction(region_id, partition_expr))
                .await
                .unwrap();
            let replies = instruction_reply.expect_enter_staging_regions_reply();

            let reply = &replies[0];
            assert!(reply.exists);
            assert_eq!(reply.error.is_none(), should_succeed);
            assert_eq!(reply.ready, should_succeed);
        }
    }
}