datanode/heartbeat/handler/
enter_staging.rs1use common_meta::instruction::{
16 EnterStagingRegion, EnterStagingRegionReply, EnterStagingRegionsReply, InstructionReply,
17};
18use common_telemetry::{error, warn};
19use futures::future::join_all;
20use store_api::region_request::{EnterStagingRequest, RegionRequest};
21
22use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
23
24#[derive(Debug, Clone, Copy, Default)]
25pub struct EnterStagingRegionsHandler;
26
27#[async_trait::async_trait]
28impl InstructionHandler for EnterStagingRegionsHandler {
29 type Instruction = Vec<EnterStagingRegion>;
30
31 async fn handle(
32 &self,
33 ctx: &HandlerContext,
34 enter_staging: Self::Instruction,
35 ) -> Option<InstructionReply> {
36 let futures = enter_staging.into_iter().map(|enter_staging_region| {
37 Self::handle_enter_staging_region(ctx, enter_staging_region)
38 });
39 let results = join_all(futures).await;
40 Some(InstructionReply::EnterStagingRegions(
41 EnterStagingRegionsReply::new(results),
42 ))
43 }
44}
45
46impl EnterStagingRegionsHandler {
47 async fn handle_enter_staging_region(
48 ctx: &HandlerContext,
49 EnterStagingRegion {
50 region_id,
51 partition_expr,
52 }: EnterStagingRegion,
53 ) -> EnterStagingRegionReply {
54 common_telemetry::info!(
55 "Datanode received enter staging region: {}, partition_expr: {}",
56 region_id,
57 partition_expr
58 );
59 let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
60 warn!("Region: {} is not found", region_id);
61 return EnterStagingRegionReply {
62 region_id,
63 ready: false,
64 exists: false,
65 error: None,
66 };
67 };
68 if !writable {
69 warn!("Region: {} is not writable", region_id);
70 return EnterStagingRegionReply {
71 region_id,
72 ready: false,
73 exists: true,
74 error: Some("Region is not writable".into()),
75 };
76 }
77
78 match ctx
79 .region_server
80 .handle_request(
81 region_id,
82 RegionRequest::EnterStaging(EnterStagingRequest { partition_expr }),
83 )
84 .await
85 {
86 Ok(_) => EnterStagingRegionReply {
87 region_id,
88 ready: true,
89 exists: true,
90 error: None,
91 },
92 Err(err) => {
93 error!(err; "Failed to enter staging region, region_id: {}", region_id);
94 EnterStagingRegionReply {
95 region_id,
96 ready: false,
97 exists: true,
98 error: Some(format!("{err:?}")),
99 }
100 }
101 }
102 }
103}
104
105#[cfg(test)]
106mod tests {
107 use std::sync::Arc;
108
109 use common_meta::instruction::EnterStagingRegion;
110 use mito2::config::MitoConfig;
111 use mito2::engine::MITO_ENGINE_NAME;
112 use mito2::test_util::{CreateRequestBuilder, TestEnv};
113 use store_api::path_utils::table_dir;
114 use store_api::region_engine::RegionRole;
115 use store_api::region_request::RegionRequest;
116 use store_api::storage::RegionId;
117
118 use crate::heartbeat::handler::enter_staging::EnterStagingRegionsHandler;
119 use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
120 use crate::region_server::RegionServer;
121 use crate::tests::{MockRegionEngine, mock_region_server};
122
123 const PARTITION_EXPR: &str = "partition_expr";
124
125 #[tokio::test]
126 async fn test_region_not_exist() {
127 let mut mock_region_server = mock_region_server();
128 let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
129 mock_region_server.register_engine(mock_engine);
130 let handler_context = HandlerContext::new_for_test(mock_region_server);
131 let region_id = RegionId::new(1024, 1);
132 let replies = EnterStagingRegionsHandler
133 .handle(
134 &handler_context,
135 vec![EnterStagingRegion {
136 region_id,
137 partition_expr: "".to_string(),
138 }],
139 )
140 .await
141 .unwrap();
142 let replies = replies.expect_enter_staging_regions_reply();
143 let reply = &replies[0];
144 assert!(!reply.exists);
145 assert!(reply.error.is_none());
146 assert!(!reply.ready);
147 }
148
149 #[tokio::test]
150 async fn test_region_not_writable() {
151 let mock_region_server = mock_region_server();
152 let region_id = RegionId::new(1024, 1);
153 let (mock_engine, _) =
154 MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
155 region_engine.mock_role = Some(Some(RegionRole::Follower));
156 region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
157 });
158 mock_region_server.register_test_region(region_id, mock_engine);
159 let handler_context = HandlerContext::new_for_test(mock_region_server);
160 let replies = EnterStagingRegionsHandler
161 .handle(
162 &handler_context,
163 vec![EnterStagingRegion {
164 region_id,
165 partition_expr: "".to_string(),
166 }],
167 )
168 .await
169 .unwrap();
170 let replies = replies.expect_enter_staging_regions_reply();
171 let reply = &replies[0];
172 assert!(reply.exists);
173 assert!(reply.error.is_some());
174 assert!(!reply.ready);
175 }
176
177 async fn prepare_region(region_server: &RegionServer) {
178 let builder = CreateRequestBuilder::new();
179 let mut create_req = builder.build();
180 create_req.table_dir = table_dir("test", 1024);
181 let region_id = RegionId::new(1024, 1);
182 region_server
183 .handle_request(region_id, RegionRequest::Create(create_req))
184 .await
185 .unwrap();
186 }
187
188 #[tokio::test]
189 async fn test_enter_staging() {
190 let mut region_server = mock_region_server();
191 let region_id = RegionId::new(1024, 1);
192 let mut engine_env = TestEnv::new().await;
193 let engine = engine_env.create_engine(MitoConfig::default()).await;
194 region_server.register_engine(Arc::new(engine.clone()));
195 prepare_region(®ion_server).await;
196
197 let handler_context = HandlerContext::new_for_test(region_server);
198 let replies = EnterStagingRegionsHandler
199 .handle(
200 &handler_context,
201 vec![EnterStagingRegion {
202 region_id,
203 partition_expr: PARTITION_EXPR.to_string(),
204 }],
205 )
206 .await
207 .unwrap();
208 let replies = replies.expect_enter_staging_regions_reply();
209 let reply = &replies[0];
210 assert!(reply.exists);
211 assert!(reply.error.is_none());
212 assert!(reply.ready);
213
214 let replies = EnterStagingRegionsHandler
216 .handle(
217 &handler_context,
218 vec![EnterStagingRegion {
219 region_id,
220 partition_expr: PARTITION_EXPR.to_string(),
221 }],
222 )
223 .await
224 .unwrap();
225 let replies = replies.expect_enter_staging_regions_reply();
226 let reply = &replies[0];
227 assert!(reply.exists);
228 assert!(reply.error.is_none());
229 assert!(reply.ready);
230
231 let replies = EnterStagingRegionsHandler
233 .handle(
234 &handler_context,
235 vec![EnterStagingRegion {
236 region_id,
237 partition_expr: "".to_string(),
238 }],
239 )
240 .await
241 .unwrap();
242 let replies = replies.expect_enter_staging_regions_reply();
243 let reply = &replies[0];
244 assert!(reply.exists);
245 assert!(reply.error.is_some());
246 assert!(!reply.ready);
247 }
248}