datanode/heartbeat/handler/
enter_staging.rs1use common_meta::instruction::{
16 EnterStagingRegion, EnterStagingRegionReply, EnterStagingRegionsReply, InstructionReply,
17};
18use common_telemetry::{error, warn};
19use futures::future::join_all;
20use store_api::region_request::{EnterStagingRequest, RegionRequest};
21
22use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
23
/// Handles the "enter staging regions" heartbeat instruction on a datanode.
///
/// The handler is a stateless unit struct: everything it needs at runtime is
/// supplied through the `HandlerContext` passed to `handle`.
#[derive(Debug, Clone, Copy, Default)]
pub struct EnterStagingRegionsHandler;
26
27#[async_trait::async_trait]
28impl InstructionHandler for EnterStagingRegionsHandler {
29 type Instruction = Vec<EnterStagingRegion>;
30
31 async fn handle(
32 &self,
33 ctx: &HandlerContext,
34 enter_staging: Self::Instruction,
35 ) -> Option<InstructionReply> {
36 let futures = enter_staging.into_iter().map(|enter_staging_region| {
37 Self::handle_enter_staging_region(ctx, enter_staging_region)
38 });
39 let results = join_all(futures).await;
40 Some(InstructionReply::EnterStagingRegions(
41 EnterStagingRegionsReply::new(results),
42 ))
43 }
44}
45
46impl EnterStagingRegionsHandler {
47 async fn handle_enter_staging_region(
48 ctx: &HandlerContext,
49 EnterStagingRegion {
50 region_id,
51 partition_expr,
52 }: EnterStagingRegion,
53 ) -> EnterStagingRegionReply {
54 common_telemetry::info!(
55 "Datanode received enter staging region: {}, partition_expr: {}",
56 region_id,
57 partition_expr
58 );
59 let Some(writable) = ctx.region_server.is_region_leader(region_id) else {
60 warn!("Region: {} is not found", region_id);
61 return EnterStagingRegionReply {
62 region_id,
63 ready: false,
64 exists: false,
65 error: None,
66 };
67 };
68 if !writable {
69 warn!("Region: {} is not writable", region_id);
70 return EnterStagingRegionReply {
71 region_id,
72 ready: false,
73 exists: true,
74 error: Some("Region is not writable".into()),
75 };
76 }
77
78 match ctx
79 .region_server
80 .handle_request(
81 region_id,
82 RegionRequest::EnterStaging(EnterStagingRequest { partition_expr }),
83 )
84 .await
85 {
86 Ok(_) => EnterStagingRegionReply {
87 region_id,
88 ready: true,
89 exists: true,
90 error: None,
91 },
92 Err(err) => {
93 error!(err; "Failed to enter staging region, region_id: {}", region_id);
94 EnterStagingRegionReply {
95 region_id,
96 ready: false,
97 exists: true,
98 error: Some(format!("{err:?}")),
99 }
100 }
101 }
102 }
103}
104
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_meta::instruction::EnterStagingRegion;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use mito2::config::MitoConfig;
    use mito2::engine::MITO_ENGINE_NAME;
    use mito2::test_util::{CreateRequestBuilder, TestEnv};
    use store_api::path_utils::table_dir;
    use store_api::region_engine::RegionRole;
    use store_api::region_request::RegionRequest;
    use store_api::storage::RegionId;

    use crate::heartbeat::handler::enter_staging::EnterStagingRegionsHandler;
    use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
    use crate::region_server::RegionServer;
    use crate::tests::{MockRegionEngine, mock_region_server};

    /// Partition expression used by the successful staging requests below.
    const PARTITION_EXPR: &str = "partition_expr";

    /// An engine is registered but no region is ever created, so the handler
    /// must report the region as missing without an error.
    #[tokio::test]
    async fn test_region_not_exist() {
        let mut mock_region_server = mock_region_server();
        let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
        mock_region_server.register_engine(mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
        let region_id = RegionId::new(1024, 1);
        let replies = EnterStagingRegionsHandler
            .handle(
                &handler_context,
                vec![EnterStagingRegion {
                    region_id,
                    partition_expr: "".to_string(),
                }],
            )
            .await
            .unwrap();
        let replies = replies.expect_enter_staging_regions_reply();
        let reply = &replies[0];
        assert!(!reply.exists);
        assert!(reply.error.is_none());
        assert!(!reply.ready);
    }

    /// The mock engine reports the region with a follower role, so the handler
    /// must answer "exists, not ready" together with an error.
    #[tokio::test]
    async fn test_region_not_writable() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
        let replies = EnterStagingRegionsHandler
            .handle(
                &handler_context,
                vec![EnterStagingRegion {
                    region_id,
                    partition_expr: "".to_string(),
                }],
            )
            .await
            .unwrap();
        let replies = replies.expect_enter_staging_regions_reply();
        let reply = &replies[0];
        assert!(reply.exists);
        assert!(reply.error.is_some());
        assert!(!reply.ready);
    }

    /// Creates region `(1024, 1)` on `region_server` through a regular create
    /// request so that later staging requests have a region to act on.
    async fn prepare_region(region_server: &RegionServer) {
        let builder = CreateRequestBuilder::new();
        let mut create_req = builder.build();
        create_req.table_dir = table_dir("test", 1024);
        let region_id = RegionId::new(1024, 1);
        region_server
            .handle_request(region_id, RegionRequest::Create(create_req))
            .await
            .unwrap();
    }

    /// Exercises the handler against an engine built by `TestEnv`: the first
    /// request, a repeat of the same request, and a conflicting request.
    #[tokio::test]
    async fn test_enter_staging() {
        let mut region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let mut engine_env = TestEnv::new().await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine.clone()));
        prepare_region(&region_server).await;

        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(region_server, kv_backend);
        let replies = EnterStagingRegionsHandler
            .handle(
                &handler_context,
                vec![EnterStagingRegion {
                    region_id,
                    partition_expr: PARTITION_EXPR.to_string(),
                }],
            )
            .await
            .unwrap();
        let replies = replies.expect_enter_staging_regions_reply();
        let reply = &replies[0];
        assert!(reply.exists);
        assert!(reply.error.is_none());
        assert!(reply.ready);

        // Entering staging again with the same partition expression is
        // expected to succeed.
        let replies = EnterStagingRegionsHandler
            .handle(
                &handler_context,
                vec![EnterStagingRegion {
                    region_id,
                    partition_expr: PARTITION_EXPR.to_string(),
                }],
            )
            .await
            .unwrap();
        let replies = replies.expect_enter_staging_regions_reply();
        let reply = &replies[0];
        assert!(reply.exists);
        assert!(reply.error.is_none());
        assert!(reply.ready);

        // Entering staging again with a different (empty) partition
        // expression is expected to be reported as an error.
        let replies = EnterStagingRegionsHandler
            .handle(
                &handler_context,
                vec![EnterStagingRegion {
                    region_id,
                    partition_expr: "".to_string(),
                }],
            )
            .await
            .unwrap();
        let replies = replies.expect_enter_staging_regions_reply();
        let reply = &replies[0];
        assert!(reply.exists);
        assert!(reply.error.is_some());
        assert!(!reply.ready);
    }
}