datanode/heartbeat/handler/
apply_staging_manifest.rs1use common_meta::instruction::{
16 ApplyStagingManifest, ApplyStagingManifestReply, ApplyStagingManifestsReply, InstructionReply,
17};
18use common_telemetry::{error, warn};
19use futures::future::join_all;
20use store_api::region_request::{ApplyStagingManifestRequest, RegionRequest};
21
22use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
23
/// Heartbeat instruction handler that applies staging manifests to regions
/// through the region server held by the handler context.
///
/// The type carries no state; all inputs arrive through the
/// [`InstructionHandler::handle`] arguments.
pub struct ApplyStagingManifestsHandler;
25
26#[async_trait::async_trait]
27impl InstructionHandler for ApplyStagingManifestsHandler {
28 type Instruction = Vec<ApplyStagingManifest>;
29 async fn handle(
30 &self,
31 ctx: &HandlerContext,
32 requests: Self::Instruction,
33 ) -> Option<InstructionReply> {
34 let results = join_all(
35 requests
36 .into_iter()
37 .map(|request| Self::handle_apply_staging_manifest(ctx, request)),
38 )
39 .await;
40 Some(InstructionReply::ApplyStagingManifests(
41 ApplyStagingManifestsReply::new(results),
42 ))
43 }
44}
45
46impl ApplyStagingManifestsHandler {
47 async fn handle_apply_staging_manifest(
48 ctx: &HandlerContext,
49 request: ApplyStagingManifest,
50 ) -> ApplyStagingManifestReply {
51 let ApplyStagingManifest {
52 region_id,
53 ref partition_expr,
54 central_region_id,
55 ref manifest_path,
56 } = request;
57 common_telemetry::info!(
58 "Datanode received apply staging manifest request, region_id: {}, central_region_id: {}, partition_expr: {}, manifest_path: {}",
59 region_id,
60 central_region_id,
61 partition_expr,
62 manifest_path
63 );
64 let Some(leader) = ctx.region_server.is_region_leader(region_id) else {
65 warn!("Region: {} is not found", region_id);
66 return ApplyStagingManifestReply {
67 region_id,
68 exists: false,
69 ready: false,
70 error: None,
71 };
72 };
73 if !leader {
74 warn!("Region: {} is not leader", region_id);
75 return ApplyStagingManifestReply {
76 region_id,
77 exists: true,
78 ready: false,
79 error: Some("Region is not leader".into()),
80 };
81 }
82
83 match ctx
84 .region_server
85 .handle_request(
86 region_id,
87 RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
88 partition_expr: partition_expr.clone(),
89 central_region_id,
90 manifest_path: manifest_path.clone(),
91 }),
92 )
93 .await
94 {
95 Ok(_) => ApplyStagingManifestReply {
96 region_id,
97 exists: true,
98 ready: true,
99 error: None,
100 },
101 Err(err) => {
102 error!(err; "Failed to apply staging manifest, region_id: {}", region_id);
103 ApplyStagingManifestReply {
104 region_id,
105 exists: true,
106 ready: false,
107 error: Some(format!("{err:?}")),
108 }
109 }
110 }
111 }
112}
113
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use common_meta::instruction::RemapManifest;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use datatypes::value::Value;
    use mito2::config::MitoConfig;
    use mito2::engine::MITO_ENGINE_NAME;
    use mito2::test_util::{CreateRequestBuilder, TestEnv};
    use partition::expr::{PartitionExpr, col};
    use store_api::path_utils::table_dir;
    use store_api::region_engine::RegionRole;
    use store_api::region_request::EnterStagingRequest;
    use store_api::storage::RegionId;

    use super::*;
    use crate::heartbeat::handler::remap_manifest::RemapManifestHandler;
    use crate::region_server::RegionServer;
    use crate::tests::{MockRegionEngine, mock_region_server};

    /// A request for a region that was never registered must be answered with
    /// `exists: false`, `ready: false` and no error.
    #[tokio::test]
    async fn test_region_not_exist() {
        let mut mock_region_server = mock_region_server();
        let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
        mock_region_server.register_engine(mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
        let region_id = RegionId::new(1024, 1);
        let reply = ApplyStagingManifestsHandler
            .handle(
                &handler_context,
                vec![ApplyStagingManifest {
                    region_id,
                    partition_expr: "".to_string(),
                    // Arbitrary id: the handler returns right after the failed
                    // leader lookup, so this value is only logged.
                    central_region_id: RegionId::new(1024, 9999),
                    manifest_path: "".to_string(),
                }],
            )
            .await
            .unwrap();
        let replies = reply.expect_apply_staging_manifests_reply();
        let reply = &replies[0];
        assert!(!reply.exists);
        assert!(!reply.ready);
        assert!(reply.error.is_none());
    }

    /// A request for a region whose mocked role is follower must be answered
    /// with `exists: true`, `ready: false` and an error.
    #[tokio::test]
    async fn test_region_not_leader() {
        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
                region_engine.mock_role = Some(Some(RegionRole::Follower));
                region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
        let reply = ApplyStagingManifestsHandler
            .handle(
                &handler_context,
                vec![ApplyStagingManifest {
                    region_id,
                    partition_expr: "".to_string(),
                    central_region_id: RegionId::new(1024, 2),
                    manifest_path: "".to_string(),
                }],
            )
            .await
            .unwrap();
        let replies = reply.expect_apply_staging_manifests_reply();
        let reply = &replies[0];
        assert!(reply.exists);
        assert!(!reply.ready);
        assert!(reply.error.is_some());
    }

    /// Builds the partition expression `col_name >= start AND col_name < end`.
    fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr {
        col(col_name)
            .gt_eq(Value::Int64(start))
            .and(col(col_name).lt(Value::Int64(end)))
    }

    /// Creates regions `(1024, 1)` and `(1024, 2)` under the same table dir and
    /// sends each an `EnterStaging` request with the partition expressions
    /// `[0, 49)` and `[49, 100)` on column `x` respectively.
    async fn prepare_region(region_server: &RegionServer) {
        let region_specs = [
            (RegionId::new(1024, 1), range_expr("x", 0, 49)),
            (RegionId::new(1024, 2), range_expr("x", 49, 100)),
        ];

        for (region_id, partition_expr) in region_specs {
            let builder = CreateRequestBuilder::new();
            let mut create_req = builder.build();
            create_req.table_dir = table_dir("test", 1024);
            region_server
                .handle_request(region_id, RegionRequest::Create(create_req))
                .await
                .unwrap();
            region_server
                .handle_request(
                    region_id,
                    RegionRequest::EnterStaging(EnterStagingRequest {
                        partition_expr: partition_expr.as_json_str().unwrap(),
                    }),
                )
                .await
                .unwrap();
        }
    }

    /// End-to-end check against a real mito engine: remap the manifests of two
    /// staging regions, then apply the resulting manifest to each of them.
    #[tokio::test]
    async fn test_apply_staging_manifest() {
        common_telemetry::init_default_ut_logging();
        let mut region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);
        let mut engine_env = TestEnv::new().await;
        let engine = engine_env.create_engine(MitoConfig::default()).await;
        region_server.register_engine(Arc::new(engine.clone()));
        prepare_region(&region_server).await;

        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(region_server, kv_backend);
        let region_id2 = RegionId::new(1024, 2);

        // Produce one staging manifest path per region; they are the inputs of
        // the apply requests below.
        let reply = RemapManifestHandler
            .handle(
                &handler_context,
                RemapManifest {
                    region_id,
                    input_regions: vec![region_id, region_id2],
                    region_mapping: HashMap::from([
                        (region_id, vec![region_id]),
                        (region_id2, vec![region_id, region_id2]),
                    ]),
                    new_partition_exprs: HashMap::from([
                        (region_id, range_expr("x", 0, 49).as_json_str().unwrap()),
                        (region_id2, range_expr("x", 49, 100).as_json_str().unwrap()),
                    ]),
                },
            )
            .await
            .unwrap();
        let reply = reply.expect_remap_manifest_reply();
        assert!(reply.exists);
        assert!(reply.error.is_none(), "{}", reply.error.unwrap());
        assert_eq!(reply.manifest_paths.len(), 2);
        let manifest_path_1 = reply.manifest_paths[&region_id].clone();
        let manifest_path_2 = reply.manifest_paths[&region_id2].clone();

        // Same partition expression the region entered staging with: expected
        // to succeed.
        let reply = ApplyStagingManifestsHandler
            .handle(
                &handler_context,
                vec![ApplyStagingManifest {
                    region_id,
                    partition_expr: range_expr("x", 0, 49).as_json_str().unwrap(),
                    central_region_id: region_id,
                    manifest_path: manifest_path_1,
                }],
            )
            .await
            .unwrap();
        let replies = reply.expect_apply_staging_manifests_reply();
        let reply = &replies[0];
        assert!(reply.exists);
        assert!(reply.ready);
        assert!(reply.error.is_none());

        // `[50, 100)` differs from the `[49, 100)` expression region 2 entered
        // staging with; the handler must report an error (presumably a
        // partition-expression mismatch in the engine — confirm there).
        let reply = ApplyStagingManifestsHandler
            .handle(
                &handler_context,
                vec![ApplyStagingManifest {
                    region_id: region_id2,
                    partition_expr: range_expr("x", 50, 100).as_json_str().unwrap(),
                    central_region_id: region_id,
                    manifest_path: manifest_path_2,
                }],
            )
            .await
            .unwrap();
        let replies = reply.expect_apply_staging_manifests_reply();
        let reply = &replies[0];
        assert!(reply.exists);
        assert!(!reply.ready);
        assert!(reply.error.is_some());
    }
}