datanode/heartbeat/handler/
remap_manifest.rs1use common_meta::instruction::{InstructionReply, RemapManifest, RemapManifestReply};
16use common_telemetry::{error, info, warn};
17use store_api::region_engine::RemapManifestsRequest;
18
19use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
20
21pub struct RemapManifestHandler;
22
23#[async_trait::async_trait]
24impl InstructionHandler for RemapManifestHandler {
25 type Instruction = RemapManifest;
26 async fn handle(
27 &self,
28 ctx: &HandlerContext,
29 request: Self::Instruction,
30 ) -> Option<InstructionReply> {
31 let RemapManifest {
32 region_id,
33 input_regions,
34 region_mapping,
35 new_partition_exprs,
36 } = request;
37 info!(
38 "Datanode received remap manifest request, region_id: {}, input_regions: {}, target_regions: {}",
39 region_id,
40 input_regions.len(),
41 new_partition_exprs.len()
42 );
43 let Some(leader) = ctx.region_server.is_region_leader(region_id) else {
44 warn!("Region: {} is not found", region_id);
45 return Some(InstructionReply::RemapManifest(RemapManifestReply {
46 exists: false,
47 manifest_paths: Default::default(),
48 error: None,
49 }));
50 };
51
52 if !leader {
53 warn!("Region: {} is not leader", region_id);
54 return Some(InstructionReply::RemapManifest(RemapManifestReply {
55 exists: true,
56 manifest_paths: Default::default(),
57 error: Some("Region is not leader".into()),
58 }));
59 }
60
61 let reply = match ctx
62 .region_server
63 .remap_manifests(RemapManifestsRequest {
64 region_id,
65 input_regions,
66 region_mapping,
67 new_partition_exprs,
68 })
69 .await
70 {
71 Ok(result) => InstructionReply::RemapManifest(RemapManifestReply {
72 exists: true,
73 manifest_paths: result.manifest_paths,
74 error: None,
75 }),
76 Err(e) => {
77 error!(
78 e;
79 "Remap manifests failed on datanode, region_id: {}",
80 region_id
81 );
82 InstructionReply::RemapManifest(RemapManifestReply {
83 exists: true,
84 manifest_paths: Default::default(),
85 error: Some(format!("{e:?}")),
86 })
87 }
88 };
89
90 Some(reply)
91 }
92}
93
94#[cfg(test)]
95mod tests {
96 use std::collections::HashMap;
97 use std::sync::Arc;
98
99 use common_meta::instruction::RemapManifest;
100 use common_meta::kv_backend::memory::MemoryKvBackend;
101 use datatypes::value::Value;
102 use mito2::config::MitoConfig;
103 use mito2::engine::MITO_ENGINE_NAME;
104 use mito2::test_util::{CreateRequestBuilder, TestEnv};
105 use partition::expr::{PartitionExpr, col};
106 use store_api::path_utils::table_dir;
107 use store_api::region_engine::RegionRole;
108 use store_api::region_request::{
109 EnterStagingRequest, RegionRequest, StagingPartitionDirective,
110 };
111 use store_api::storage::RegionId;
112
113 use crate::heartbeat::handler::remap_manifest::RemapManifestHandler;
114 use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
115 use crate::region_server::RegionServer;
116 use crate::tests::{MockRegionEngine, mock_region_server};
117
118 #[tokio::test]
119 async fn test_region_not_exist() {
120 let mut mock_region_server = mock_region_server();
121 let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
122 mock_region_server.register_engine(mock_engine);
123 let kv_backend = Arc::new(MemoryKvBackend::new());
124 let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
125 let region_id = RegionId::new(1024, 1);
126 let reply = RemapManifestHandler
127 .handle(
128 &handler_context,
129 RemapManifest {
130 region_id,
131 input_regions: vec![],
132 region_mapping: HashMap::new(),
133 new_partition_exprs: HashMap::new(),
134 },
135 )
136 .await
137 .unwrap();
138 let reply = &reply.expect_remap_manifest_reply();
139 assert!(!reply.exists);
140 assert!(reply.error.is_none());
141 assert!(reply.manifest_paths.is_empty());
142 }
143
144 #[tokio::test]
145 async fn test_region_not_leader() {
146 let mock_region_server = mock_region_server();
147 let region_id = RegionId::new(1024, 1);
148 let (mock_engine, _) =
149 MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |region_engine| {
150 region_engine.mock_role = Some(Some(RegionRole::Follower));
151 region_engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
152 });
153 mock_region_server.register_test_region(region_id, mock_engine);
154 let kv_backend = Arc::new(MemoryKvBackend::new());
155 let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);
156 let reply = RemapManifestHandler
157 .handle(
158 &handler_context,
159 RemapManifest {
160 region_id,
161 input_regions: vec![],
162 region_mapping: HashMap::new(),
163 new_partition_exprs: HashMap::new(),
164 },
165 )
166 .await
167 .unwrap();
168 let reply = reply.expect_remap_manifest_reply();
169 assert!(reply.exists);
170 assert!(reply.error.is_some());
171 }
172
173 fn range_expr(col_name: &str, start: i64, end: i64) -> PartitionExpr {
174 col(col_name)
175 .gt_eq(Value::Int64(start))
176 .and(col(col_name).lt(Value::Int64(end)))
177 }
178
179 async fn prepare_region(region_server: &RegionServer) {
180 let region_specs = [
181 (RegionId::new(1024, 1), range_expr("x", 0, 50)),
182 (RegionId::new(1024, 2), range_expr("x", 50, 100)),
183 ];
184
185 for (region_id, partition_expr) in region_specs {
186 let builder = CreateRequestBuilder::new();
187 let mut create_req = builder.build();
188 create_req.table_dir = table_dir("test", 1024);
189 region_server
190 .handle_request(region_id, RegionRequest::Create(create_req))
191 .await
192 .unwrap();
193 region_server
194 .handle_request(
195 region_id,
196 RegionRequest::EnterStaging(EnterStagingRequest {
197 partition_directive: StagingPartitionDirective::UpdatePartitionExpr(
198 partition_expr.as_json_str().unwrap(),
199 ),
200 }),
201 )
202 .await
203 .unwrap();
204 }
205 }
206
207 #[tokio::test]
208 async fn test_remap_manifest() {
209 common_telemetry::init_default_ut_logging();
210 let mut region_server = mock_region_server();
211 let region_id = RegionId::new(1024, 1);
212 let mut engine_env = TestEnv::new().await;
213 let engine = engine_env.create_engine(MitoConfig::default()).await;
214 region_server.register_engine(Arc::new(engine.clone()));
215 prepare_region(®ion_server).await;
216
217 let kv_backend = Arc::new(MemoryKvBackend::new());
218 let handler_context = HandlerContext::new_for_test(region_server, kv_backend);
219 let region_id2 = RegionId::new(1024, 2);
220 let reply = RemapManifestHandler
221 .handle(
222 &handler_context,
223 RemapManifest {
224 region_id,
225 input_regions: vec![region_id, region_id2],
226 region_mapping: HashMap::from([
227 (region_id, vec![region_id]),
228 (region_id2, vec![region_id]),
229 ]),
230 new_partition_exprs: HashMap::from([(
231 region_id,
232 range_expr("x", 0, 100).as_json_str().unwrap(),
233 )]),
234 },
235 )
236 .await
237 .unwrap();
238 let reply = reply.expect_remap_manifest_reply();
239 assert!(reply.exists);
240 assert!(reply.error.is_none(), "{}", reply.error.unwrap());
241 assert_eq!(reply.manifest_paths.len(), 1);
242
243 let reply = RemapManifestHandler
245 .handle(
246 &handler_context,
247 RemapManifest {
248 region_id,
249 input_regions: vec![region_id],
250 region_mapping: HashMap::from([
251 (region_id, vec![region_id]),
252 (region_id2, vec![region_id]),
253 ]),
254 new_partition_exprs: HashMap::from([(
255 region_id,
256 range_expr("x", 0, 100).as_json_str().unwrap(),
257 )]),
258 },
259 )
260 .await
261 .unwrap();
262 let reply = reply.expect_remap_manifest_reply();
263 assert!(reply.exists);
264 assert!(reply.error.is_some());
265 assert!(reply.manifest_paths.is_empty());
266 }
267}