datanode/heartbeat/handler/
enter_staging.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use common_meta::instruction::{
16    EnterStagingRegion, EnterStagingRegionReply, EnterStagingRegionsReply, InstructionReply,
17    StagingPartitionDirective as InstructionStagingPartitionDirective,
18};
19use common_telemetry::{error, warn};
20use futures::future::join_all;
21use store_api::region_request::{
22    EnterStagingRequest, RegionRequest,
23    StagingPartitionDirective as RequestStagingPartitionDirective,
24};
25
26use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
27
/// Stateless handler for a batch of [`EnterStagingRegion`] instructions.
///
/// The type carries no data; all behavior lives in its [`InstructionHandler`]
/// implementation, which produces one [`EnterStagingRegionReply`] per
/// requested region.
#[derive(Clone, Copy, Debug, Default)]
pub struct EnterStagingRegionsHandler;
30
31#[async_trait::async_trait]
32impl InstructionHandler for EnterStagingRegionsHandler {
33    type Instruction = Vec<EnterStagingRegion>;
34
35    async fn handle(
36        &self,
37        ctx: &HandlerContext,
38        enter_staging: Self::Instruction,
39    ) -> Option<InstructionReply> {
40        let futures = enter_staging.into_iter().map(|enter_staging_region| {
41            Self::handle_enter_staging_region(ctx, enter_staging_region)
42        });
43        let results = join_all(futures).await;
44        Some(InstructionReply::EnterStagingRegions(
45            EnterStagingRegionsReply::new(results),
46        ))
47    }
48}
49
50impl EnterStagingRegionsHandler {
51    async fn handle_enter_staging_region(
52        ctx: &HandlerContext,
53        EnterStagingRegion {
54            region_id,
55            partition_directive,
56        }: EnterStagingRegion,
57    ) -> EnterStagingRegionReply {
58        let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
59            warn!("Region: {} is not found", region_id);
60            return EnterStagingRegionReply {
61                region_id,
62                ready: false,
63                exists: false,
64                error: None,
65            };
66        };
67        if !writable {
68            warn!("Region: {} is not writable", region_id);
69            return EnterStagingRegionReply {
70                region_id,
71                ready: false,
72                exists: true,
73                error: Some("Region is not writable".into()),
74            };
75        }
76
77        common_telemetry::info!("Datanode received enter staging region: {}", region_id);
78
79        let partition_directive = match partition_directive {
80            InstructionStagingPartitionDirective::UpdatePartitionExpr(expr) => {
81                RequestStagingPartitionDirective::UpdatePartitionExpr(expr)
82            }
83            InstructionStagingPartitionDirective::RejectAllWrites => {
84                RequestStagingPartitionDirective::RejectAllWrites
85            }
86        };
87
88        match ctx
89            .region_server
90            .handle_request(
91                region_id,
92                RegionRequest::EnterStaging(EnterStagingRequest {
93                    partition_directive,
94                }),
95            )
96            .await
97        {
98            Ok(_) => EnterStagingRegionReply {
99                region_id,
100                ready: true,
101                exists: true,
102                error: None,
103            },
104            Err(err) => {
105                error!(err; "Failed to enter staging region, region_id: {}", region_id);
106                EnterStagingRegionReply {
107                    region_id,
108                    ready: false,
109                    exists: true,
110                    error: Some(format!("{err:?}")),
111                }
112            }
113        }
114    }
115}
116
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_meta::instruction::{
        EnterStagingRegion, StagingPartitionDirective as InstructionStagingPartitionDirective,
    };
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use mito2::config::MitoConfig;
    use mito2::engine::MITO_ENGINE_NAME;
    use mito2::test_util::{CreateRequestBuilder, TestEnv};
    use store_api::path_utils::table_dir;
    use store_api::region_engine::RegionRole;
    use store_api::region_request::RegionRequest;
    use store_api::storage::RegionId;

    use crate::heartbeat::handler::enter_staging::EnterStagingRegionsHandler;
    use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
    use crate::region_server::RegionServer;
    use crate::tests::{MockRegionEngine, mock_region_server};

    const PARTITION_EXPR: &str = "partition_expr";

    /// Builds an `UpdatePartitionExpr` directive from a string slice.
    fn update_partition_expr(expr: &str) -> InstructionStagingPartitionDirective {
        InstructionStagingPartitionDirective::UpdatePartitionExpr(expr.to_string())
    }

    /// Builds an instruction batch that targets exactly one region.
    fn single_region_instruction(
        region_id: RegionId,
        partition_directive: InstructionStagingPartitionDirective,
    ) -> Vec<EnterStagingRegion> {
        vec![EnterStagingRegion {
            region_id,
            partition_directive,
        }]
    }

    /// Creates region `(1024, 1)` on the given server so it can enter staging.
    async fn prepare_region(region_server: &RegionServer) {
        let mut create_req = CreateRequestBuilder::new().build();
        create_req.table_dir = table_dir("test", 1024);
        region_server
            .handle_request(RegionId::new(1024, 1), RegionRequest::Create(create_req))
            .await
            .unwrap();
    }

    // A region that was never registered is reported as non-existent, without an error.
    #[tokio::test]
    async fn test_region_not_exist() {
        let mut region_server = mock_region_server();
        let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
        region_server.register_engine(mock_engine);
        let handler_context =
            HandlerContext::new_for_test(region_server, Arc::new(MemoryKvBackend::new()));
        let region_id = RegionId::new(1024, 1);

        let instruction = single_region_instruction(region_id, update_partition_expr(""));
        let reply = EnterStagingRegionsHandler
            .handle(&handler_context, instruction)
            .await
            .unwrap();
        let replies = reply.expect_enter_staging_regions_reply();
        let reply = &replies[0];
        assert!(!reply.exists);
        assert!(reply.error.is_none());
        assert!(!reply.ready);
    }

    // A follower region exists but must be rejected with an error.
    #[tokio::test]
    async fn test_region_not_writable() {
        let region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
            });
        region_server.register_test_region(region_id, mock_engine);
        let handler_context =
            HandlerContext::new_for_test(region_server, Arc::new(MemoryKvBackend::new()));

        let instruction = single_region_instruction(region_id, update_partition_expr(""));
        let reply = EnterStagingRegionsHandler
            .handle(&handler_context, instruction)
            .await
            .unwrap();
        let replies = reply.expect_enter_staging_regions_reply();
        let reply = &replies[0];
        assert!(reply.exists);
        assert!(reply.error.is_some());
        assert!(!reply.ready);
    }

    #[tokio::test]
    async fn test_enter_staging() {
        let mut region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let mut engine_env = TestEnv::new().await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine.clone()));
        prepare_region(&region_server).await;

        let handler_context =
            HandlerContext::new_for_test(region_server, Arc::new(MemoryKvBackend::new()));

        // First request with a partition expr succeeds.
        let instruction =
            single_region_instruction(region_id, update_partition_expr(PARTITION_EXPR));
        let reply = EnterStagingRegionsHandler
            .handle(&handler_context, instruction)
            .await
            .unwrap();
        let replies = reply.expect_enter_staging_regions_reply();
        let reply = &replies[0];
        assert!(reply.exists);
        assert!(reply.error.is_none());
        assert!(reply.ready);

        // Should be ok to enter staging mode again with the same partition expr
        let instruction =
            single_region_instruction(region_id, update_partition_expr(PARTITION_EXPR));
        let reply = EnterStagingRegionsHandler
            .handle(&handler_context, instruction)
            .await
            .unwrap();
        let replies = reply.expect_enter_staging_regions_reply();
        let reply = &replies[0];
        assert!(reply.exists);
        assert!(reply.error.is_none());
        assert!(reply.ready);

        // Should throw error if try to enter staging mode again with a different partition expr
        let instruction = single_region_instruction(region_id, update_partition_expr(""));
        let reply = EnterStagingRegionsHandler
            .handle(&handler_context, instruction)
            .await
            .unwrap();
        let replies = reply.expect_enter_staging_regions_reply();
        let reply = &replies[0];
        assert!(reply.exists);
        assert!(reply.error.is_some());
        assert!(!reply.ready);
    }

    #[tokio::test]
    async fn test_enter_staging_reject_all_writes() {
        let mut region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let mut engine_env = TestEnv::new().await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine.clone()));
        prepare_region(&region_server).await;

        let handler_context =
            HandlerContext::new_for_test(region_server, Arc::new(MemoryKvBackend::new()));

        let instruction = single_region_instruction(
            region_id,
            InstructionStagingPartitionDirective::RejectAllWrites,
        );
        let reply = EnterStagingRegionsHandler
            .handle(&handler_context, instruction)
            .await
            .unwrap();
        let replies = reply.expect_enter_staging_regions_reply();
        let reply = &replies[0];
        assert!(reply.exists);
        assert!(reply.ready);
        assert!(reply.error.is_none());
    }
}