datanode/heartbeat/handler/
flush_region.rs1use std::time::Instant;
16
17use common_meta::instruction::{
18 FlushErrorStrategy, FlushRegionReply, FlushRegions, FlushStrategy, InstructionReply,
19};
20use common_telemetry::{debug, warn};
21use futures_util::future::BoxFuture;
22use store_api::region_request::{RegionFlushRequest, RegionRequest};
23use store_api::storage::RegionId;
24
25use crate::error::{self, RegionNotFoundSnafu, RegionNotReadySnafu, UnexpectedSnafu};
26use crate::heartbeat::handler::HandlerContext;
27
28impl HandlerContext {
29 async fn perform_region_flush(&self, region_id: RegionId) -> Result<(), error::Error> {
31 let request = RegionRequest::Flush(RegionFlushRequest {
32 row_group_size: None,
33 });
34 self.region_server
35 .handle_request(region_id, request)
36 .await?;
37 Ok(())
38 }
39
40 async fn handle_flush_hint(&self, region_ids: Vec<RegionId>) {
42 let start_time = Instant::now();
43 for region_id in ®ion_ids {
44 let result = self.perform_region_flush(*region_id).await;
45 match result {
46 Ok(_) => {}
47 Err(error::Error::RegionNotFound { .. }) => {
48 warn!(
49 "Received a flush region hint from meta, but target region: {} is not found.",
50 region_id
51 );
52 }
53 Err(err) => {
54 warn!("Failed to flush region: {}, error: {}", region_id, err);
55 }
56 }
57 }
58 let elapsed = start_time.elapsed();
59 debug!(
60 "Flush regions hint: {:?}, elapsed: {:?}",
61 region_ids, elapsed
62 );
63 }
64
65 async fn handle_flush_sync(
67 &self,
68 region_ids: Vec<RegionId>,
69 error_strategy: FlushErrorStrategy,
70 ) -> FlushRegionReply {
71 let mut results = Vec::with_capacity(region_ids.len());
72
73 for region_id in region_ids {
74 let result = self.flush_single_region_sync(region_id).await;
75
76 match &result {
77 Ok(_) => results.push((region_id, Ok(()))),
78 Err(err) => {
79 let error_string = err.to_string();
81 results.push((region_id, Err(error_string)));
82
83 if matches!(error_strategy, FlushErrorStrategy::FailFast) {
85 break;
86 }
87 }
88 }
89 }
90
91 FlushRegionReply::from_results(results)
92 }
93
94 async fn flush_single_region_sync(&self, region_id: RegionId) -> Result<(), error::Error> {
96 let Some(writable) = self.region_server.is_region_leader(region_id) else {
98 return Err(RegionNotFoundSnafu { region_id }.build());
99 };
100
101 if !writable {
102 return Err(RegionNotReadySnafu { region_id }.build());
103 }
104
105 let region_server_moved = self.region_server.clone();
107 let register_result = self
108 .flush_tasks
109 .try_register(
110 region_id,
111 Box::pin(async move {
112 region_server_moved
113 .handle_request(
114 region_id,
115 RegionRequest::Flush(RegionFlushRequest {
116 row_group_size: None,
117 }),
118 )
119 .await?;
120 Ok(())
121 }),
122 )
123 .await;
124
125 if register_result.is_busy() {
126 warn!("Another flush task is running for the region: {region_id}");
127 }
128
129 let mut watcher = register_result.into_watcher();
130 match self.flush_tasks.wait_until_finish(&mut watcher).await {
131 Ok(()) => Ok(()),
132 Err(err) => Err(UnexpectedSnafu {
133 violated: format!("Flush task failed: {err:?}"),
134 }
135 .build()),
136 }
137 }
138
139 pub(crate) fn handle_flush_regions_instruction(
141 self,
142 flush_regions: FlushRegions,
143 ) -> BoxFuture<'static, Option<InstructionReply>> {
144 Box::pin(async move {
145 let start_time = Instant::now();
146 let strategy = flush_regions.strategy;
147 let region_ids = flush_regions.region_ids;
148 let error_strategy = flush_regions.error_strategy;
149
150 let reply = if matches!(strategy, FlushStrategy::Async) {
151 self.handle_flush_hint(region_ids).await;
153 None
154 } else {
155 let reply = self.handle_flush_sync(region_ids, error_strategy).await;
157 Some(InstructionReply::FlushRegions(reply))
158 };
159
160 let elapsed = start_time.elapsed();
161 debug!(
162 "FlushRegions strategy: {:?}, elapsed: {:?}, reply: {:?}",
163 strategy, elapsed, reply
164 );
165
166 reply
167 })
168 }
169}
170
#[cfg(test)]
mod tests {
    use std::sync::{Arc, RwLock};

    use common_meta::instruction::{FlushErrorStrategy, FlushRegions};
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::storage::RegionId;

    use super::*;
    use crate::tests::{mock_region_server, MockRegionEngine};

    /// Async (hint) flushes reach every registered region, produce no reply, and
    /// silently skip regions that were never registered.
    #[tokio::test]
    async fn test_handle_flush_region_hint() {
        let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_ids = (0..16).map(|i| RegionId::new(1024, i)).collect::<Vec<_>>();
        for region_id in &region_ids {
            // Each mock engine records the id of every region it is asked to handle.
            let flushed_region_ids_ref = flushed_region_ids.clone();
            let (mock_engine, _) =
                MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                    region_engine.handle_request_mock_fn =
                        Some(Box::new(move |region_id, _request| {
                            flushed_region_ids_ref.write().unwrap().push(region_id);
                            Ok(0)
                        }))
                });
            mock_region_server.register_test_region(*region_id, mock_engine);
        }
        let handler_context = HandlerContext::new_for_test(mock_region_server);

        // Hint over registered regions: no reply, all regions flushed in order.
        let flush_instruction = FlushRegions::async_batch(region_ids.clone());
        let reply = handler_context
            .clone()
            .handle_flush_regions_instruction(flush_instruction)
            .await;
        assert!(reply.is_none());
        assert_eq!(*flushed_region_ids.read().unwrap(), region_ids);

        flushed_region_ids.write().unwrap().clear();

        // Hint over regions that were never registered: still no reply, nothing flushed.
        let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::<Vec<_>>();
        let flush_instruction = FlushRegions::async_batch(not_found_region_ids);
        let reply = handler_context
            .handle_flush_regions_instruction(flush_instruction)
            .await;
        assert!(reply.is_none());
        assert!(flushed_region_ids.read().unwrap().is_empty());
    }

    /// A synchronous flush of one registered region reports a single successful result.
    #[tokio::test]
    async fn test_handle_flush_region_sync_single() {
        let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let flushed_region_ids_ref = flushed_region_ids.clone();
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| {
                    flushed_region_ids_ref.write().unwrap().push(region_id);
                    Ok(0)
                }))
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let handler_context = HandlerContext::new_for_test(mock_region_server);

        let flush_instruction = FlushRegions::sync_single(region_id);
        let reply = handler_context
            .handle_flush_regions_instruction(flush_instruction)
            .await;

        assert!(reply.is_some());
        if let Some(InstructionReply::FlushRegions(flush_reply)) = reply {
            assert!(flush_reply.overall_success);
            assert_eq!(flush_reply.results.len(), 1);
            assert_eq!(flush_reply.results[0].0, region_id);
            assert!(flush_reply.results[0].1.is_ok());
        } else {
            panic!("Expected FlushRegions reply");
        }

        assert_eq!(*flushed_region_ids.read().unwrap(), vec![region_id]);
    }

    /// With fail-fast, the batch stops at the first failing region.
    #[tokio::test]
    async fn test_handle_flush_region_sync_batch_fail_fast() {
        let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_ids = vec![
            RegionId::new(1024, 1),
            RegionId::new(1024, 2),
            RegionId::new(1024, 3),
        ];

        // Only the first region is registered; the second one is the first to fail.
        let flushed_region_ids_ref = flushed_region_ids.clone();
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| {
                    flushed_region_ids_ref.write().unwrap().push(region_id);
                    Ok(0)
                }))
            });
        mock_region_server.register_test_region(region_ids[0], mock_engine);
        let handler_context = HandlerContext::new_for_test(mock_region_server);

        let flush_instruction =
            FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
        let reply = handler_context
            .handle_flush_regions_instruction(flush_instruction)
            .await;

        assert!(reply.is_some());
        if let Some(InstructionReply::FlushRegions(flush_reply)) = reply {
            assert!(!flush_reply.overall_success);
            // Exactly one success followed by the failure that ended the batch;
            // the third region must not have been attempted.
            assert_eq!(flush_reply.results.len(), 2);
            assert_eq!(flush_reply.results[0].0, region_ids[0]);
            assert!(flush_reply.results[0].1.is_ok());
            assert_eq!(flush_reply.results[1].0, region_ids[1]);
            assert!(flush_reply.results[1].1.is_err());
        } else {
            panic!("Expected FlushRegions reply");
        }
    }

    /// With try-all, every region is attempted even after a failure.
    #[tokio::test]
    async fn test_handle_flush_region_sync_batch_try_all() {
        let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];

        // Only the first region is registered; the second one fails.
        let flushed_region_ids_ref = flushed_region_ids.clone();
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| {
                    flushed_region_ids_ref.write().unwrap().push(region_id);
                    Ok(0)
                }))
            });
        mock_region_server.register_test_region(region_ids[0], mock_engine);
        let handler_context = HandlerContext::new_for_test(mock_region_server);

        let flush_instruction =
            FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll);
        let reply = handler_context
            .handle_flush_regions_instruction(flush_instruction)
            .await;

        assert!(reply.is_some());
        if let Some(InstructionReply::FlushRegions(flush_reply)) = reply {
            assert!(!flush_reply.overall_success);
            // Every region gets an entry: the registered one succeeds, the other fails.
            assert_eq!(flush_reply.results.len(), region_ids.len());
            assert!(flush_reply.results[0].1.is_ok());
            assert!(flush_reply.results[1].1.is_err());
        } else {
            panic!("Expected FlushRegions reply");
        }
    }
}