datanode/heartbeat/handler/
flush_region.rs1use std::time::Instant;
16
17use common_meta::instruction::{
18 FlushErrorStrategy, FlushRegionReply, FlushRegions, FlushStrategy, InstructionReply,
19};
20use common_telemetry::{debug, warn};
21use store_api::region_request::{RegionFlushRequest, RegionRequest};
22use store_api::storage::RegionId;
23
24use crate::error::{self, RegionNotFoundSnafu, RegionNotReadySnafu, Result, UnexpectedSnafu};
25use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
26
27pub struct FlushRegionsHandler;
28
29#[async_trait::async_trait]
30impl InstructionHandler for FlushRegionsHandler {
31 type Instruction = FlushRegions;
32
33 async fn handle(
34 &self,
35 ctx: &HandlerContext,
36 flush_regions: FlushRegions,
37 ) -> Option<InstructionReply> {
38 let start_time = Instant::now();
39 let strategy = flush_regions.strategy;
40 let region_ids = flush_regions.region_ids;
41 let error_strategy = flush_regions.error_strategy;
42
43 let reply = if matches!(strategy, FlushStrategy::Async) {
44 ctx.handle_flush_hint(region_ids).await;
46 None
47 } else {
48 let reply = ctx.handle_flush_sync(region_ids, error_strategy).await;
50 Some(InstructionReply::FlushRegions(reply))
51 };
52
53 let elapsed = start_time.elapsed();
54 debug!(
55 "FlushRegions strategy: {:?}, elapsed: {:?}, reply: {:?}",
56 strategy, elapsed, reply
57 );
58
59 reply
60 }
61}
62
63impl HandlerContext {
64 async fn perform_region_flush(&self, region_id: RegionId) -> Result<()> {
66 let request = RegionRequest::Flush(RegionFlushRequest {
67 row_group_size: None,
68 });
69 self.region_server
70 .handle_request(region_id, request)
71 .await?;
72 Ok(())
73 }
74
75 async fn handle_flush_hint(&self, region_ids: Vec<RegionId>) {
77 let start_time = Instant::now();
78 for region_id in ®ion_ids {
79 let result = self.perform_region_flush(*region_id).await;
80 match result {
81 Ok(_) => {}
82 Err(error::Error::RegionNotFound { .. }) => {
83 warn!(
84 "Received a flush region hint from meta, but target region: {} is not found.",
85 region_id
86 );
87 }
88 Err(err) => {
89 warn!("Failed to flush region: {}, error: {}", region_id, err);
90 }
91 }
92 }
93 let elapsed = start_time.elapsed();
94 debug!(
95 "Flush regions hint: {:?}, elapsed: {:?}",
96 region_ids, elapsed
97 );
98 }
99
100 async fn handle_flush_sync(
102 &self,
103 region_ids: Vec<RegionId>,
104 error_strategy: FlushErrorStrategy,
105 ) -> FlushRegionReply {
106 let mut results = Vec::with_capacity(region_ids.len());
107
108 for region_id in region_ids {
109 let result = self.flush_single_region_sync(region_id).await;
110
111 match &result {
112 Ok(_) => results.push((region_id, Ok(()))),
113 Err(err) => {
114 let error_string = err.to_string();
116 results.push((region_id, Err(error_string)));
117
118 if matches!(error_strategy, FlushErrorStrategy::FailFast) {
120 break;
121 }
122 }
123 }
124 }
125
126 FlushRegionReply::from_results(results)
127 }
128
129 async fn flush_single_region_sync(&self, region_id: RegionId) -> Result<()> {
131 let Some(writable) = self.region_server.is_region_leader(region_id) else {
133 return Err(RegionNotFoundSnafu { region_id }.build());
134 };
135
136 if !writable {
137 return Err(RegionNotReadySnafu { region_id }.build());
138 }
139
140 let region_server_moved = self.region_server.clone();
142 let register_result = self
143 .flush_tasks
144 .try_register(
145 region_id,
146 Box::pin(async move {
147 region_server_moved
148 .handle_request(
149 region_id,
150 RegionRequest::Flush(RegionFlushRequest {
151 row_group_size: None,
152 }),
153 )
154 .await?;
155 Ok(())
156 }),
157 )
158 .await;
159
160 if register_result.is_busy() {
161 warn!("Another flush task is running for the region: {region_id}");
162 }
163
164 let mut watcher = register_result.into_watcher();
165 match self.flush_tasks.wait_until_finish(&mut watcher).await {
166 Ok(()) => Ok(()),
167 Err(err) => Err(UnexpectedSnafu {
168 violated: format!("Flush task failed: {err:?}"),
169 }
170 .build()),
171 }
172 }
173}
174
175#[cfg(test)]
176mod tests {
177 use std::sync::{Arc, RwLock};
178
179 use common_meta::instruction::{FlushErrorStrategy, FlushRegions};
180 use mito2::engine::MITO_ENGINE_NAME;
181 use store_api::storage::RegionId;
182
183 use super::*;
184 use crate::tests::{MockRegionEngine, mock_region_server};
185
186 #[tokio::test]
187 async fn test_handle_flush_region_hint() {
188 let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));
189
190 let mock_region_server = mock_region_server();
191 let region_ids = (0..16).map(|i| RegionId::new(1024, i)).collect::<Vec<_>>();
192 for region_id in ®ion_ids {
193 let flushed_region_ids_ref = flushed_region_ids.clone();
194 let (mock_engine, _) =
195 MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
196 region_engine.handle_request_mock_fn =
197 Some(Box::new(move |region_id, _request| {
198 flushed_region_ids_ref.write().unwrap().push(region_id);
199 Ok(0)
200 }))
201 });
202 mock_region_server.register_test_region(*region_id, mock_engine);
203 }
204 let handler_context = HandlerContext::new_for_test(mock_region_server);
205
206 let flush_instruction = FlushRegions::async_batch(region_ids.clone());
208 let reply = FlushRegionsHandler
209 .handle(&handler_context, flush_instruction)
210 .await;
211 assert!(reply.is_none()); assert_eq!(*flushed_region_ids.read().unwrap(), region_ids);
213
214 flushed_region_ids.write().unwrap().clear();
216 let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::<Vec<_>>();
217 let flush_instruction = FlushRegions::async_batch(not_found_region_ids);
218 let reply = FlushRegionsHandler
219 .handle(&handler_context, flush_instruction)
220 .await;
221 assert!(reply.is_none());
222 assert!(flushed_region_ids.read().unwrap().is_empty());
223 }
224
225 #[tokio::test]
226 async fn test_handle_flush_region_sync_single() {
227 let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));
228
229 let mock_region_server = mock_region_server();
230 let region_id = RegionId::new(1024, 1);
231
232 let flushed_region_ids_ref = flushed_region_ids.clone();
233 let (mock_engine, _) =
234 MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
235 region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| {
236 flushed_region_ids_ref.write().unwrap().push(region_id);
237 Ok(0)
238 }))
239 });
240 mock_region_server.register_test_region(region_id, mock_engine);
241 let handler_context = HandlerContext::new_for_test(mock_region_server);
242
243 let flush_instruction = FlushRegions::sync_single(region_id);
244 let reply = FlushRegionsHandler
245 .handle(&handler_context, flush_instruction)
246 .await;
247 let flush_reply = reply.unwrap().expect_flush_regions_reply();
248 assert!(flush_reply.overall_success);
249 assert_eq!(flush_reply.results.len(), 1);
250 assert_eq!(flush_reply.results[0].0, region_id);
251 assert!(flush_reply.results[0].1.is_ok());
252 assert_eq!(*flushed_region_ids.read().unwrap(), vec![region_id]);
253 }
254
255 #[tokio::test]
256 async fn test_handle_flush_region_sync_batch_fail_fast() {
257 let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));
258
259 let mock_region_server = mock_region_server();
260 let region_ids = vec![
261 RegionId::new(1024, 1),
262 RegionId::new(1024, 2),
263 RegionId::new(1024, 3),
264 ];
265
266 let flushed_region_ids_ref = flushed_region_ids.clone();
268 let (mock_engine, _) =
269 MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
270 region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| {
271 flushed_region_ids_ref.write().unwrap().push(region_id);
272 Ok(0)
273 }))
274 });
275 mock_region_server.register_test_region(region_ids[0], mock_engine);
276 let handler_context = HandlerContext::new_for_test(mock_region_server);
277
278 let flush_instruction =
280 FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
281 let reply = FlushRegionsHandler
282 .handle(&handler_context, flush_instruction)
283 .await;
284 let flush_reply = reply.unwrap().expect_flush_regions_reply();
285 assert!(!flush_reply.overall_success); assert!(flush_reply.results.len() <= region_ids.len());
288 }
289
290 #[tokio::test]
291 async fn test_handle_flush_region_sync_batch_try_all() {
292 let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));
293
294 let mock_region_server = mock_region_server();
295 let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];
296
297 let flushed_region_ids_ref = flushed_region_ids.clone();
299 let (mock_engine, _) =
300 MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
301 region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| {
302 flushed_region_ids_ref.write().unwrap().push(region_id);
303 Ok(0)
304 }))
305 });
306 mock_region_server.register_test_region(region_ids[0], mock_engine);
307 let handler_context = HandlerContext::new_for_test(mock_region_server);
308
309 let flush_instruction =
311 FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll);
312 let reply = FlushRegionsHandler
313 .handle(&handler_context, flush_instruction)
314 .await;
315 let flush_reply = reply.unwrap().expect_flush_regions_reply();
316 assert!(!flush_reply.overall_success); assert_eq!(flush_reply.results.len(), region_ids.len());
319 assert!(flush_reply.results[0].1.is_ok());
321 assert!(flush_reply.results[1].1.is_err());
322 }
323}