datanode/heartbeat/handler/
enter_staging.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_meta::instruction::{
16    EnterStagingRegion, EnterStagingRegionReply, EnterStagingRegionsReply, InstructionReply,
17};
18use common_telemetry::{error, warn};
19use futures::future::join_all;
20use store_api::region_request::{EnterStagingRequest, RegionRequest};
21
22use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
23
24#[derive(Debug, Clone, Copy, Default)]
25pub struct EnterStagingRegionsHandler;
26
27#[async_trait::async_trait]
28impl InstructionHandler for EnterStagingRegionsHandler {
29    type Instruction = Vec<EnterStagingRegion>;
30
31    async fn handle(
32        &self,
33        ctx: &HandlerContext,
34        enter_staging: Self::Instruction,
35    ) -> Option<InstructionReply> {
36        let futures = enter_staging.into_iter().map(|enter_staging_region| {
37            Self::handle_enter_staging_region(ctx, enter_staging_region)
38        });
39        let results = join_all(futures).await;
40        Some(InstructionReply::EnterStagingRegions(
41            EnterStagingRegionsReply::new(results),
42        ))
43    }
44}
45
46impl EnterStagingRegionsHandler {
47    async fn handle_enter_staging_region(
48        ctx: &HandlerContext,
49        EnterStagingRegion {
50            region_id,
51            partition_expr,
52        }: EnterStagingRegion,
53    ) -> EnterStagingRegionReply {
54        common_telemetry::info!(
55            "Datanode received enter staging region: {}, partition_expr: {}",
56            region_id,
57            partition_expr
58        );
59        let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
60            warn!("Region: {} is not found", region_id);
61            return EnterStagingRegionReply {
62                region_id,
63                ready: false,
64                exists: false,
65                error: None,
66            };
67        };
68        if !writable {
69            warn!("Region: {} is not writable", region_id);
70            return EnterStagingRegionReply {
71                region_id,
72                ready: false,
73                exists: true,
74                error: Some("Region is not writable".into()),
75            };
76        }
77
78        match ctx
79            .region_server
80            .handle_request(
81                region_id,
82                RegionRequest::EnterStaging(EnterStagingRequest { partition_expr }),
83            )
84            .await
85        {
86            Ok(_) => EnterStagingRegionReply {
87                region_id,
88                ready: true,
89                exists: true,
90                error: None,
91            },
92            Err(err) => {
93                error!(err; "Failed to enter staging region, region_id: {}", region_id);
94                EnterStagingRegionReply {
95                    region_id,
96                    ready: false,
97                    exists: true,
98                    error: Some(format!("{err:?}")),
99                }
100            }
101        }
102    }
103}
104
105#[cfg(test)]
106mod tests {
107    use std::sync::Arc;
108
109    use common_meta::instruction::EnterStagingRegion;
110    use common_meta::kv_backend::memory::MemoryKvBackend;
111    use mito2::config::MitoConfig;
112    use mito2::engine::MITO_ENGINE_NAME;
113    use mito2::test_util::{CreateRequestBuilder, TestEnv};
114    use store_api::path_utils::table_dir;
115    use store_api::region_engine::RegionRole;
116    use store_api::region_request::RegionRequest;
117    use store_api::storage::RegionId;
118
119    use crate::heartbeat::handler::enter_staging::EnterStagingRegionsHandler;
120    use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
121    use crate::region_server::RegionServer;
122    use crate::tests::{MockRegionEngine, mock_region_server};
123
124    const PARTITION_EXPR: &str = "partition_expr";
125
126    #[tokio::test]
127    async fn test_region_not_exist() {
128        let mut mock_region_server = mock_region_server();
129        let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
130        mock_region_server.register_engine(mock_engine);
131        let kv_backend = Arc::new(MemoryKvBackend::new());
132        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
133        let region_id = RegionId::new(1024, 1);
134        let replies = EnterStagingRegionsHandler
135            .handle(
136                &handler_context,
137                vec![EnterStagingRegion {
138                    region_id,
139                    partition_expr: "".to_string(),
140                }],
141            )
142            .await
143            .unwrap();
144        let replies = replies.expect_enter_staging_regions_reply();
145        let reply = &replies[0];
146        assert!(!reply.exists);
147        assert!(reply.error.is_none());
148        assert!(!reply.ready);
149    }
150
151    #[tokio::test]
152    async fn test_region_not_writable() {
153        let mock_region_server = mock_region_server();
154        let region_id = RegionId::new(1024, 1);
155        let (mock_engine, _) =
156            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
157                region_engine.mock_role = Some(Some(RegionRole::Follower));
158                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
159            });
160        mock_region_server.register_test_region(region_id, mock_engine);
161        let kv_backend = Arc::new(MemoryKvBackend::new());
162        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
163        let replies = EnterStagingRegionsHandler
164            .handle(
165                &handler_context,
166                vec![EnterStagingRegion {
167                    region_id,
168                    partition_expr: "".to_string(),
169                }],
170            )
171            .await
172            .unwrap();
173        let replies = replies.expect_enter_staging_regions_reply();
174        let reply = &replies[0];
175        assert!(reply.exists);
176        assert!(reply.error.is_some());
177        assert!(!reply.ready);
178    }
179
180    async fn prepare_region(region_server: &RegionServer) {
181        let builder = CreateRequestBuilder::new();
182        let mut create_req = builder.build();
183        create_req.table_dir = table_dir("test", 1024);
184        let region_id = RegionId::new(1024, 1);
185        region_server
186            .handle_request(region_id, RegionRequest::Create(create_req))
187            .await
188            .unwrap();
189    }
190
191    #[tokio::test]
192    async fn test_enter_staging() {
193        let mut region_server = mock_region_server();
194        let region_id = RegionId::new(1024, 1);
195        let mut engine_env = TestEnv::new().await;
196        let engine = engine_env.create_engine(MitoConfig::default()).await;
197        region_server.register_engine(Arc::new(engine.clone()));
198        prepare_region(&region_server).await;
199
200        let kv_backend = Arc::new(MemoryKvBackend::new());
201        let handler_context = HandlerContext::new_for_test(region_server, kv_backend);
202        let replies = EnterStagingRegionsHandler
203            .handle(
204                &handler_context,
205                vec![EnterStagingRegion {
206                    region_id,
207                    partition_expr: PARTITION_EXPR.to_string(),
208                }],
209            )
210            .await
211            .unwrap();
212        let replies = replies.expect_enter_staging_regions_reply();
213        let reply = &replies[0];
214        assert!(reply.exists);
215        assert!(reply.error.is_none());
216        assert!(reply.ready);
217
218        // Should be ok to enter staging mode again with the same partition expr
219        let replies = EnterStagingRegionsHandler
220            .handle(
221                &handler_context,
222                vec![EnterStagingRegion {
223                    region_id,
224                    partition_expr: PARTITION_EXPR.to_string(),
225                }],
226            )
227            .await
228            .unwrap();
229        let replies = replies.expect_enter_staging_regions_reply();
230        let reply = &replies[0];
231        assert!(reply.exists);
232        assert!(reply.error.is_none());
233        assert!(reply.ready);
234
235        // Should throw error if try to enter staging mode again with a different partition expr
236        let replies = EnterStagingRegionsHandler
237            .handle(
238                &handler_context,
239                vec![EnterStagingRegion {
240                    region_id,
241                    partition_expr: "".to_string(),
242                }],
243            )
244            .await
245            .unwrap();
246        let replies = replies.expect_enter_staging_regions_reply();
247        let reply = &replies[0];
248        assert!(reply.exists);
249        assert!(reply.error.is_some());
250        assert!(!reply.ready);
251    }
252}