// datanode/heartbeat/handler/flush_region.rs
use std::time::Instant;
16
17use common_meta::instruction::{
18 FlushErrorStrategy, FlushRegionReply, FlushRegions, FlushStrategy, InstructionReply,
19};
20use common_telemetry::{debug, warn};
21use store_api::region_request::{RegionFlushRequest, RegionRequest};
22use store_api::storage::RegionId;
23
24use crate::error::{self, RegionNotFoundSnafu, RegionNotReadySnafu, Result, UnexpectedSnafu};
25use crate::heartbeat::handler::{HandlerContext, InstructionHandler};
26
27pub struct FlushRegionsHandler;
28
29#[async_trait::async_trait]
30impl InstructionHandler for FlushRegionsHandler {
31 type Instruction = FlushRegions;
32
33 async fn handle(
34 &self,
35 ctx: &HandlerContext,
36 flush_regions: FlushRegions,
37 ) -> Option<InstructionReply> {
38 let start_time = Instant::now();
39 let strategy = flush_regions.strategy;
40 let region_ids = flush_regions.region_ids;
41 let error_strategy = flush_regions.error_strategy;
42
43 let reply = if matches!(strategy, FlushStrategy::Async) {
44 ctx.handle_flush_hint(region_ids).await;
46 None
47 } else {
48 let reply = ctx.handle_flush_sync(region_ids, error_strategy).await;
50 Some(InstructionReply::FlushRegions(reply))
51 };
52
53 let elapsed = start_time.elapsed();
54 debug!(
55 "FlushRegions strategy: {:?}, elapsed: {:?}, reply: {:?}",
56 strategy, elapsed, reply
57 );
58
59 reply
60 }
61}
62
63impl HandlerContext {
64 async fn perform_region_flush(&self, region_id: RegionId) -> Result<()> {
66 let request = RegionRequest::Flush(RegionFlushRequest {
67 row_group_size: None,
68 });
69 self.region_server
70 .handle_request(region_id, request)
71 .await?;
72 Ok(())
73 }
74
75 async fn handle_flush_hint(&self, region_ids: Vec<RegionId>) {
77 let start_time = Instant::now();
78 for region_id in ®ion_ids {
79 let result = self.perform_region_flush(*region_id).await;
80 match result {
81 Ok(_) => {}
82 Err(error::Error::RegionNotFound { .. }) => {
83 warn!(
84 "Received a flush region hint from meta, but target region: {} is not found.",
85 region_id
86 );
87 }
88 Err(err) => {
89 warn!("Failed to flush region: {}, error: {}", region_id, err);
90 }
91 }
92 }
93 let elapsed = start_time.elapsed();
94 debug!(
95 "Flush regions hint: {:?}, elapsed: {:?}",
96 region_ids, elapsed
97 );
98 }
99
100 async fn handle_flush_sync(
102 &self,
103 region_ids: Vec<RegionId>,
104 error_strategy: FlushErrorStrategy,
105 ) -> FlushRegionReply {
106 let mut results = Vec::with_capacity(region_ids.len());
107
108 for region_id in region_ids {
109 let result = self.flush_single_region_sync(region_id).await;
110
111 match &result {
112 Ok(_) => results.push((region_id, Ok(()))),
113 Err(err) => {
114 let error_string = err.to_string();
116 results.push((region_id, Err(error_string)));
117
118 if matches!(error_strategy, FlushErrorStrategy::FailFast) {
120 break;
121 }
122 }
123 }
124 }
125
126 FlushRegionReply::from_results(results)
127 }
128
129 async fn flush_single_region_sync(&self, region_id: RegionId) -> Result<()> {
131 let Some(writable) = self.region_server.is_region_leader(region_id) else {
133 return Err(RegionNotFoundSnafu { region_id }.build());
134 };
135
136 if !writable {
137 return Err(RegionNotReadySnafu { region_id }.build());
138 }
139
140 let region_server_moved = self.region_server.clone();
142 let register_result = self
143 .flush_tasks
144 .try_register(
145 region_id,
146 Box::pin(async move {
147 region_server_moved
148 .handle_request(
149 region_id,
150 RegionRequest::Flush(RegionFlushRequest {
151 row_group_size: None,
152 }),
153 )
154 .await?;
155 Ok(())
156 }),
157 )
158 .await;
159
160 if register_result.is_busy() {
161 warn!("Another flush task is running for the region: {region_id}");
162 }
163
164 let mut watcher = register_result.into_watcher();
165 match self.flush_tasks.wait_until_finish(&mut watcher).await {
166 Ok(()) => Ok(()),
167 Err(err) => Err(UnexpectedSnafu {
168 violated: format!("Flush task failed: {err:?}"),
169 }
170 .build()),
171 }
172 }
173}
174
#[cfg(test)]
mod tests {
    use std::sync::{Arc, RwLock};

    use common_meta::instruction::{FlushErrorStrategy, FlushRegions};
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::storage::RegionId;

    use super::*;
    use crate::tests::{MockRegionEngine, mock_region_server};

    /// An async (hint) flush returns no reply, flushes registered regions in
    /// order, and silently tolerates regions that were never registered.
    #[tokio::test]
    async fn test_handle_flush_region_hint() {
        // Every mock engine appends the region id it is asked to handle here.
        let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_ids = (0..16).map(|i| RegionId::new(1024, i)).collect::<Vec<_>>();
        for region_id in &region_ids {
            let flushed_region_ids_ref = flushed_region_ids.clone();
            let (mock_engine, _) =
                MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                    region_engine.handle_request_mock_fn =
                        Some(Box::new(move |region_id, _request| {
                            flushed_region_ids_ref.write().unwrap().push(region_id);
                            Ok(0)
                        }))
                });
            mock_region_server.register_test_region(*region_id, mock_engine);
        }
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);

        // All registered regions: no reply, and each region is flushed in order.
        let flush_instruction = FlushRegions::async_batch(region_ids.clone());
        let reply = FlushRegionsHandler
            .handle(&handler_context, flush_instruction)
            .await;
        assert!(reply.is_none());
        assert_eq!(*flushed_region_ids.read().unwrap(), region_ids);

        // Regions that were never registered: still no reply, and nothing is flushed.
        flushed_region_ids.write().unwrap().clear();
        let not_found_region_ids = (0..2).map(|i| RegionId::new(2048, i)).collect::<Vec<_>>();
        let flush_instruction = FlushRegions::async_batch(not_found_region_ids);
        let reply = FlushRegionsHandler
            .handle(&handler_context, flush_instruction)
            .await;
        assert!(reply.is_none());
        assert!(flushed_region_ids.read().unwrap().is_empty());
    }

    /// A sync flush of one registered region reports success for that region.
    #[tokio::test]
    async fn test_handle_flush_region_sync_single() {
        let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_id = RegionId::new(1024, 1);

        let flushed_region_ids_ref = flushed_region_ids.clone();
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| {
                    flushed_region_ids_ref.write().unwrap().push(region_id);
                    Ok(0)
                }))
            });
        mock_region_server.register_test_region(region_id, mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);

        let flush_instruction = FlushRegions::sync_single(region_id);
        let reply = FlushRegionsHandler
            .handle(&handler_context, flush_instruction)
            .await;
        let flush_reply = reply.unwrap().expect_flush_regions_reply();
        assert!(flush_reply.overall_success);
        assert_eq!(flush_reply.results.len(), 1);
        assert_eq!(flush_reply.results[0].0, region_id);
        assert!(flush_reply.results[0].1.is_ok());
        assert_eq!(*flushed_region_ids.read().unwrap(), vec![region_id]);
    }

    /// A fail-fast sync batch with unregistered regions reports overall failure.
    #[tokio::test]
    async fn test_handle_flush_region_sync_batch_fail_fast() {
        let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_ids = vec![
            RegionId::new(1024, 1),
            RegionId::new(1024, 2),
            RegionId::new(1024, 3),
        ];

        // Only the first region is registered; the others are unknown to the server.
        let flushed_region_ids_ref = flushed_region_ids.clone();
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| {
                    flushed_region_ids_ref.write().unwrap().push(region_id);
                    Ok(0)
                }))
            });
        mock_region_server.register_test_region(region_ids[0], mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);

        let flush_instruction =
            FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::FailFast);
        let reply = FlushRegionsHandler
            .handle(&handler_context, flush_instruction)
            .await;
        let flush_reply = reply.unwrap().expect_flush_regions_reply();
        assert!(!flush_reply.overall_success);
        // Fail-fast may stop early, so the reply can hold fewer entries than were requested.
        assert!(flush_reply.results.len() <= region_ids.len());
    }

    /// A try-all sync batch attempts every region and reports each outcome.
    #[tokio::test]
    async fn test_handle_flush_region_sync_batch_try_all() {
        let flushed_region_ids: Arc<RwLock<Vec<RegionId>>> = Arc::new(RwLock::new(Vec::new()));

        let mock_region_server = mock_region_server();
        let region_ids = vec![RegionId::new(1024, 1), RegionId::new(1024, 2)];

        // Only the first region is registered; the second is unknown to the server.
        let flushed_region_ids_ref = flushed_region_ids.clone();
        let (mock_engine, _) =
            MockRegionEngine::with_custom_apply_fn(MITO_ENGINE_NAME, move |region_engine| {
                region_engine.handle_request_mock_fn = Some(Box::new(move |region_id, _request| {
                    flushed_region_ids_ref.write().unwrap().push(region_id);
                    Ok(0)
                }))
            });
        mock_region_server.register_test_region(region_ids[0], mock_engine);
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let handler_context = HandlerContext::new_for_test(mock_region_server, kv_backend);

        let flush_instruction =
            FlushRegions::sync_batch(region_ids.clone(), FlushErrorStrategy::TryAll);
        let reply = FlushRegionsHandler
            .handle(&handler_context, flush_instruction)
            .await;
        let flush_reply = reply.unwrap().expect_flush_regions_reply();
        assert!(!flush_reply.overall_success);
        // Every region gets an entry, in request order.
        assert_eq!(flush_reply.results.len(), region_ids.len());
        assert!(flush_reply.results[0].1.is_ok());
        assert!(flush_reply.results[1].1.is_err());
    }

    /// Pins the `Display` output of a single-region sync `FlushRegions` instruction.
    #[test]
    fn test_flush_regions_display() {
        let region_id = RegionId::new(1024, 1);
        let flush_regions = FlushRegions::sync_single(region_id);
        let display = format!("{}", flush_regions);
        assert_eq!(
            display,
            "FlushRegions(region_ids=[4398046511105(1024, 1)], strategy=Sync, error_strategy=FailFast)"
        );
    }
}