datanode/heartbeat/handler/
upgrade_region.rs1use common_meta::instruction::{InstructionReply, UpgradeRegion, UpgradeRegionReply};
16use common_telemetry::{info, warn};
17use futures_util::future::BoxFuture;
18use store_api::region_request::{RegionCatchupRequest, RegionRequest, ReplayCheckpoint};
19
20use crate::heartbeat::handler::HandlerContext;
21use crate::heartbeat::task_tracker::WaitResult;
22
23impl HandlerContext {
24 pub(crate) fn handle_upgrade_region_instruction(
25 self,
26 UpgradeRegion {
27 region_id,
28 last_entry_id,
29 metadata_last_entry_id,
30 replay_timeout,
31 location_id,
32 replay_entry_id,
33 metadata_replay_entry_id,
34 }: UpgradeRegion,
35 ) -> BoxFuture<'static, Option<InstructionReply>> {
36 Box::pin(async move {
37 let Some(writable) = self.region_server.is_region_leader(region_id) else {
38 return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
39 ready: false,
40 exists: false,
41 error: None,
42 }));
43 };
44
45 if writable {
46 return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
47 ready: true,
48 exists: true,
49 error: None,
50 }));
51 }
52
53 let region_server_moved = self.region_server.clone();
54
55 let checkpoint = match (replay_entry_id, metadata_replay_entry_id) {
56 (Some(entry_id), metadata_entry_id) => Some(ReplayCheckpoint {
57 entry_id,
58 metadata_entry_id,
59 }),
60 _ => None,
61 };
62
63 let register_result = self
66 .catchup_tasks
67 .try_register(
68 region_id,
69 Box::pin(async move {
70 info!("Executing region: {region_id} catchup to: last entry id {last_entry_id:?}");
71 region_server_moved
72 .handle_request(
73 region_id,
74 RegionRequest::Catchup(RegionCatchupRequest {
75 set_writable: true,
76 entry_id: last_entry_id,
77 metadata_entry_id: metadata_last_entry_id,
78 location_id,
79 checkpoint,
80 }),
81 )
82 .await?;
83
84 Ok(())
85 }),
86 )
87 .await;
88
89 if register_result.is_busy() {
90 warn!("Another catchup task is running for the region: {region_id}");
91 }
92
93 let Some(replay_timeout) = replay_timeout else {
95 return Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
96 ready: false,
97 exists: true,
98 error: None,
99 }));
100 };
101
102 let mut watcher = register_result.into_watcher();
104 let result = self.catchup_tasks.wait(&mut watcher, replay_timeout).await;
105
106 match result {
107 WaitResult::Timeout => Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
108 ready: false,
109 exists: true,
110 error: None,
111 })),
112 WaitResult::Finish(Ok(_)) => {
113 Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
114 ready: true,
115 exists: true,
116 error: None,
117 }))
118 }
119 WaitResult::Finish(Err(err)) => {
120 Some(InstructionReply::UpgradeRegion(UpgradeRegionReply {
121 ready: false,
122 exists: true,
123 error: Some(format!("{err:?}")),
124 }))
125 }
126 }
127 })
128 }
129}
130
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::time::Duration;

    use common_meta::instruction::{InstructionReply, UpgradeRegion};
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;
    use tokio::time::Instant;

    use crate::error;
    use crate::heartbeat::handler::HandlerContext;
    use crate::tests::{mock_region_server, MockRegionEngine};

    /// Flags of an upgrade reply, in the order `(ready, exists, error)`.
    type ReplyFlags = (bool, bool, Option<String>);

    /// Runs `instruction` through `ctx` and returns the flags of the reply.
    ///
    /// Panics unless the handler answers with
    /// `Some(InstructionReply::UpgradeRegion(_))`.
    async fn upgrade(ctx: HandlerContext, instruction: UpgradeRegion) -> ReplyFlags {
        let reply = ctx.handle_upgrade_region_instruction(instruction).await;
        assert_matches!(reply, Some(InstructionReply::UpgradeRegion(_)));

        let Some(InstructionReply::UpgradeRegion(reply)) = reply else {
            unreachable!()
        };
        (reply.ready, reply.exists, reply.error)
    }

    /// An engine is registered but no region is: the reply must say the region
    /// does not exist, with or without a replay timeout.
    #[tokio::test]
    async fn test_region_not_exist() {
        let mut server = mock_region_server();
        let (mock_engine, _) = MockRegionEngine::new(MITO_ENGINE_NAME);
        server.register_engine(mock_engine);

        let handler_context = HandlerContext::new_for_test(server);
        let region_id = RegionId::new(1024, 1);

        for replay_timeout in [None, Some(Duration::from_millis(100))] {
            let instruction = UpgradeRegion {
                region_id,
                replay_timeout,
                ..Default::default()
            };
            let (_, exists, error) = upgrade(handler_context.clone(), instruction).await;

            assert!(!exists);
            assert!(error.is_none());
        }
    }

    /// A region whose mocked role is leader is reported ready and existing.
    #[tokio::test]
    async fn test_region_writable() {
        let server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |engine| {
                engine.mock_role = Some(Some(RegionRole::Leader));
                // The mocked request handler panics if it is ever invoked.
                engine.handle_request_mock_fn = Some(Box::new(|_, _| {
                    unreachable!();
                }));
            });
        server.register_test_region(region_id, mock_engine);

        let handler_context = HandlerContext::new_for_test(server);

        for replay_timeout in [None, Some(Duration::from_millis(100))] {
            let instruction = UpgradeRegion {
                region_id,
                replay_timeout,
                ..Default::default()
            };
            let (ready, exists, error) = upgrade(handler_context.clone(), instruction).await;

            assert!(ready);
            assert!(exists);
            assert!(error.is_none());
        }
    }

    /// A follower region whose mocked request is configured with a 100 s delay
    /// is reported as existing but not ready, both without a timeout and with
    /// a 100 ms one.
    #[tokio::test]
    async fn test_region_not_ready() {
        let server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |engine| {
                engine.mock_role = Some(Some(RegionRole::Follower));
                engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
                // Far longer than any timeout used below.
                engine.handle_request_delay = Some(Duration::from_secs(100));
            });
        server.register_test_region(region_id, mock_engine);

        let handler_context = HandlerContext::new_for_test(server);

        for replay_timeout in [None, Some(Duration::from_millis(100))] {
            let instruction = UpgradeRegion {
                region_id,
                replay_timeout,
                ..Default::default()
            };
            let (ready, exists, error) = upgrade(handler_context.clone(), instruction).await;

            assert!(!ready);
            assert!(exists);
            assert!(error.is_none());
        }
    }

    /// Two 100 ms attempts against a request configured with a 300 ms delay are
    /// not ready; a third attempt with a 500 ms budget must report ready.
    #[tokio::test]
    async fn test_region_not_ready_with_retry() {
        let server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |engine| {
                engine.mock_role = Some(Some(RegionRole::Follower));
                engine.handle_request_mock_fn = Some(Box::new(|_, _| Ok(0)));
                engine.handle_request_delay = Some(Duration::from_millis(300));
            });
        server.register_test_region(region_id, mock_engine);

        let handler_context = HandlerContext::new_for_test(server);

        for replay_timeout in [
            Some(Duration::from_millis(100)),
            Some(Duration::from_millis(100)),
        ] {
            let instruction = UpgradeRegion {
                region_id,
                replay_timeout,
                ..Default::default()
            };
            let (ready, exists, error) = upgrade(handler_context.clone(), instruction).await;

            assert!(!ready);
            assert!(exists);
            assert!(error.is_none());
        }

        let timer = Instant::now();
        let (ready, exists, error) = upgrade(
            handler_context,
            UpgradeRegion {
                region_id,
                replay_timeout: Some(Duration::from_millis(500)),
                ..Default::default()
            },
        )
        .await;
        // The third attempt has to complete in less than the full 300 ms delay
        // (presumably because the earlier catchup task is still in flight and
        // is the one being awaited).
        assert!(timer.elapsed().as_millis() < 300);

        assert!(ready);
        assert!(exists);
        assert!(error.is_none());
    }

    /// A failing mocked request is invisible to a call that uses the default
    /// instruction, and is surfaced in `error` by a call that waits 200 ms.
    #[tokio::test]
    async fn test_region_error() {
        let server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, |engine| {
                engine.mock_role = Some(Some(RegionRole::Follower));
                engine.handle_request_mock_fn = Some(Box::new(|_, _| {
                    error::UnexpectedSnafu {
                        violated: "mock_error".to_string(),
                    }
                    .fail()
                }));
                engine.handle_request_delay = Some(Duration::from_millis(100));
            });
        server.register_test_region(region_id, mock_engine);

        let handler_context = HandlerContext::new_for_test(server);

        // First call: `replay_timeout` is left at its default value, and the
        // reply carries no error.
        let (ready, exists, error) = upgrade(
            handler_context.clone(),
            UpgradeRegion {
                region_id,
                ..Default::default()
            },
        )
        .await;
        assert!(!ready);
        assert!(exists);
        assert!(error.is_none());

        // Second call: 200 ms is longer than the configured 100 ms delay, and
        // the reply now carries the mocked error.
        let (ready, exists, error) = upgrade(
            handler_context.clone(),
            UpgradeRegion {
                region_id,
                replay_timeout: Some(Duration::from_millis(200)),
                ..Default::default()
            },
        )
        .await;
        assert!(!ready);
        assert!(exists);
        assert!(error.is_some());
        assert!(error.unwrap().contains("mock_error"));
    }
}