datanode/heartbeat/handler/
flush_region.rs1use std::time::Instant;
16
17use common_meta::instruction::{
18 FlushErrorStrategy, FlushRegionReply, FlushRegions, FlushStrategy, InstructionReply,
19};
20use common_telemetry::{debug, warn};
21use store_api::region_request::{RegionFlushReason, RegionFlushRequest, RegionRequest};
22use store_api::storage::RegionId;
23
24use crate::error::{self, RegionNotFoundSnafu, RegionNotReadySnafu, Result, UnexpectedSnafu};
25use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
26
27pub struct FlushRegionsHandler;
28
29#[async_trait::async_trait]
30impl InstructionHandler for FlushRegionsHandler {
31 type Instruction = FlushRegions;
32
33 async fn handle(
34 &self,
35 ctx: &HandlerContext,
36 flush_regions: FlushRegions,
37 ) -> Option<InstructionReply> {
38 let start_time = Instant::now();
39 let strategy = flush_regions.strategy;
40 let region_ids = flush_regions.region_ids;
41 let error_strategy = flush_regions.error_strategy;
42 let reason = flush_regions.reason;
43
44 let reply = if matches!(strategy, FlushStrategy::Async) {
45 ctx.handle_flush_hint(region_ids, reason).await;
47 None
48 } else {
49 let reply = ctx
51 .handle_flush_sync(region_ids, error_strategy, reason)
52 .await;
53 Some(InstructionReply::FlushRegions(reply))
54 };
55
56 let elapsed = start_time.elapsed();
57 debug!(
58 "FlushRegions strategy: {:?}, elapsed: {:?}, reply: {:?}",
59 strategy, elapsed, reply
60 );
61
62 reply
63 }
64}
65
66impl HandlerContext {
67 async fn perform_region_flush(
69 &self,
70 region_id: RegionId,
71 reason: Option<RegionFlushReason>,
72 ) -> Result<()> {
73 let request = RegionRequest::Flush(RegionFlushRequest {
74 reason,
75 ..Default::default()
76 });
77 self.region_server
78 .handle_request(region_id, request)
79 .await?;
80 Ok(())
81 }
82
83 async fn handle_flush_hint(
85 &self,
86 region_ids: Vec<RegionId>,
87 reason: Option<RegionFlushReason>,
88 ) {
89 let start_time = Instant::now();
90 for region_id in ®ion_ids {
91 let result = self.perform_region_flush(*region_id, reason).await;
92 match result {
93 Ok(_) => {}
94 Err(error::Error::RegionNotFound { .. }) => {
95 warn!(
96 "Received a flush region hint from meta, but target region: {} is not found.",
97 region_id
98 );
99 }
100 Err(err) => {
101 warn!("Failed to flush region: {}, error: {}", region_id, err);
102 }
103 }
104 }
105 let elapsed = start_time.elapsed();
106 debug!(
107 "Flush regions hint: {:?}, elapsed: {:?}",
108 region_ids, elapsed
109 );
110 }
111
112 async fn handle_flush_sync(
114 &self,
115 region_ids: Vec<RegionId>,
116 error_strategy: FlushErrorStrategy,
117 reason: Option<RegionFlushReason>,
118 ) -> FlushRegionReply {
119 let mut results = Vec::with_capacity(region_ids.len());
120
121 for region_id in region_ids {
122 let result = self.flush_single_region_sync(region_id, reason).await;
123
124 match &result {
125 Ok(_) => results.push((region_id, Ok(()))),
126 Err(err) => {
127 let error_string = err.to_string();
129 results.push((region_id, Err(error_string)));
130
131 if matches!(error_strategy, FlushErrorStrategy::FailFast) {
133 break;
134 }
135 }
136 }
137 }
138
139 FlushRegionReply::from_results(results)
140 }
141
142 async fn flush_single_region_sync(
144 &self,
145 region_id: RegionId,
146 reason: Option<RegionFlushReason>,
147 ) -> Result<()> {
148 let Some(writable) = self.region_server.is_region_leader(region_id) else {
150 return Err(RegionNotFoundSnafu { region_id }.build());
151 };
152
153 if !writable {
154 return Err(RegionNotReadySnafu { region_id }.build());
155 }
156
157 let region_server_moved = self.region_server.clone();
159 let register_result = self
160 .flush_tasks
161 .try_register(
162 region_id,
163 Box::pin(async move {
164 region_server_moved
165 .handle_request(
166 region_id,
167 RegionRequest::Flush(RegionFlushRequest {
168 reason,
169 ..Default::default()
170 }),
171 )
172 .await?;
173 Ok(())
174 }),
175 )
176 .await;
177
178 if register_result.is_busy() {
179 warn!("Another flush task is running for the region: {region_id}");
180 }
181
182 let mut watcher = register_result.into_watcher();
183 match self.flush_tasks.wait_until_finish(&mut watcher).await {
184 Ok(()) => Ok(()),
185 Err(err) => Err(UnexpectedSnafu {
186 violated: format!("Flush task failed: {err:?}"),
187 }
188 .build()),
189 }
190 }
191}
192
#[cfg(test)]
mod tests {
    use std::sync::{Arc, RwLock};

    use common_meta::instruction::{FlushErrorStrategy, FlushRegions};
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::storage::RegionId;

    use super::*;
    use crate::tests::{MockRegionEngine, mock_region_server};

    /// Flush requests observed by the mock engines, in arrival order.
    type FlushedRequests = Arc<RwLock<Vec<(RegionId, Option<RegionFlushReason>)>>>;

    /// Async (hint) flushes produce no reply, reach every registered region
    /// with the requested reason, and skip regions that are not registered.
    #[tokio::test]
    async fn test_handle_flush_region_hint() {
        let flushed_requests: FlushedRequests = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_ids = (0..16).map(|i| RegionId::new(1024, i)).collect::<Vec<_>>();
        for region_id in &region_ids {
            let flushed_requests_ref = flushed_requests.clone();
            let (mock_engine, _) =
                MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                    region_engine.handle_request_mock_fn =
                        Some(Box::new(move |region_id, request| {
                            let RegionRequest::Flush(request) = request else {
                                panic!("Expected flush request");
                            };
                            flushed_requests_ref
                                .write()
                                .unwrap()
                                .push((region_id, request.reason));
                            Ok(0)
                        }))
                });
            mock_region_server.register_test_region(*region_id, mock_engine);
        }
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);

        // Hint for regions that all exist.
        let flush_instruction = FlushRegions::async_batch(region_ids.clone())
            .with_reason(RegionFlushReason::RemoteWalPrune);
        let reply = FlushRegionsHandler
            .handle(&handler_context, flush_instruction)
            .await;
        assert!(reply.is_none());

        let expected = region_ids
            .iter()
            .map(|region_id| (*region_id, Some(RegionFlushReason::RemoteWalPrune)))
            .collect::<Vec<_>>();
        assert_eq!(*flushed_requests.read().unwrap(), expected);

        // Hint for regions that were never registered: still no reply and no
        // engine receives a request.
        flushed_requests.write().unwrap().clear();
        let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::<Vec<_>>();
        let flush_instruction = FlushRegions::async_batch(not_found_region_ids);
        let reply = FlushRegionsHandler
            .handle(&handler_context, flush_instruction)
            .await;
        assert!(reply.is_none());
        assert!(flushed_requests.read().unwrap().is_empty());
    }

    /// A synchronous single-region flush replies with one successful result
    /// and forwards the flush reason to the engine.
    #[tokio::test]
    async fn test_handle_flush_region_sync_single() {
        let flushed_requests: FlushedRequests = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let flushed_requests_ref = flushed_requests.clone();
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, request| {
                    let RegionRequest::Flush(request) = request else {
                        panic!("Expected flush request");
                    };
                    flushed_requests_ref
                        .write()
                        .unwrap()
                        .push((region_id, request.reason));
                    Ok(0)
                }))
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);

        let flush_instruction =
            FlushRegions::sync_single(region_id).with_reason(RegionFlushReason::Repartition);
        let reply = FlushRegionsHandler
            .handle(&handler_context, flush_instruction)
            .await;
        let flush_reply = reply.unwrap().expect_flush_regions_reply();
        assert!(flush_reply.overall_success);
        assert_eq!(flush_reply.results.len(), 1);
        assert_eq!(flush_reply.results[0].0, region_id);
        assert!(flush_reply.results[0].1.is_ok());
        assert_eq!(
            *flushed_requests.read().unwrap(),
            vec![(region_id, Some(RegionFlushReason::Repartition))]
        );
    }

    /// With `FailFast`, the batch stops at the first failing region: later
    /// regions are neither attempted nor reported.
    #[tokio::test]
    async fn test_handle_flush_region_sync_batch_fail_fast() {
        let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_ids = vec![
            RegionId::new(1024, 1),
            RegionId::new(1024, 2),
            RegionId::new(1024, 3),
        ];

        // Only the first region is registered; the other two do not exist.
        let flushed_region_ids_ref = flushed_region_ids.clone();
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| {
                    flushed_region_ids_ref.write().unwrap().push(region_id);
                    Ok(0)
                }))
            });
        mock_region_server.register_test_region(region_ids[0], mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);

        let flush_instruction =
            FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
        let reply = FlushRegionsHandler
            .handle(&handler_context, flush_instruction)
            .await;
        let flush_reply = reply.unwrap().expect_flush_regions_reply();
        assert!(!flush_reply.overall_success);

        // Region 1 succeeds, region 2 fails and aborts the batch, region 3 is
        // never attempted, so exactly two results are reported.
        assert_eq!(flush_reply.results.len(), 2);
        assert_eq!(flush_reply.results[0].0, region_ids[0]);
        assert!(flush_reply.results[0].1.is_ok());
        assert_eq!(flush_reply.results[1].0, region_ids[1]);
        assert!(flush_reply.results[1].1.is_err());

        // Only the registered region ever reached the engine.
        assert_eq!(*flushed_region_ids.read().unwrap(), vec![region_ids[0]]);
    }

    /// With `TryAll`, every region is attempted and reported even when some
    /// of them fail.
    #[tokio::test]
    async fn test_handle_flush_region_sync_batch_try_all() {
        let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];

        // Only the first region is registered; the second does not exist.
        let flushed_region_ids_ref = flushed_region_ids.clone();
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| {
                    flushed_region_ids_ref.write().unwrap().push(region_id);
                    Ok(0)
                }))
            });
        mock_region_server.register_test_region(region_ids[0], mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);

        let flush_instruction =
            FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll);
        let reply = FlushRegionsHandler
            .handle(&handler_context, flush_instruction)
            .await;
        let flush_reply = reply.unwrap().expect_flush_regions_reply();
        assert!(!flush_reply.overall_success);

        // Both regions are reported: the registered one succeeded, the
        // missing one failed.
        assert_eq!(flush_reply.results.len(), region_ids.len());
        assert!(flush_reply.results[0].1.is_ok());
        assert!(flush_reply.results[1].1.is_err());

        // Only the registered region ever reached the engine.
        assert_eq!(*flushed_region_ids.read().unwrap(), vec![region_ids[0]]);
    }

    /// Pins the `Display` rendering of a default synchronous single-region
    /// instruction.
    #[test]
    fn test_flush_regions_display() {
        let region_id = RegionId::new(1024, 1);
        let flush_regions = FlushRegions::sync_single(region_id);
        let display = flush_regions.to_string();
        assert_eq!(
            display,
            "FlushRegions(region_ids=[4398046511105(1024, 1)], strategy=Sync, error_strategy=FailFast, reason=None)"
        );
    }
}