datanode/heartbeat/handler/
enter_staging.rs1use common_meta::instruction::{
16 EnterStagingRegion, EnterStagingRegionReply, EnterStagingRegionsReply, InstructionReply,
17 StagingPartitionDirective as InstructionStagingPartitionDirective,
18};
19use common_telemetry::{error, warn};
20use futures::future::join_all;
21use store_api::region_request::{
22 EnterStagingRequest, RegionRequest,
23 StagingPartitionDirective as RequestStagingPartitionDirective,
24};
25
26use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
27
28#[derive(Debug, Clone, Copy, Default)]
29pub struct EnterStagingRegionsHandler;
30
31#[async_trait::async_trait]
32impl InstructionHandler for EnterStagingRegionsHandler {
33 type Instruction = Vec<EnterStagingRegion>;
34
35 async fn handle(
36 &self,
37 ctx: &HandlerContext,
38 enter_staging: Self::Instruction,
39 ) -> Option<InstructionReply> {
40 let futures = enter_staging.into_iter().map(|enter_staging_region| {
41 Self::handle_enter_staging_region(ctx, enter_staging_region)
42 });
43 let results = join_all(futures).await;
44 Some(InstructionReply::EnterStagingRegions(
45 EnterStagingRegionsReply::new(results),
46 ))
47 }
48}
49
50impl EnterStagingRegionsHandler {
51 async fn handle_enter_staging_region(
52 ctx: &HandlerContext,
53 EnterStagingRegion {
54 region_id,
55 partition_directive,
56 }: EnterStagingRegion,
57 ) -> EnterStagingRegionReply {
58 let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
59 warn!("Region: {} is not found", region_id);
60 return EnterStagingRegionReply {
61 region_id,
62 ready: false,
63 exists: false,
64 error: None,
65 };
66 };
67 if !writable {
68 warn!("Region: {} is not writable", region_id);
69 return EnterStagingRegionReply {
70 region_id,
71 ready: false,
72 exists: true,
73 error: Some("Region is not writable".into()),
74 };
75 }
76
77 common_telemetry::info!("Datanode received enter staging region: {}", region_id);
78
79 let partition_directive = match partition_directive {
80 InstructionStagingPartitionDirective::UpdatePartitionExpr(expr) => {
81 RequestStagingPartitionDirective::UpdatePartitionExpr(expr)
82 }
83 InstructionStagingPartitionDirective::RejectAllWrites => {
84 RequestStagingPartitionDirective::RejectAllWrites
85 }
86 };
87
88 match ctx
89 .region_server
90 .handle_request(
91 region_id,
92 RegionRequest::EnterStaging(EnterStagingRequest {
93 partition_directive,
94 }),
95 )
96 .await
97 {
98 Ok(_) => EnterStagingRegionReply {
99 region_id,
100 ready: true,
101 exists: true,
102 error: None,
103 },
104 Err(err) => {
105 error!(err; "Failed to enter staging region, region_id: {}", region_id);
106 EnterStagingRegionReply {
107 region_id,
108 ready: false,
109 exists: true,
110 error: Some(format!("{err:?}")),
111 }
112 }
113 }
114 }
115}
116
117#[cfg(test)]
118mod tests {
119 use std::sync::Arc;
120
121 use common_meta::instruction::{
122 EnterStagingRegion, StagingPartitionDirective as InstructionStagingPartitionDirective,
123 };
124 use common_meta::kv_backend::memory::MemoryKvBackend;
125 use mito2::config::MitoConfig;
126 use mito2::engine::MITO_ENGINE_NAME;
127 use mito2::test_util::{CreateRequestBuilder, TestEnv};
128 use store_api::path_utils::table_dir;
129 use store_api::region_engine::RegionRole;
130 use store_api::region_request::RegionRequest;
131 use store_api::storage::RegionId;
132
133 use crate::heartbeat::handler::enter_staging::EnterStagingRegionsHandler;
134 use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
135 use crate::region_server::RegionServer;
136 use crate::tests::{MockRegionEngine, mock_region_server};
137
138 const PARTITION_EXPR: &str = "partition_expr";
139
140 #[tokio::test]
141 async fn test_region_not_exist() {
142 let mut mock_region_server = mock_region_server();
143 let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
144 mock_region_server.register_engine(mock_engine);
145 let kv_backend = Arc::new(MemoryKvBackend::new());
146 let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
147 let region_id = RegionId::new(1024, 1);
148 let replies = EnterStagingRegionsHandler
149 .handle(
150 &handler_context,
151 vec![EnterStagingRegion {
152 region_id,
153 partition_directive: InstructionStagingPartitionDirective::UpdatePartitionExpr(
154 "".to_string(),
155 ),
156 }],
157 )
158 .await
159 .unwrap();
160 let replies = replies.expect_enter_staging_regions_reply();
161 let reply = &replies[0];
162 assert!(!reply.exists);
163 assert!(reply.error.is_none());
164 assert!(!reply.ready);
165 }
166
167 #[tokio::test]
168 async fn test_region_not_writable() {
169 let mock_region_server = mock_region_server();
170 let region_id = RegionId::new(1024, 1);
171 let (mock_engine, _) =
172 MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
173 region_engine.mock_role = Some(Some(RegionRole::Follower));
174 region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
175 });
176 mock_region_server.register_test_region(region_id, mock_engine);
177 let kv_backend = Arc::new(MemoryKvBackend::new());
178 let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
179 let replies = EnterStagingRegionsHandler
180 .handle(
181 &handler_context,
182 vec![EnterStagingRegion {
183 region_id,
184 partition_directive: InstructionStagingPartitionDirective::UpdatePartitionExpr(
185 "".to_string(),
186 ),
187 }],
188 )
189 .await
190 .unwrap();
191 let replies = replies.expect_enter_staging_regions_reply();
192 let reply = &replies[0];
193 assert!(reply.exists);
194 assert!(reply.error.is_some());
195 assert!(!reply.ready);
196 }
197
198 async fn prepare_region(region_server: &RegionServer) {
199 let builder = CreateRequestBuilder::new();
200 let mut create_req = builder.build();
201 create_req.table_dir = table_dir("test", 1024);
202 let region_id = RegionId::new(1024, 1);
203 region_server
204 .handle_request(region_id, RegionRequest::Create(create_req))
205 .await
206 .unwrap();
207 }
208
209 #[tokio::test]
210 async fn test_enter_staging() {
211 let mut region_server = mock_region_server();
212 let region_id = RegionId::new(1024, 1);
213 let mut engine_env = TestEnv::new().await;
214 let engine = engine_env.create_engine(MitoConfig::default()).await;
215 region_server.register_engine(Arc::new(engine.clone()));
216 prepare_region(®ion_server).await;
217
218 let kv_backend = Arc::new(MemoryKvBackend::new());
219 let handler_context = HandlerContext::new_for_test(region_server, kv_backend);
220 let replies = EnterStagingRegionsHandler
221 .handle(
222 &handler_context,
223 vec![EnterStagingRegion {
224 region_id,
225 partition_directive: InstructionStagingPartitionDirective::UpdatePartitionExpr(
226 PARTITION_EXPR.to_string(),
227 ),
228 }],
229 )
230 .await
231 .unwrap();
232 let replies = replies.expect_enter_staging_regions_reply();
233 let reply = &replies[0];
234 assert!(reply.exists);
235 assert!(reply.error.is_none());
236 assert!(reply.ready);
237
238 let replies = EnterStagingRegionsHandler
240 .handle(
241 &handler_context,
242 vec![EnterStagingRegion {
243 region_id,
244 partition_directive: InstructionStagingPartitionDirective::UpdatePartitionExpr(
245 PARTITION_EXPR.to_string(),
246 ),
247 }],
248 )
249 .await
250 .unwrap();
251 let replies = replies.expect_enter_staging_regions_reply();
252 let reply = &replies[0];
253 assert!(reply.exists);
254 assert!(reply.error.is_none());
255 assert!(reply.ready);
256
257 let replies = EnterStagingRegionsHandler
259 .handle(
260 &handler_context,
261 vec![EnterStagingRegion {
262 region_id,
263 partition_directive: InstructionStagingPartitionDirective::UpdatePartitionExpr(
264 "".to_string(),
265 ),
266 }],
267 )
268 .await
269 .unwrap();
270 let replies = replies.expect_enter_staging_regions_reply();
271 let reply = &replies[0];
272 assert!(reply.exists);
273 assert!(reply.error.is_some());
274 assert!(!reply.ready);
275 }
276
277 #[tokio::test]
278 async fn test_enter_staging_reject_all_writes() {
279 let mut region_server = mock_region_server();
280 let region_id = RegionId::new(1024, 1);
281 let mut engine_env = TestEnv::new().await;
282 let engine = engine_env.create_engine(MitoConfig::default()).await;
283 region_server.register_engine(Arc::new(engine.clone()));
284 prepare_region(®ion_server).await;
285
286 let kv_backend = Arc::new(MemoryKvBackend::new());
287 let handler_context = HandlerContext::new_for_test(region_server, kv_backend);
288 let replies = EnterStagingRegionsHandler
289 .handle(
290 &handler_context,
291 vec![EnterStagingRegion {
292 region_id,
293 partition_directive: InstructionStagingPartitionDirective::RejectAllWrites,
294 }],
295 )
296 .await
297 .unwrap();
298 let replies = replies.expect_enter_staging_regions_reply();
299 let reply = &replies[0];
300 assert!(reply.exists);
301 assert!(reply.ready);
302 assert!(reply.error.is_none());
303 }
304}