datanode/heartbeat/handler/
flush_region.rs1use std::time::Instant;
16
17use common_meta::instruction::{
18 FlushErrorStrategy, FlushRegionReply, FlushStrategy, Instruction, InstructionReply,
19};
20use common_telemetry::{debug, warn};
21use store_api::region_request::{RegionFlushRequest, RegionRequest};
22use store_api::storage::RegionId;
23
24use crate::error::{self, RegionNotFoundSnafu, RegionNotReadySnafu, Result, UnexpectedSnafu};
25use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
26
/// Heartbeat instruction handler for `Instruction::FlushRegions`.
///
/// It is a unit struct and holds no state of its own; everything it needs comes
/// from the [`HandlerContext`] passed to [`InstructionHandler::handle`].
pub struct FlushRegionsHandler;
28
29#[async_trait::async_trait]
30impl InstructionHandler for FlushRegionsHandler {
31 async fn handle(
32 &self,
33 ctx: &HandlerContext,
34 instruction: Instruction,
35 ) -> Option<InstructionReply> {
36 let start_time = Instant::now();
37 let flush_regions = instruction.into_flush_regions().unwrap();
38 let strategy = flush_regions.strategy;
39 let region_ids = flush_regions.region_ids;
40 let error_strategy = flush_regions.error_strategy;
41
42 let reply = if matches!(strategy, FlushStrategy::Async) {
43 ctx.handle_flush_hint(region_ids).await;
45 None
46 } else {
47 let reply = ctx.handle_flush_sync(region_ids, error_strategy).await;
49 Some(InstructionReply::FlushRegions(reply))
50 };
51
52 let elapsed = start_time.elapsed();
53 debug!(
54 "FlushRegions strategy: {:?}, elapsed: {:?}, reply: {:?}",
55 strategy, elapsed, reply
56 );
57
58 reply
59 }
60}
61
62impl HandlerContext {
63 async fn perform_region_flush(&self, region_id: RegionId) -> Result<()> {
65 let request = RegionRequest::Flush(RegionFlushRequest {
66 row_group_size: None,
67 });
68 self.region_server
69 .handle_request(region_id, request)
70 .await?;
71 Ok(())
72 }
73
74 async fn handle_flush_hint(&self, region_ids: Vec<RegionId>) {
76 let start_time = Instant::now();
77 for region_id in ®ion_ids {
78 let result = self.perform_region_flush(*region_id).await;
79 match result {
80 Ok(_) => {}
81 Err(error::Error::RegionNotFound { .. }) => {
82 warn!(
83 "Received a flush region hint from meta, but target region: {} is not found.",
84 region_id
85 );
86 }
87 Err(err) => {
88 warn!("Failed to flush region: {}, error: {}", region_id, err);
89 }
90 }
91 }
92 let elapsed = start_time.elapsed();
93 debug!(
94 "Flush regions hint: {:?}, elapsed: {:?}",
95 region_ids, elapsed
96 );
97 }
98
99 async fn handle_flush_sync(
101 &self,
102 region_ids: Vec<RegionId>,
103 error_strategy: FlushErrorStrategy,
104 ) -> FlushRegionReply {
105 let mut results = Vec::with_capacity(region_ids.len());
106
107 for region_id in region_ids {
108 let result = self.flush_single_region_sync(region_id).await;
109
110 match &result {
111 Ok(_) => results.push((region_id, Ok(()))),
112 Err(err) => {
113 let error_string = err.to_string();
115 results.push((region_id, Err(error_string)));
116
117 if matches!(error_strategy, FlushErrorStrategy::FailFast) {
119 break;
120 }
121 }
122 }
123 }
124
125 FlushRegionReply::from_results(results)
126 }
127
128 async fn flush_single_region_sync(&self, region_id: RegionId) -> Result<()> {
130 let Some(writable) = self.region_server.is_region_leader(region_id) else {
132 return Err(RegionNotFoundSnafu { region_id }.build());
133 };
134
135 if !writable {
136 return Err(RegionNotReadySnafu { region_id }.build());
137 }
138
139 let region_server_moved = self.region_server.clone();
141 let register_result = self
142 .flush_tasks
143 .try_register(
144 region_id,
145 Box::pin(async move {
146 region_server_moved
147 .handle_request(
148 region_id,
149 RegionRequest::Flush(RegionFlushRequest {
150 row_group_size: None,
151 }),
152 )
153 .await?;
154 Ok(())
155 }),
156 )
157 .await;
158
159 if register_result.is_busy() {
160 warn!("Another flush task is running for the region: {region_id}");
161 }
162
163 let mut watcher = register_result.into_watcher();
164 match self.flush_tasks.wait_until_finish(&mut watcher).await {
165 Ok(()) => Ok(()),
166 Err(err) => Err(UnexpectedSnafu {
167 violated: format!("Flush task failed: {err:?}"),
168 }
169 .build()),
170 }
171 }
172}
173
#[cfg(test)]
mod tests {
    use std::sync::{Arc, RwLock};

    use common_meta::instruction::{FlushErrorStrategy, FlushRegions};
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::storage::RegionId;

    use super::*;
    use crate::tests::{MockRegionEngine, mock_region_server};

    /// Async hints flush every registered region, and no reply is produced even
    /// when the target regions are missing.
    #[tokio::test]
    async fn test_handle_flush_region_hint() {
        let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_ids = (0..16).map(|i| RegionId::new(1024, i)).collect::<Vec<_>>();
        for region_id in &region_ids {
            let flushed_region_ids_ref = flushed_region_ids.clone();
            let (mock_engine, _) =
                MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                    region_engine.handle_request_mock_fn =
                        Some(Box::new(move |region_id, _request| {
                            flushed_region_ids_ref.write().unwrap().push(region_id);
                            Ok(0)
                        }))
                });
            mock_region_server.register_test_region(*region_id, mock_engine);
        }
        let handler_context = HandlerContext::new_for_test(mock_region_server);

        let flush_instruction = FlushRegions::async_batch(region_ids.clone());
        let reply = FlushRegionsHandler
            .handle(&handler_context, Instruction::FlushRegions(flush_instruction))
            .await;
        assert!(reply.is_none());
        assert_eq!(*flushed_region_ids.read().unwrap(), region_ids);

        flushed_region_ids.write().unwrap().clear();
        let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::<Vec<_>>();
        let flush_instruction = FlushRegions::async_batch(not_found_region_ids);
        let reply = FlushRegionsHandler
            .handle(&handler_context, Instruction::FlushRegions(flush_instruction))
            .await;
        assert!(reply.is_none());
        assert!(flushed_region_ids.read().unwrap().is_empty());
    }

    /// A synchronous flush of one registered region succeeds and is reported.
    #[tokio::test]
    async fn test_handle_flush_region_sync_single() {
        let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let flushed_region_ids_ref = flushed_region_ids.clone();
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| {
                    flushed_region_ids_ref.write().unwrap().push(region_id);
                    Ok(0)
                }))
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let handler_context = HandlerContext::new_for_test(mock_region_server);

        let flush_instruction = FlushRegions::sync_single(region_id);
        let reply = FlushRegionsHandler
            .handle(&handler_context, Instruction::FlushRegions(flush_instruction))
            .await;
        let flush_reply = reply.unwrap().expect_flush_regions_reply();
        assert!(flush_reply.overall_success);
        assert_eq!(flush_reply.results.len(), 1);
        assert_eq!(flush_reply.results[0].0, region_id);
        assert!(flush_reply.results[0].1.is_ok());
        assert_eq!(*flushed_region_ids.read().unwrap(), vec![region_id]);
    }

    /// Fail-fast stops at the first failing region and does not report later ones.
    #[tokio::test]
    async fn test_handle_flush_region_sync_batch_fail_fast() {
        let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_ids = vec![
            RegionId::new(1024, 1),
            RegionId::new(1024, 2),
            RegionId::new(1024, 3),
        ];

        // Only the first region is registered; the second one will be missing.
        let flushed_region_ids_ref = flushed_region_ids.clone();
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| {
                    flushed_region_ids_ref.write().unwrap().push(region_id);
                    Ok(0)
                }))
            });
        mock_region_server.register_test_region(region_ids[0], mock_engine);
        let handler_context = HandlerContext::new_for_test(mock_region_server);

        let flush_instruction =
            FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
        let reply = FlushRegionsHandler
            .handle(&handler_context, Instruction::FlushRegions(flush_instruction))
            .await;
        let flush_reply = reply.unwrap().expect_flush_regions_reply();
        assert!(!flush_reply.overall_success);
        // Region 1 flushes, region 2 fails, and fail-fast never attempts region 3.
        assert_eq!(flush_reply.results.len(), 2);
        assert_eq!(flush_reply.results[0].0, region_ids[0]);
        assert!(flush_reply.results[0].1.is_ok());
        assert_eq!(flush_reply.results[1].0, region_ids[1]);
        assert!(flush_reply.results[1].1.is_err());
        assert_eq!(*flushed_region_ids.read().unwrap(), vec![region_ids[0]]);
    }

    /// Try-all keeps going after a failure and reports every region.
    #[tokio::test]
    async fn test_handle_flush_region_sync_batch_try_all() {
        let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];

        // Only the first region is registered; the second one will be missing.
        let flushed_region_ids_ref = flushed_region_ids.clone();
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| {
                    flushed_region_ids_ref.write().unwrap().push(region_id);
                    Ok(0)
                }))
            });
        mock_region_server.register_test_region(region_ids[0], mock_engine);
        let handler_context = HandlerContext::new_for_test(mock_region_server);

        let flush_instruction =
            FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll);
        let reply = FlushRegionsHandler
            .handle(&handler_context, Instruction::FlushRegions(flush_instruction))
            .await;
        let flush_reply = reply.unwrap().expect_flush_regions_reply();
        assert!(!flush_reply.overall_success);
        assert_eq!(flush_reply.results.len(), region_ids.len());
        assert_eq!(flush_reply.results[0].0, region_ids[0]);
        assert!(flush_reply.results[0].1.is_ok());
        assert_eq!(flush_reply.results[1].0, region_ids[1]);
        assert!(flush_reply.results[1].1.is_err());
        assert_eq!(*flushed_region_ids.read().unwrap(), vec![region_ids[0]]);
    }
}